Friday, May 30, 2014

Producer Consumer Problem's solution in Java

According to Wikipedia, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue. The producer's job is to generate a piece of data, put it into the buffer and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.
The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again. In the same way, the consumer can go to sleep if it finds the buffer to be empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer. The solution can be reached by means of inter-process communication. An inadequate solution could result in a deadlock where both processes are waiting to be awakened. The problem can also be generalized to have multiple producers and consumers.
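
Before the full solution, the sleep-and-wake protocol described above can be sketched as a single bounded buffer class. This is only a minimal illustration, not the solution from this post: the class name BoundedBufferSketch and the helper transferSum are made up for the example, and it uses an ArrayDeque with synchronized methods rather than a Vector.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedBufferSketch {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    public BoundedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: sleep while the buffer is full, then add and wake waiters.
    public synchronized void put(int value) throws InterruptedException {
        while (items.size() >= capacity) {
            wait(); // releases the lock until another thread calls notifyAll()
        }
        items.add(value);
        notifyAll();
    }

    // Consumer side: sleep while the buffer is empty, then remove and wake waiters.
    public synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int value = items.remove();
        notifyAll();
        return value;
    }

    // Hypothetical helper: a producer thread puts 1..count, the caller takes them all.
    public static int transferSum(int count, int capacity) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + transferSum(5, 2)); // prints "sum = 15"
    }
}
```

The important detail is that the condition is tested in a while loop inside the same lock that guards the update, so a thread that wakes up always re-checks the buffer before touching it.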

In this post I share my producer-consumer solution in Java. It uses a shared Vector as the buffer and wait()/notifyAll() on that Vector for signalling between the threads.

Producer class:

import java.util.Vector;

public class MyProducer implements Runnable{
 /*
  * Vector used as the buffer: it holds the numbers produced by the
  * producer and also serves as the synchronisation (lock) object.
  */
 Vector<Integer> sharedQ;
 // SIZE stores the maximum size of the buffer
 int SIZE;
 /*
  * Constructor for Producer class
  */
 public MyProducer(Vector<Integer> sharedQ, final int size) {
  this.sharedQ =sharedQ;
  this.SIZE= size;
 }
 @Override
 public void run() {
  int i=0;
  while(true){
   i++;
   produce(i);
  }
 }
 /*
  * This method has the logic for producing numbers
  * and synchronization with consumer
  */
 public void produce(int i) {
  // the check, wait() and add() all happen under the same lock, so no
  // other thread can change the buffer between the test and the update
  synchronized (sharedQ) {
   // testing if buffer is already full
   while (sharedQ.size() >= SIZE) {
    try {
     System.out.println("producer waiting...");
     // waiting until some space is made in buffer
     sharedQ.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   // adding the number to the buffer
   sharedQ.add(i);
   System.out.println("Producer produced: " + i);
   // notifying other threads which are waiting on sharedQ
   sharedQ.notifyAll();
  }
  try {
   /*
    * making thread to sleep for random time after producing  
    *so as to make output more readable and understandable
    *we can set any arbitrary sleeping time
    */
   Thread.sleep(((long)((Math.random())*10000)));
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}


Consumer class:

import java.util.Vector;

public class MyConsumer implements Runnable {
 
 /*
  * Vector used as the buffer: it holds the numbers produced by the
  * producer and also serves as the synchronisation (lock) object.
  */
 Vector<Integer> sharedQ;
 
 public MyConsumer(Vector<Integer> sharedQ) {
  this.sharedQ = sharedQ;
 }
 
 @Override
 public void run() {
  /*
   * loop will go on forever
   */
  while(true) {
   consumed();
   /*
    * making the thread sleep for a random time after consuming
    * so as to make the output more readable and understandable;
    * we can set any arbitrary sleeping time
    */
   try {
    Thread.sleep(((long)((Math.random())*20000)));
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
 /*
  * This method has the logic for consuming numbers
  * and synchronization with producer
  */
 public void consumed() {
  // the check, wait() and remove() all happen under the same lock, so
  // another consumer cannot empty the buffer between the test and remove(0)
  synchronized (sharedQ) {
   // checking if buffer is empty; if so, the consumer has to wait
   while (sharedQ.isEmpty()) {
    try {
     System.out.println(Thread.currentThread().getName() + " waiting...");
     sharedQ.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   // consuming a number from the buffer
   System.out.println(Thread.currentThread().getName() + " consumed " + sharedQ.remove(0));
   // notifying other threads which are waiting on sharedQ
   sharedQ.notifyAll();
  }
 }
 
}

Test class:

import java.util.Vector;

public class MyProducerConsumerTest { 
 
 public static void main(String args[]) {
  
  /*
   * Vector used as the buffer: it holds the numbers produced by the
   * producer and also serves as the synchronisation (lock) object.
   */
  Vector<Integer> sharedQ = new Vector<Integer>();
  // SIZE stores the maximum size of the buffer
  final int SIZE = 5;
  // creating the producer and consumer threads
  Thread producer = new Thread(new MyProducer(sharedQ,SIZE),"producer");
  Thread consumer = new Thread(new MyConsumer(sharedQ),"consumer");

  // starting producer/consumer threads
  producer.start();
  consumer.start();
  /*
   * These joins are optional. With them, the main thread never finishes,
   * because the producer and consumer run forever. To see the effect of
   * join, make the producer/consumer run for a limited time.
   */
  try {
   producer.join();
   consumer.join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  /*
   * This is never printed unless we remove the join calls above
   * or let the producer/consumer threads finish.
   */
  System.out.println("all done");
 }
}
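
The join comment in the test class says that its effect is only visible when the threads run for a limited time. A rough sketch of such a bounded run follows. It is an illustration under assumptions, not part of the classes above: the class BoundedRunSketch and the helper runOnce are hypothetical, the random sleeps are left out, and each thread stops after a fixed number of items.

```java
import java.util.Vector;

public class BoundedRunSketch {

    // Hypothetical helper: one producer and one consumer exchange itemCount
    // numbers through a Vector holding at most size elements, then both end.
    public static int runOnce(final int itemCount, final int size) {
        final Vector<Integer> sharedQ = new Vector<Integer>();
        final int[] consumedSum = {0}; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= itemCount; i++) {
                synchronized (sharedQ) {
                    while (sharedQ.size() >= size) {
                        try {
                            sharedQ.wait();
                        } catch (InterruptedException e) {
                            return; // stop producing if interrupted
                        }
                    }
                    sharedQ.add(i);
                    sharedQ.notifyAll();
                }
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            for (int n = 0; n < itemCount; n++) {
                synchronized (sharedQ) {
                    while (sharedQ.isEmpty()) {
                        try {
                            sharedQ.wait();
                        } catch (InterruptedException e) {
                            return; // stop consuming if interrupted
                        }
                    }
                    consumedSum[0] += sharedQ.remove(0);
                    sharedQ.notifyAll();
                }
            }
        }, "consumer");

        producer.start();
        consumer.start();
        try {
            // both threads terminate, so these joins return
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return consumedSum[0];
    }

    public static void main(String[] args) {
        int sum = runOnce(10, 5);
        System.out.println("all done, sum = " + sum); // prints "all done, sum = 55"
    }
}
```

Because both loops are finite, join returns and the final message is printed. Reading consumedSum after join is safe, since a completed join guarantees the consumer's writes are visible to the main thread.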

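We can make this code work for multiple producers and consumers by changing only the test class to create several producer and consumer threads. This is safe only if each thread checks the buffer and updates it inside the same synchronized block; otherwise two consumers can both see a non-empty buffer, and the slower one then calls remove(0) on an empty Vector.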

Modified Test class:

import java.util.Vector;

public class MyProducerConsumerTest { 
 
 public static void main(String args[]) {
  
  /*
   * Vector used as the buffer: it holds the numbers produced by the
   * producers and also serves as the synchronisation (lock) object.
   */
  Vector<Integer> sharedQ = new Vector<Integer>();
  // SIZE stores the maximum size of the buffer
  final int SIZE = 5;

  // creating the producer and consumer threads
  Thread producer1 = new Thread(new MyProducer(sharedQ,SIZE),"producer1");
  Thread producer2 = new Thread(new MyProducer(sharedQ,SIZE),"producer2");
  Thread producer3 = new Thread(new MyProducer(sharedQ,SIZE),"producer3");
  Thread consumer1 = new Thread(new MyConsumer(sharedQ),"consumer1");
  Thread consumer2 = new Thread(new MyConsumer(sharedQ),"consumer2");
  Thread consumer3 = new Thread(new MyConsumer(sharedQ),"consumer3");
  Thread consumer4 = new Thread(new MyConsumer(sharedQ),"consumer4");

  // starting producer/consumer threads
  producer1.start();
  producer2.start();
  producer3.start();
  consumer1.start();
  consumer2.start();
  consumer3.start();
  consumer4.start();
  /*
   * These joins are optional. With them, the main thread never finishes,
   * because the producers and consumers run forever. To see the effect of
   * join, make the producers/consumers run for a limited time.
   */
  try {
   producer1.join();
   producer2.join();
   producer3.join();
   consumer1.join();
   consumer2.join();
   consumer3.join();
   consumer4.join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  /*
   * This is never printed unless we remove the join calls above
   * or let the producer/consumer threads finish.
   */
  System.out.println("all done");
 }
}
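
For comparison, the same multiple producer/consumer scenario can be written with java.util.concurrent.ArrayBlockingQueue, whose put() blocks while the queue is full and whose take() blocks while it is empty. This swaps in a different technique from the wait()/notifyAll() approach used in this post. The class BlockingQueueSketch, the helper run, and the counter-based stopping rule are assumptions made for the example so that the threads terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueSketch {

    // Hypothetical helper: starts the given number of producers and consumers
    // on a queue of capacity 5 and returns how many items were consumed.
    public static int run(int producers, int consumers, int itemsPerProducer) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        // consumers claim one unit from this counter before every take(),
        // so the total number of take() calls equals the number of items
        AtomicInteger remaining = new AtomicInteger(producers * itemsPerProducer);
        AtomicInteger consumedCount = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 1; p <= producers; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerProducer; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "producer" + p));
        }
        for (int c = 1; c <= consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    while (remaining.getAndDecrement() > 0) {
                        queue.take(); // blocks while the queue is empty
                        consumedCount.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer" + c));
        }

        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return consumedCount.get();
    }

    public static void main(String[] args) {
        // three producers and four consumers, as in the modified test class
        System.out.println("consumed " + run(3, 4, 10) + " items"); // prints "consumed 30 items"
    }
}
```

The queue handles the locking and signalling internally, so the producer and consumer bodies contain no synchronized blocks of their own.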

 

 
