Implicit & Explicit Locks
BlockingQueues
A BlockingQueue is a queue that blocks when a thread attempts to poll an item when the queue is empty. A thread attempting to offer an item when the queue is full is also blocked untill some other thread removes an item to make some room. Naturally, this is ideal for the producer-consumer problem. There are several implementations of BlockingQueue interface. These classes are particularly useful when you want to separate the synchronization aspect from the solution. By separating the waiting and signalling logic from the producer-consumer implementation, the code is much more easier to read. To demonstrate this, we will use a SynchronousQueue. A SynchronousQueue is a BlockingQueue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. Since all blocking and wait/notify/await/signal logic is handled by the blocking queue, the producer consumer solution looks like so:
/** * @(#)LockTest.java Feb 26, 2008 */ package lock; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; /** * Demonstrates transfering messages between two threads. * * @author $Author: vijaykandy $ * @version $Revision: 1.4 $ */ public class BolckingQueueDemo { /** * Main. * * @param s */ public static void main(String s[]) { BlockingQueue<String> drop = new SynchronousQueue<String>(); Consumer c1 = new Consumer("C1", drop); Producer p1 = new Producer("P1", drop); (new Thread(p1)).start(); (new Thread(c1)).start(); } /** * Produces a message and places in the queue. * * @author $Author: vijaykandy $ * @version $Revision: 1.4 $ */ static class Producer implements Runnable { private String name; private BlockingQueue<String> drop; private List<String> messages = new ArrayList<String>() { { add("Let's use guarded"); add("blocks to create a"); add("Producer-Consumer"); add("application. This"); add("kind of application"); add("shares data between two"); add("threads: the producer,"); add("that creates the data,"); add("and the consumer,"); add("that does something "); add("with it."); add("DONE"); } }; public Producer(String name, BlockingQueue<String> drop) { this.name = name; this.drop = drop; } public void run() { Random random = new Random(); for (String msg : messages) { try { drop.put(msg); System.out.format("%s produced: %s%n", name, msg); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } } } /** * Consumes a message from queue. * * @author $Author: vijaykandy $ * @version $Revision: 1.4 $ */ static class Consumer implements Runnable { private String name; private BlockingQueue<String> drop; public Consumer(String name, BlockingQueue<String> drop) { this.name = name; this.drop = drop; } public void run() { Random random = new Random(); while (true) { try { String message = drop.take(); System.out.format("%s received: %s%n", name, message); if (message.equals("DONE")) { return; } Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } } } }









Leave your response!