Implementing blocking queue where there are many consumers access the queue to get data from it and there is only one producer put data in the queue.

Blocking Queue interface.


package BlockingQueue;

public interface BlockingQueue<T> {
public void put(T a);
public T take();
}

Blocking Queue Classes

package BlockingQueue;

import java.util.LinkedList;
import java.util.Queue;

public class Main {
public static void main(String args[]) {

System.out.println("Press Control-C to stop.");

MyQueue q = new MyQueue();
new Producer(q);
new Consumer(1, q);
new Consumer(2, q);
new Consumer(3, q);
}

}

// Class Blocking Queue
class MyQueue implements BlockingQueue<Integer> {
int n;
int size = 5;
Queue<Integer> arr = new LinkedList<Integer>();

// use synchronized word to sync threads.
synchronized public Integer take() {
if (arr.isEmpty())
try {
System.out.println("Waiting Put ...");
wait();
} catch (InterruptedException e) {
System.out.println("InterruptedException caught");
}

try {
n= arr.poll();
System.out.println("Take: " + n);
notify();
} catch (Exception e) {
System.out.println("Error in Take: " + e.getMessage());
}
return n;
}
// use synchronized word to sync threads.
synchronized public void put(Integer n) {

// When Queue is full wait until item removed.
if (arr.size() == 5 )
try {
System.out.println("Waiting Take...");
wait();
} catch (InterruptedException e) {
System.out.println("InterruptedException caught");
}

arr.add(n);
System.out.println("Put: " + n);
notify();
}
}
// Class Producer
class Producer implements Runnable {
MyQueue q;

Producer(MyQueue q) {
this.q = q;
new Thread(this, "Producer").start();
}

public void run() {
for(int i= 0; i < 100 ; i++ )
q.put(i);
}

}

// Class Consumer
class Consumer implements Runnable {
MyQueue q;
int consumerNumber;

Consumer(int consumerNumber, MyQueue q) {
this.q = q;
this.consumerNumber = consumerNumber;
new Thread(this, "Consumer").start();
}

public void run() {
for(int i= 0 ; i < 100 ; i++ ) {
System.out.println("Consumer #" + consumerNumber + " ");
q.take();
}
}
}

References:

1- http://www.roseindia.net/javatutorials/blocking_queue.shtml

2- http://docs.oracle.com/javase/6/docs/api/java/lang/Thread.html

Advertisements