Concurrent Programming with Java
Lab Manual, Version 1.0, F. Astha Ekadiyanto,
2002.
The Buffer class is the shared resource (object) between the
Consumer and Producer objects. It plays a role similar to that of the
Briefkasten class in the Race Condition section.
Here, however, rather than a field that holds only a single item, we use an
elements vector and limit its size with the capacity field.
Another important issue in this class is the queueing discipline for access to
the Buffer. Observe the pseudocode of the put or get method below:
    public synchronized void putOrGet( Object element ) {
        Thread caller = Thread.currentThread();   // getting reference to the thread calling this method
        if ( queueing/waiting_condition ) {
            queue.addElement( caller );           // Add the thread to queue
            while ( queueing/waiting_condition || caller != queue.firstElement() )
                try { wait(); }                   // Ask the thread to wait in queue
                catch ( InterruptedException e ) {}
            queue.removeElement( caller );        // The thread will be served and removed from queue
        }
        // perform Put or Get Process
        notifyAll();
    }
The pseudocode above makes sure that the queued Threads trying to access the Buffer are served in a First In First Out (FIFO) manner. After every notifyAll(), each woken Thread re-checks both conditions: if the Thread that now holds the Monitor is not the first in the queue, it calls wait() again, which releases the Monitor to the others.
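To see the queueing pattern in a form that can be run on its own, here is a minimal sketch that applies it to a one-permit lock. The names FifoPermit, FifoPermitDemo, acquire, release, waiting and run are hypothetical and do not come from the lab material; the sketch also assumes Java 8 or later (generics and lambdas), and unlike the pseudocode it propagates InterruptedException instead of ignoring it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;

class FifoPermitDemo {

    /** Hypothetical one-permit lock; the waiting condition is "permit not free". */
    static class FifoPermit {
        private boolean free = true;
        private final Vector<Thread> queue = new Vector<>();

        public synchronized void acquire() throws InterruptedException {
            Thread caller = Thread.currentThread();
            if (!free) {
                queue.addElement(caller);          // join the end of the queue
                try {
                    while (!free || caller != queue.firstElement()) {
                        wait();                    // releases the monitor while waiting
                    }
                } finally {
                    queue.removeElement(caller);   // leave the queue (served or interrupted)
                    notifyAll();                   // let the new queue head re-check
                }
            }
            free = false;                          // the "perform Put or Get" step
        }

        public synchronized void release() {
            free = true;
            notifyAll();                           // wake everyone; only the head proceeds
        }

        public synchronized int waiting() {
            return queue.size();
        }
    }

    /** Queues three workers behind a held permit and returns the order in which they were served. */
    static List<Integer> run() {
        FifoPermit permit = new FifoPermit();
        List<Integer> served = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[3];
        try {
            permit.acquire();                      // main holds the permit, so workers must queue
            for (int i = 0; i < workers.length; i++) {
                final int id = i;
                workers[i] = new Thread(() -> {
                    try {
                        permit.acquire();
                        served.add(id);            // recorded while holding the permit
                        permit.release();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                workers[i].start();
                while (permit.waiting() <= i) {    // wait until worker i has joined the queue
                    Thread.sleep(1);
                }
            }
            permit.release();
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo interrupted", e);
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println(run());                 // prints [0, 1, 2]
    }
}
```

All three workers are woken by each notifyAll(), but only the one at the head of the queue passes the while test, so they are served in the order in which they arrived. Note that, as in the pseudocode, a thread that arrives while the permit happens to be free does not join the queue at all.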
The code of the Buffer class is as follows:
    import java.util.Vector;

    /**
     * The class Buffer implements a buffer that can be used by threads to exchange
     * objects. The threads that get blocked because the buffer is full or empty are
     * kept in a queue, so the buffer has fifo behaviour.
     * @author Stephan Fischli
     * @version 1.1
     */
    public class Buffer
    {
        /**
         * The capacity of the buffer.
         */
        private int capacity;

        /**
         * The elements of the buffer.
         */
        private Vector elements;

        /**
         * The queue of threads waiting until the buffer is not full or not empty.
         */
        private Vector queue;

        /**
         * Constructs a Buffer object.
         * @param capacity the capacity of the buffer
         */
        public Buffer( int capacity )
        {
            this.capacity = capacity;
            elements = new Vector( capacity );
            queue = new Vector();
        }

        /**
         * Returns the size of the buffer.
         */
        public int size()
        {
            return elements.size();
        }

        /**
         * Adds an element to the end of the buffer. If the buffer is full, the
         * calling thread will be blocked until another thread removes an element
         * from the buffer.
         * @param element the element to be added
         */
        public synchronized void put( Object element )
        {
            Thread caller = Thread.currentThread();
            if ( elements.size() == capacity ) {
                queue.addElement( caller );
                while ( size() == capacity || caller != queue.firstElement() )
                    try { wait(); }
                    catch ( InterruptedException e ) {}
                queue.removeElement( caller );
            }
            elements.addElement( element );
            notifyAll();
        }

        /**
         * Removes the first element from the buffer. If the buffer is empty, the
         * calling thread will be blocked until another thread adds an element to
         * the buffer.
         * @return the removed element
         */
        public synchronized Object get()
        {
            Thread caller = Thread.currentThread();
            if ( elements.isEmpty() ) {
                queue.addElement( caller );
                while ( elements.isEmpty() || caller != queue.firstElement() )
                    try { wait(); }
                    catch ( InterruptedException e ) {}
                queue.removeElement( caller );
            }
            Object element = elements.firstElement();
            elements.removeElement( element );
            notifyAll();
            return element;
        }
    }
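For comparison only, and not as part of the lab's Buffer class: Java 5 and later ship a bounded blocking buffer in java.util.concurrent. The sketch below assumes such a Java version and uses ArrayBlockingQueue with its fairness flag set to true, which asks the queue to serve blocked threads in FIFO order. The class name FairBufferDemo and the method run are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FairBufferDemo {

    /** One producer sends 1..5 through a buffer of capacity 2; returns what the consumer received. */
    static List<Integer> run() {
        // capacity 2, fair = true: blocked producers and consumers are served in FIFO order
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2, true);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);                 // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) {
                received.add(buffer.take());       // blocks while the buffer is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());                 // prints [1, 2, 3, 4, 5]
    }
}
```

Here put and take play the roles of the put and get methods of Buffer, and the capacity argument corresponds to the capacity field.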