In Part 3 of this threads series we looked at synchronization and the monitor model of synchronization. Today we will look at the producer-consumer thread concept.

Putting It Together:

  • The code in this section is an example of thread interaction that demonstrates the use of the wait and notify methods to solve the classic producer-consumer problem.
  • Start by looking at the outline of the stack object and the details of the threads that access it.
  • Then look at the details of the stack and the mechanisms used to protect the stack’s data and to implement the thread communication based on the stack’s state.
  • The example stack class, called SyncStack to distinguish it from the core class java.util.Stack, offers the following public API:

public synchronized void push(char c);

public synchronized char pop();

The Producer Thread:

The producer thread generates new characters to be placed on the stack. The code below shows the Producer class.

    package mod13;

    public class Producer implements Runnable
    {
        private SyncStack theStack;
        private int num;
        private static int counter = 1;

        public Producer(SyncStack s)
        {
            theStack = s;
            num = counter++;
        }

        public void run()
        {
            char c;
            for (int i = 0; i < 200; i++)
            {
                // Pick a random uppercase letter 'A'..'Z'
                c = (char) (Math.random() * 26 + 'A');
                theStack.push(c);
                System.out.println("Producer" + num + ": " + c);
                try {
                    Thread.sleep((int) (Math.random() * 300));
                } catch (InterruptedException e) {
                    // ignore it
                }
            }
        } // end run method
    } // end Producer class
  • This example generates 200 random uppercase characters and pushes them onto the stack with a random delay of 0-300 milliseconds between each push.
  • Each pushed character is reported on the console, along with an identifier for which producer thread is executing.
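
The character expression in the Producer is worth a closer look. Math.random() returns a double in the range [0.0, 1.0), so multiplying by 26 and adding 'A' (code 65) gives a value in [65.0, 91.0), and the cast to char truncates it to one of 'A' through 'Z'. The following is a minimal sketch of that expression on its own; the class name RandomUpperDemo and the helper randomUpper are made up for this illustration and are not part of the article's code.

```java
public class RandomUpperDemo {
    // Same expression the Producer uses: maps [0.0, 1.0) onto 'A'..'Z'.
    // (Hypothetical helper, introduced only for this sketch.)
    static char randomUpper() {
        return (char) (Math.random() * 26 + 'A');
    }

    public static void main(String[] args) {
        StringBuilder sample = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sample.append(randomUpper());
        }
        // Prints ten random uppercase letters; the exact letters vary per run.
        System.out.println(sample);
    }
}
```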

The Consumer Thread:

The consumer thread removes characters from the stack. The code below shows the Consumer class.

    package mod13;

    public class Consumer implements Runnable
    {
        private SyncStack theStack;
        private int num;
        private static int counter = 1;

        public Consumer(SyncStack s)
        {
            theStack = s;
            num = counter++;
        }

        public void run()
        {
            char c;
            for (int i = 0; i < 200; i++)
            {
                // Blocks inside pop() while the stack is empty
                c = theStack.pop();
                System.out.println("Consumer" + num + ": " + c);
                try {
                    Thread.sleep((int) (Math.random() * 300));
                } catch (InterruptedException e) {
                    // ignore it
                }
            }
        } // end run method
    } // end Consumer class
  • This example collects 200 characters from the stack, with a random delay of 0-300 milliseconds between each attempt.
  • Each uppercase character is reported on the console, along with an identifier for the consumer thread that is executing.
  • Now consider construction of the stack class. You are going to create a stack that has a seemingly limitless size, using the ArrayList class.
  • With this design, your threads have only to communicate based on whether the stack is empty.
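
To see why an ArrayList works as a stack of seemingly limitless size, here is a small single-threaded sketch. Pushing appends to the end of the list and popping removes the last element, which gives last-in, first-out order. The constructor argument is only an initial capacity, not a limit, so the list keeps growing as needed. The class name ListAsStackDemo and its static helpers are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ListAsStackDemo {
    // Push appends at the end of the list.
    static void push(List<Character> buffer, char c) {
        buffer.add(c);
    }

    // Pop removes the last element, which gives LIFO order.
    // remove(int) is chosen here because the argument is an int index.
    static char pop(List<Character> buffer) {
        return buffer.remove(buffer.size() - 1);
    }

    public static void main(String[] args) {
        // Initial capacity of 2 is deliberately small: the list still grows.
        List<Character> buffer = new ArrayList<Character>(2);
        push(buffer, 'A');
        push(buffer, 'B');
        push(buffer, 'C');
        System.out.println(pop(buffer)); // C
        System.out.println(pop(buffer)); // B
        System.out.println(pop(buffer)); // A
    }
}
```

No synchronization is needed in this sketch because only one thread touches the list; the SyncStack class adds that protection.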

The SyncStack Class:

A newly constructed SyncStack object’s buffer should be empty. You can use the following code to build your class:

    public class SyncStack
    {
        // Shared buffer; private so that it is reached only through push and pop
        private List<Character> buffer = new ArrayList<Character>(400);

        public synchronized char pop()
        {
            // pop code here
        }

        public synchronized void push(char c)
        {
            // push code here
        }
    }

There are no constructors. It is considered good style to include a constructor, but it has been omitted for brevity.

 


 

The pop Method:

  • Now consider the push and pop methods. They must be synchronized to protect the shared buffer.
  • In addition, if the stack is empty when pop is called, the executing thread must wait. When the push method makes the stack non-empty, waiting threads are notified.
  • The code given below shows the pop method:
    public synchronized char pop()
    {
        char c;
        // Re-check the condition after every wake-up
        while (buffer.size() == 0)
        {
            try {
                this.wait();
            } catch (InterruptedException e) {
                // ignore it…
            }
        }
        c = buffer.remove(buffer.size() - 1);
        return c;
    }
  • The wait call is made with respect to the stack object, which shows that the rendezvous is made through a particular object.
  • Nothing can be popped from the stack when it is empty, so a thread trying to pop data from the stack must wait until the stack is no longer empty.
  • The wait call is placed in a try-catch block because an interrupt call can terminate the thread’s waiting period. The wait must also be within a loop for this example.
  • If for some reason (such as an interrupt) the thread wakes up and discovers that the stack is still empty, then the thread must re-enter the waiting condition.
  • The pop method for the stack is synchronized for two reasons. First, popping a character off of the stack affects the shared data buffer.
  • Second, the call to this.wait() must be within a block that is synchronized on the stack object, which is represented by this.
  • The push method uses this.notify() to release a thread from the stack object’s wait pool. After a thread is released, it can obtain the lock on the stack and continue executing the pop method, which removes a character from the stack’s buffer.
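
The second reason for synchronizing pop can be checked directly. Calling wait on an object whose monitor the current thread does not own throws IllegalMonitorStateException. The following is a minimal sketch of that rule; the class name WaitNeedsLockDemo and the helper waitWithoutLockFails are made up for this illustration.

```java
public class WaitNeedsLockDemo {
    // Returns true if calling wait() without holding the monitor is rejected.
    // (Hypothetical helper, introduced only for this sketch.)
    static boolean waitWithoutLockFails() {
        Object lock = new Object();
        try {
            // Not inside synchronized (lock): this thread does not own the monitor.
            lock.wait(10);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutLockFails()); // true
    }
}
```

The same rule applies to notify, which is why push is synchronized on the stack object as well.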

Note: In pop, the wait method is called before any character is removed from the stack. This is because the removal cannot proceed until some character is available.

  • You should also consider error checking. You may notice that there is no explicit code to prevent a stack underflow.
  • No such check is necessary because the only way to remove characters from the stack is through the pop method, and this method causes the executing thread to enter the wait state if no character is available.
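
The absence of underflow can be observed with a small experiment: start a consumer on an empty stack, confirm that it parks in the WAITING state instead of failing, and then push a single character. The sketch below assumes a trimmed-down copy of the stack, here called DemoStack, so that it is self-contained; that name, BlockingPopDemo, and popAfterDelayedPush are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BlockingPopDemo {
    // Minimal copy of the article's stack so the sketch is self-contained.
    static class DemoStack {
        private final List<Character> buffer = new ArrayList<Character>();

        public synchronized char pop() {
            while (buffer.size() == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // ignored; the loop re-checks the condition
                }
            }
            return buffer.remove(buffer.size() - 1);
        }

        public synchronized void push(char c) {
            notify();
            buffer.add(c);
        }
    }

    // Starts a consumer on an empty stack, waits until it is blocked,
    // then pushes one character and returns what the consumer received.
    static char popAfterDelayedPush(char c) {
        final DemoStack stack = new DemoStack();
        final char[] result = new char[1];
        Thread consumer = new Thread(() -> result[0] = stack.pop());
        consumer.start();
        try {
            // Poll until the consumer is parked inside wait().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(5);
            }
            stack.push(c);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumer received: " + popAfterDelayedPush('Q'));
    }
}
```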

The push Method:

  • The push method is similar to the pop method. It affects the shared buffer and must also be synchronized.
  • In addition, because the push method adds a character to the buffer, it is responsible for notifying threads that are waiting for a non-empty stack.
  • This notification is done with respect to the stack object.
  • The code below shows the push method:

 

    public synchronized void push(char c)
    {
        this.notify();
        buffer.add(c);
    }
  • The call to this.notify() serves to release a single thread that called wait because the stack was empty.
  • Calling notify before the shared data is changed is of no consequence. The stack object’s lock is released only upon exit from the synchronized block, so threads waiting for that lock cannot obtain it while the stack data are being changed by the push method.
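
The ordering argument above can be made visible by recording events. In the sketch below, push calls notify, deliberately pauses while still holding the lock, and only then adds the character. The notified consumer still resumes only after the add, because it has to reacquire the lock first. TracingStack, its events list, and the pause helper are assumptions added for this illustration; they are not part of the article's SyncStack.

```java
import java.util.ArrayList;
import java.util.List;

public class NotifyOrderDemo {
    static class TracingStack {
        private final List<Character> buffer = new ArrayList<Character>();
        // Event log, only touched while holding the stack's lock.
        final List<String> events = new ArrayList<String>();

        public synchronized char pop() {
            while (buffer.size() == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // ignored; the loop re-checks the condition
                }
            }
            events.add("pop resumed");
            return buffer.remove(buffer.size() - 1);
        }

        public synchronized void push(char c) {
            notify();
            events.add("notify");
            pause(50); // still holding the lock, so the notified thread cannot run yet
            buffer.add(c);
            events.add("add");
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs one blocked consumer and one push, and returns the recorded order.
    static List<String> trace() {
        TracingStack stack = new TracingStack();
        Thread consumer = new Thread(() -> stack.pop());
        consumer.start();
        try {
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(5);
            }
            stack.push('X');
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return stack.events;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // [notify, add, pop resumed]
    }
}
```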

Putting all of the pieces together, the code given below shows the complete SyncStack class:

    package mod13;

    import java.util.*;

    public class SyncStack
    {
        private List<Character> buffer = new ArrayList<Character>(400);

        public synchronized char pop()
        {
            char c;
            // Wait until a producer has pushed something
            while (buffer.size() == 0)
            {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // ignore it
                }
            }
            c = buffer.remove(buffer.size() - 1);
            return c;
        }

        public synchronized void push(char c)
        {
            // Wake one waiting consumer; it proceeds once this method exits
            this.notify();
            buffer.add(c);
        }
    }

The SyncTest Example:

  • You must assemble the producer, consumer, and stack code into complete classes. A test harness is required to bring these pieces together.
  • Pay particular attention to how SyncTest creates only one stack object that is shared by all threads.
  • The code given below shows the SyncTest class:
    package mod13;

    public class SyncTest
    {
        public static void main(String args[])
        {
            // One stack object shared by all four threads
            SyncStack stack = new SyncStack();

            Producer p1 = new Producer(stack);
            Thread prodT1 = new Thread(p1);
            prodT1.start();

            Producer p2 = new Producer(stack);
            Thread prodT2 = new Thread(p2);
            prodT2.start();

            Consumer c1 = new Consumer(stack);
            Thread consT1 = new Thread(c1);
            consT1.start();

            Consumer c2 = new Consumer(stack);
            Thread consT2 = new Thread(c2);
            consT2.start();
        }
    }

The following is an example of the output from java mod13.SyncTest. Every time this thread code is run, the results vary.

    Producer2: F
    Consumer1: F
    Producer2: K
    Consumer2: K
    Producer2: T
    Producer1: N
    Producer1: V
    Consumer2: V
    Consumer1: N
    Producer2: V
    Producer2: U
    Consumer2: U
    Consumer2: V
    Producer1: F
    Consumer1: F
    Producer2: M
    Consumer2: M
    Consumer2: T
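
Because the console output differs on every run, it can be easier to check the end state than the interleaving. The sketch below is a variation on the harness, not the article's SyncTest: it drops the sleeps and the printing, joins all threads, and reports how many characters remain on the stack. With as many pops as pushes, that number should be zero. The names JoinedSyncTest, DemoStack, size, and leftoverAfterRun are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinedSyncTest {
    // Minimal copy of the article's stack, plus a size() accessor for checking.
    static class DemoStack {
        private final List<Character> buffer = new ArrayList<Character>();

        public synchronized char pop() {
            while (buffer.size() == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // ignored; the loop re-checks the condition
                }
            }
            return buffer.remove(buffer.size() - 1);
        }

        public synchronized void push(char c) {
            notify();
            buffer.add(c);
        }

        public synchronized int size() {
            return buffer.size();
        }
    }

    // Runs `pairs` producers and `pairs` consumers, each handling `perThread`
    // characters, waits for all of them, and returns what is left on the stack.
    static int leftoverAfterRun(int pairs, int perThread) {
        DemoStack stack = new DemoStack();
        List<Thread> threads = new ArrayList<Thread>();
        for (int t = 0; t < pairs; t++) {
            threads.add(new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.push((char) (Math.random() * 26 + 'A'));
                }
            }));
            threads.add(new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.pop();
                }
            }));
        }
        for (Thread th : threads) {
            th.start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("demo interrupted", e);
            }
        }
        return stack.size();
    }

    public static void main(String[] args) {
        // Two producers and two consumers with 200 characters each, as in SyncTest.
        System.out.println("Left on stack: " + leftoverAfterRun(2, 200));
    }
}
```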

Written by Sourabh Bhunje

