生产者—消费者问题中两个进程同时共享一个公共的固定大小的缓冲区。其中一个是生产者,将信息换入缓冲区;另一个是消费者,从缓冲区中取出信息。
当缓冲区已满,而此时生产者还想向其中放入一个新的数据项时。其解决办法是让生产者睡眠,待消费者从缓冲区取出一个或者多个数据项时在唤醒它。同样的,当消费者试图从缓冲区中取出数据而发现缓冲区为空时,消费者就睡眠,直到生产者向其中放入一些数据时再将其唤醒。
互斥量是一个处于两态之一的变量:解锁和加锁。
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>
#include<stdlib.h>
#include<time.h>
#define BUFFER_LEN 10 /* 缓冲区的长度 */
int Buffer[BUFFER_LEN]; /* 缓冲区 */
int in, out; /* 产品进、出缓冲区的指针 */
sem_t FullSemaphore; /* 缓冲区满的信号量(已在缓冲区中的产品数) */
sem_t EmptySemaphore; /* 缓冲区空的信号量(空闲缓冲区数) */
pthread_mutex_t Pmutex, Cmutex; /* 两个互斥锁,用于写互斥 */
#define MAX_THREAD_NUM 100 /* 最大线程数目 */
typedef struct _ThreadInfo { /* 线程信息结构 */
pthread_t thread;/*线程组*/
int ID; /* 线程标示 */
int sleeptime; /* 休息时间 */
int num; /* 生产产品编号 */
}ThreadInfo;
ThreadInfo CustomerThread_Info[MAX_THREAD_NUM]; /* 生产者线程信息数组 */
ThreadInfo ProducerThread_Info[MAX_THREAD_NUM]; /* 消费者线程信息数组 */
int total; /* 当前产品总数目 */
int threads_init(int ThreadNum);
void *ProducerT(void *ptr);
void *ConsumerT(void *ptr);
void produce(int ThreadID);
void consume(int ThreadID);
int main(int argc, char** argv)
{
int i=0;
int pThreadNum;
printf("Please input the number of thread: ");
scanf("%d", &pThreadNum);
/*if(pThreadNum > MAX_THREAD_NUM)
printf("The number must lower than 100!\n\n");
else
{*/
threads_init(pThreadNum);
for(i=0; i<pThreadNum; i++)
{
CustomerThread_Info[i].ID = i;
CustomerThread_Info[i].sleeptime = random() % 9 + 1;
CustomerThread_Info[i].num= i*2;
pthread_create(&CustomerThread_Info[i].thread, NULL, ConsumerT, CustomerThread_Info+i);
ProducerThread_Info[i].ID = i;
ProducerThread_Info[i].sleeptime = random() % 9 + 1;
ProducerThread_Info[i].num = i*2 + 1;
pthread_create(&ProducerThread_Info[i].thread, NULL, ProducerT, ProducerThread_Info+i);
/*pthread_join( CustomerThread_Info[i].thread, NULL );*/
/*pthread_join( ProducerThread_Info[i].thread, NULL );*/
}
for(i=0; i<pThreadNum; i++)
{
pthread_join( CustomerThread_Info[i].thread, NULL );
pthread_join( ProducerThread_Info[i].thread, NULL );
}
pthread_mutex_destroy( &Pmutex );
pthread_mutex_destroy( &Cmutex );
sem_destroy( &FullSemaphore );
sem_destroy( &EmptySemaphore );
printf("It's the end!!\n\n");
return 0;
}
int threads_init(int ThreadNum)
{
//int i=0;
in = 0;
out = -1;
pthread_mutex_init( &Pmutex, NULL );
pthread_mutex_init( &Cmutex, NULL );
sem_init( &EmptySemaphore, 0, 0 );
sem_init( &FullSemaphore, 0, 10);
/*for(i=0; i< ThreadNum; i++)
{
CustomerThread_Info[i].ID = 0;
CustomerThread_Info[i].sleeptime = 0;
CustomerThread_Info[i].num = 0;
ProducerThread_Info[i].ID = 0;
ProducerThread_Info[i].sleeptime = 0;
ProducerThread_Info[i].num = 0;
}*/
}
void produce(int ThreadID)
{
int position;
int total;
Buffer[in] = in;
position = total++;
printf("Producer is working!\n");
printf("Producer %d producing a product.\n", ThreadID);
printf("The ID is %d.\n", in);
printf("Sending it to buffer!\n");
printf("The left number of product is %d.\n\n", total);
in = (in + 1)%10;
}
void consume(int ThreadID)
{
int position;
int total;
position = total--;
printf("Consumer is working!\n");
printf("Consumer %d take a product.\n", ThreadID);
printf("The ID is %d.\n", out);
printf("Receiving it from buffer!\n");
printf("The left number of product is %d.\n\n", total);
out = (out + 1)%10;
}
void *ConsumerT(void *ptr)
{
ThreadInfo *pThread = (ThreadInfo*)ptr;
sleep(pThread->sleeptime);
sem_wait(&EmptySemaphore);
pthread_mutex_lock(&Cmutex);
consume(pThread->ID);
pthread_mutex_unlock(&Cmutex );
sem_post(&FullSemaphore);
return 0;
}
void *ProducerT(void *ptr)
{
ThreadInfo *pThread = (ThreadInfo*)ptr;
sleep(pThread->sleeptime);
sem_wait(&FullSemaphore);
pthread_mutex_lock(&Pmutex);
produce(pThread->ID);
pthread_mutex_unlock(&Pmutex);
sem_post(&EmptySemaphore);
return 0;
}