Monday, July 16, 2012

JAVA.UTIL.CONCURRENT BLOCKINGQUEUE INTERFACE

This interface extends the Queue interface (which in turn extends the Collection interface). All implementations of BlockingQueue are thread-safe. The "blocking" in the name comes from the fact that BlockingQueue provides methods that can make the calling Thread WAIT before proceeding further.

Consumer Thread

If the Queue is empty, the consumer Thread does not have to explicitly check for emptiness (and write code to wait() until it becomes non-empty). BlockingQueue provides methods that encapsulate all this, which makes life simpler. So if the Queue is empty, the consumer Thread will WAIT until some element is inserted into the Queue.
1. take(): Just as in a FIFO Queue, it removes and returns the head. If the Queue is empty, it makes the Thread WAIT until an element becomes available.
// Example of a continuous consumer using a plain Queue
Queue<String> simpleQueue = new ArrayDeque<>(10);
...
...
while (true) {
    synchronized (simpleQueue) {
        // The inner loop re-checks the condition after every wakeup;
        // a producer must call notifyAll() on simpleQueue after adding
        while (simpleQueue.isEmpty()) {
            simpleQueue.wait();
        }
        simpleQueue.remove();
    }
}
// This can be simplified to the following
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
...
...
while (true) {
    blockingQueue.take(); // Blocks until the Queue is non-empty, so the loop does not spin
}
2. poll(): Just as in a FIFO Queue, it removes and returns the head. If the Queue is empty, it returns null immediately. No blocking here.
while (true) {
    blockingQueue.poll(); // Does not make the Thread WAIT, so this loop spins continuously
}
3. poll(timeout, unit): Just as in a FIFO Queue, it removes and returns the head. If the Queue is empty, it makes the Thread WAIT until the specified timeout elapses or an element becomes available (whichever comes first). Returns null if the timeout elapses first.
4. remove(object): Just as in a Collection, it removes a single element for which object.equals(element) is true. If more than one element is equal, the one closest to the head is removed. Returns TRUE if an element was removed, FALSE otherwise. Which of several equal elements is removed can be checked with the following code.
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("1");
blockingQueue.put("3");
System.out.println("Before: " + blockingQueue);
blockingQueue.remove("1"); // Removes only the first "1", the one closest to the head
System.out.println("After: " + blockingQueue);
Output:
Before: [1, 2, 1, 3]
After: [2, 1, 3]
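
The difference between poll() and the timed poll on an empty Queue can be sketched as below. This is only an illustration, not a definitive implementation: the class name ConsumerMethodsDemo, its helper methods, and the chosen timeouts are assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names and timeouts are illustrative only.
class ConsumerMethodsDemo {

    // poll() on an empty Queue returns null immediately, without blocking.
    static String pollEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        return queue.poll();
    }

    // The timed poll on an empty Queue waits up to the timeout, then returns null.
    static String timedPollEmpty(long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // The timed poll returns as soon as an element arrives, before the timeout.
    static String timedPollWithProducer() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);      // simulate a slow producer
                queue.put("late");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String head = queue.poll(5, TimeUnit.SECONDS);
        producer.join();
        return head;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("poll() on empty: " + pollEmpty());
        System.out.println("timed poll on empty: " + timedPollEmpty(100));
        System.out.println("timed poll with producer: " + timedPollWithProducer());
    }
}
```

The timed variant is a middle ground: the consumer does not spin as it would with poll(), but it also does not wait forever as it would with take().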

Producer Thread

Similarly, there are methods to insert into the Queue, so we don’t have to write code to check whether the Queue is full (and wait() until it has some free space). All this is already provided to us. So if the Queue is full, the producer Thread will WAIT until some element is removed from the Queue.
1. put(object): Just as in a Queue, it inserts the object at the tail. If the BlockingQueue is full (the specified capacity for bounded Queues, or Integer.MAX_VALUE for Queues created without an explicit capacity, such as LinkedBlockingQueue), it makes the Thread WAIT until space becomes available, and then inserts the object.
// Example of a continuous producer using a plain Queue
Queue<String> simpleQueue = new ArrayDeque<>(10);
...
...
while (true) {
    synchronized (simpleQueue) {
        // ArrayDeque grows on demand, so the limit of 10 has to be enforced by hand;
        // a consumer must call notifyAll() on simpleQueue after removing
        while (simpleQueue.size() == 10) {
            simpleQueue.wait();
        }
        simpleQueue.add("1");
    }
}
// This can be simplified to the following
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
...
...
while (true) {
    blockingQueue.put("1"); // Blocks until space becomes available, so the loop does not spin
}
2. add(object): Just as in a Queue, it inserts the object at the tail. If the Queue is full, it throws an IllegalStateException; otherwise it returns TRUE. No blocking here.
3. offer(object): Just as in a Queue, it inserts the object at the tail. If the Queue is full, it returns FALSE; otherwise TRUE. No blocking here.
4. offer(object, timeout, unit): Just as in a Queue, it inserts the object at the tail. If the Queue is full, it makes the Thread WAIT until the specified timeout elapses or space becomes available (whichever comes first). Returns FALSE if the timeout elapses first, TRUE otherwise.
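
The three non-waiting or time-limited insert methods behave differently on a full Queue, which can be sketched as below. The class name ProducerMethodsDemo, its helpers, and the capacity of 1 are assumptions chosen to keep the example small.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a capacity of 1 makes the Queue easy to fill.
class ProducerMethodsDemo {

    // Returns a Queue of capacity 1 that is already full.
    static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        return queue;
    }

    // add() on a full Queue throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        try {
            fullQueue().add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // offer() on a full Queue returns false immediately.
    static boolean offerWhenFull() {
        return fullQueue().offer("second");
    }

    // The timed offer waits up to the timeout for space, then returns false.
    static boolean timedOfferWhenFull(long timeoutMillis) throws InterruptedException {
        return fullQueue().offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("add() threw: " + addThrowsWhenFull());
        System.out.println("offer() returned: " + offerWhenFull());
        System.out.println("timed offer returned: " + timedOfferWhenFull(100));
    }
}
```

Which method to use depends on how the producer should react to a full Queue: fail loudly with add(), skip or retry with offer(), or wait for a bounded time with the timed offer.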

Points to Note

1. null elements are not allowed in a BlockingQueue; trying to insert one throws a NullPointerException. Remember that poll() returns null when the Queue is empty, so a null element would be indistinguishable from an empty Queue. Hence this restriction.
2. Bulk operations inherited from the Collection interface, such as addAll(..), are not necessarily atomic. So it can happen that after adding a few elements the Queue becomes full and addAll(..) fails with an IllegalStateException, leaving only some of the elements added.
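
Both points can be checked with a short sketch such as the one below. The class name PointsToNoteDemo, its helper methods, and the capacity of 2 are assumptions made for illustration; the behavior shown is that of ArrayBlockingQueue.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class for the two points above.
class PointsToNoteDemo {

    // Inserting null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // addAll() with three elements on a Queue of capacity 2: the first two
    // are added, the third triggers IllegalStateException, and the elements
    // already added stay in the Queue.
    static int sizeAfterFailedAddAll() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList("a", "b", "c"));
        } catch (IllegalStateException e) {
            // Expected: the Queue filled up partway through the bulk add.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());
        System.out.println("size after failed addAll: " + sizeAfterFailedAddAll());
    }
}
```

Because the partial result is left in place, code that relies on addAll(..) for a bounded Queue should be prepared to handle a Queue that holds only some of the elements.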