生产者—消费者问题中两个进程同时共享一个公共的固定大小的缓冲区。其中一个是生产者,将信息换入缓冲区;另一个是消费者,从缓冲区中取出信息。

      当缓冲区已满,而此时生产者还想向其中放入一个新的数据项时。其解决办法是让生产者睡眠,待消费者从缓冲区取出一个或者多个数据项时在唤醒它。同样的,当消费者试图从缓冲区中取出数据而发现缓冲区为空时,消费者就睡眠,直到生产者向其中放入一些数据时再将其唤醒。

      互斥量是一个处于两态之一的变量:解锁和加锁。



#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>
#include<stdlib.h>
#include<time.h>

#define BUFFER_LEN 10 /* 缓冲区的长度 */

int Buffer[BUFFER_LEN]; /* 缓冲区 */
int in, out; /* 产品进、出缓冲区的指针 */
sem_t FullSemaphore; /* 缓冲区满的信号量(已在缓冲区中的产品数) */
sem_t EmptySemaphore; /* 缓冲区空的信号量(空闲缓冲区数) */
pthread_mutex_t Pmutex, Cmutex; /* 两个互斥锁,用于写互斥 */

#define MAX_THREAD_NUM 100 /* 最大线程数目 */

 

typedef struct _ThreadInfo { /* 线程信息结构 */
 pthread_t thread;/*线程组*/ 
     int ID; /* 线程标示 */
     int sleeptime; /* 休息时间 */
     int num; /* 生产产品编号 */
}ThreadInfo;

ThreadInfo CustomerThread_Info[MAX_THREAD_NUM]; /* 生产者线程信息数组 */
ThreadInfo ProducerThread_Info[MAX_THREAD_NUM]; /* 消费者线程信息数组 */
int total; /* 当前产品总数目 */

int threads_init(int ThreadNum);
void *ProducerT(void *ptr);
void *ConsumerT(void *ptr);
void produce(int ThreadID);
void consume(int ThreadID);

int main(int argc, char** argv)
{
 int i=0;
 int pThreadNum;
 printf("Please input the number of thread: ");
 scanf("%d", &pThreadNum);

 /*if(pThreadNum > MAX_THREAD_NUM)
  printf("The number must lower than 100!\n\n");
 else
 {*/
  threads_init(pThreadNum);
  for(i=0; i<pThreadNum; i++)
  {
   CustomerThread_Info[i].ID = i;   
   CustomerThread_Info[i].sleeptime = random() % 9 + 1;
   CustomerThread_Info[i].num= i*2;
   pthread_create(&CustomerThread_Info[i].thread, NULL, ConsumerT, CustomerThread_Info+i);

   ProducerThread_Info[i].ID = i;
   ProducerThread_Info[i].sleeptime = random() % 9 + 1;
   ProducerThread_Info[i].num = i*2 + 1;
   pthread_create(&ProducerThread_Info[i].thread, NULL, ProducerT, ProducerThread_Info+i);

   /*pthread_join( CustomerThread_Info[i].thread, NULL );*/
   /*pthread_join( ProducerThread_Info[i].thread, NULL );*/
    
  }
  for(i=0; i<pThreadNum; i++)
  {
   pthread_join( CustomerThread_Info[i].thread, NULL );
   pthread_join( ProducerThread_Info[i].thread, NULL );     
  }

  pthread_mutex_destroy( &Pmutex );
  pthread_mutex_destroy( &Cmutex );
  sem_destroy( &FullSemaphore );
  sem_destroy( &EmptySemaphore );
  
  printf("It's the end!!\n\n");
 
  return 0;
}

int threads_init(int ThreadNum)
{
 //int i=0;
 
 in = 0;
 out = -1;
 pthread_mutex_init( &Pmutex, NULL );
 pthread_mutex_init( &Cmutex, NULL );
 sem_init( &EmptySemaphore, 0, 0 );
 sem_init( &FullSemaphore, 0, 10);
 
 /*for(i=0; i< ThreadNum; i++)
 {
  CustomerThread_Info[i].ID = 0;
  CustomerThread_Info[i].sleeptime = 0;
  CustomerThread_Info[i].num = 0;

  ProducerThread_Info[i].ID = 0;
  ProducerThread_Info[i].sleeptime = 0;
  ProducerThread_Info[i].num = 0;
 }*/
 
}
void produce(int ThreadID)
{
 int position;
 int total;
 Buffer[in] = in;
 position = total++;
 
 printf("Producer is working!\n"); 
 printf("Producer %d producing a product.\n", ThreadID);
 printf("The ID is %d.\n", in);
 printf("Sending it to buffer!\n");
 printf("The left number of product is %d.\n\n", total);
 in = (in + 1)%10;

}
void consume(int ThreadID)
{
 int position;
 int total;
 position = total--;
 
 printf("Consumer is working!\n");
 printf("Consumer %d take a product.\n", ThreadID);
 printf("The ID is %d.\n", out);
 printf("Receiving it from buffer!\n");
 printf("The left number of product is %d.\n\n", total);
 out = (out + 1)%10;

}

void *ConsumerT(void *ptr)
{
 ThreadInfo *pThread = (ThreadInfo*)ptr;
 sleep(pThread->sleeptime);
 sem_wait(&EmptySemaphore);
 pthread_mutex_lock(&Cmutex);
 consume(pThread->ID);
 pthread_mutex_unlock(&Cmutex );
 sem_post(&FullSemaphore);
 
 return 0;

}
void *ProducerT(void *ptr)
{
 ThreadInfo *pThread = (ThreadInfo*)ptr;
 sleep(pThread->sleeptime);
 sem_wait(&FullSemaphore);
 pthread_mutex_lock(&Pmutex);
 produce(pThread->ID);
 pthread_mutex_unlock(&Pmutex);
 sem_post(&EmptySemaphore);

 return 0;

}