Below the basics of CoralQueue:
Queue
One producer sending messages to one consumer. One thread sending to another thread.
package com.coralblocks.coralqueue.sample.queue; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; public class Sample { public static void main(String[] args) throws InterruptedException { final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class); Thread consumer = new Thread() { @Override public void run() { boolean running = true; while(running) { long avail; while((avail = queue.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); if (ml.get() == -1) { // message to flag exit... running = false; break; } System.out.println(ml.get()); } queue.donePolling(); } } }; consumer.start(); MutableLong ml; for(int i = 0; i < 10; i++) { while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(System.nanoTime()); queue.flush(); } // send a message to stop consumer... while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(-1); queue.flush(); consumer.join(); // wait for the consumer thread to die... } }
Demultiplexer
One producer distributing messages to multiple consumers. Each message is processed only once by a random consumer. One thread sending to many threads. Great for parallel processing. Note that you can also route a message to a specific consumer (line 60).
package com.coralblocks.coralqueue.sample.demux; import com.coralblocks.coralqueue.demux.AtomicDemux; import com.coralblocks.coralqueue.demux.Consumer; import com.coralblocks.coralqueue.demux.Demux; public class SampleWithConsumer { private static final int NUMBER_OF_CONSUMERS = 4; public static void main(String[] args) throws InterruptedException { final Demux<StringBuilder> demux = new AtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS); Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS]; for(int i = 0; i < consumers.length; i++) { consumers[i] = new Thread("Consumer-" + i) { private final Consumer<StringBuilder> consumer = demux.nextConsumer(); @Override public void run() { boolean running = true; while(running) { long avail; while((avail = consumer.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { StringBuilder sb = consumer.poll(); if (sb == null) break; // mandatory for demuxes! if (sb.length() == 0) { running = false; break; // exit immediately... } System.out.println(sb.toString()); } consumer.donePolling(); } } }; consumers[i].start(); } StringBuilder sb; for(int i = 0; i < 3; i++) { while((sb = demux.nextToDispatch()) == null); // busy spin sb.setLength(0); sb.append("message ").append(i); demux.flush(); } // send a message to stop consumers... for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) { // routing is being used here... while((sb = demux.nextToDispatch(i)) == null); // busy spin sb.setLength(0); } demux.flush(); // sent batch for(int i = 0; i < consumers.length; i++) consumers[i].join(); } }
Multiplexer
Multiple producers pipelining (i.e. sending) messages to a single consumer. Many threads sending to one thread. Great for gathering results from parallel processing.
package com.coralblocks.coralqueue.sample.mux; import com.coralblocks.coralbits.util.Builder; import com.coralblocks.coralqueue.mux.AtomicMux; import com.coralblocks.coralqueue.mux.Mux; import com.coralblocks.coralqueue.mux.Producer; public class SampleWithProducer { private static final int NUMBER_OF_PRODUCERS = 4; public static void main(String[] args) throws InterruptedException { Builder<StringBuilder> builder = new Builder<StringBuilder>() { @Override public StringBuilder newInstance() { return new StringBuilder(1024); } }; final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>(1024, builder, NUMBER_OF_PRODUCERS); Thread[] producers = new Thread[NUMBER_OF_PRODUCERS]; for(int i = 0; i < producers.length; i++) { producers[i] = new Thread(new Runnable() { private final Producer<StringBuilder> producer = mux.nextProducer(); @Override public void run() { StringBuilder sb; for(int j = 0; j < 4; j++) { while((sb = producer.nextToDispatch()) == null); // busy spin sb.setLength(0); sb.append("message ").append(j).append(" from producer ").append(producer.getIndex()); producer.flush(); } // send final message while((sb = producer.nextToDispatch()) == null); // busy spin sb.setLength(0); // empty string to signal we are done producer.flush(); } }, "Producer" + i); } Thread consumer = new Thread(new Runnable() { @Override public void run() { boolean running = true; int finalMessages = 0; while(running) { long avail; while((avail = mux.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { StringBuilder sb = mux.poll(); if (sb.length() == 0) { if (++finalMessages == NUMBER_OF_PRODUCERS) { // and we are done! running = false; break; } } else { System.out.println(sb.toString()); } } mux.donePolling(); } } }, "Consumer"); consumer.start(); for(int i = 0; i < producers.length; i++) producers[i].start(); consumer.join(); for(int i = 0; i < producers.length; i++) producers[i].join(); } }
Splitter / Broadcaster
One producer sending messages to multiple consumers. Each message is processed by each and every consumer. One thread sending to many threads. Great for broadcasting messages to multiple threads.
package com.coralblocks.coralqueue.sample.splitter; import com.coralblocks.coralqueue.splitter.AtomicSplitter; import com.coralblocks.coralqueue.splitter.Consumer; import com.coralblocks.coralqueue.splitter.Splitter; public class SampleWithConsumer { private static final int NUMBER_OF_CONSUMERS = 4; public static void main(String[] args) throws InterruptedException { final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS); Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS]; for(int i = 0; i < consumers.length; i++) { consumers[i] = new Thread("Consumer-" + i) { private final Consumer<StringBuilder> consumer = splitter.nextConsumer(); @Override public void run() { boolean running = true; while(running) { long avail; while((avail = consumer.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { StringBuilder sb = consumer.poll(); if (sb == null) break; // mandatory for splitters! if (sb.length() == 0) { running = false; break; // exit immediately... } System.out.println("got " + sb.toString() + " at consumer " + consumer.getIndex()); } consumer.donePolling(); } } }; consumers[i].start(); } StringBuilder sb; for(int i = 0; i < 3; i++) { while((sb = splitter.nextToDispatch()) == null); // busy spin sb.setLength(0); sb.append("message ").append(i); splitter.flush(); } // send a message to stop consumers... while((sb = splitter.nextToDispatch()) == null); // busy spin sb.setLength(0); splitter.flush(); for(int i = 0; i < consumers.length; i++) consumers[i].join(); } }
MpmcQueue
Multiple-producers sending messages to multiple-consumers. You can choose any number of producers and consumers. You can also route a message to a specific consumer (line 90).
package com.coralblocks.coralqueue.sample.mpmc; import java.util.concurrent.atomic.AtomicLong; import com.coralblocks.coralqueue.mpmc.Consumer; import com.coralblocks.coralqueue.mpmc.MpmcQueue; import com.coralblocks.coralqueue.mpmc.Producer; public class Sample { private static final int NUMBER_OF_PRODUCERS = 4; private static final int NUMBER_OF_CONSUMERS = 2; public static void main(String[] args) throws InterruptedException { final MpmcQueue<StringBuilder> mpmc = new MpmcQueue<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_PRODUCERS, NUMBER_OF_CONSUMERS); Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS]; for(int i = 0; i < consumers.length; i++) { consumers[i] = new Thread("Consumer-" + i) { private final Consumer<StringBuilder> consumer = mpmc.nextConsumer(); @Override public void run() { boolean running = true; while(running) { long avail; while((avail = consumer.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { StringBuilder sb = consumer.poll(); if (sb == null) break; // mandatory for mpmc! if (sb.length() == 0) { running = false; break; // exit immediately... } System.out.println(sb.toString() + " got by consumer " + consumer.getIndex()); } consumer.donePolling(); } } }; consumers[i].start(); } Thread[] producers = new Thread[NUMBER_OF_PRODUCERS]; final AtomicLong counter = new AtomicLong(0); for(int i = 0; i < producers.length; i++) { producers[i] = new Thread("Producer-" + i) { private final Producer<StringBuilder> producer = mpmc.nextProducer(); @Override public void run() { StringBuilder sb; for(int i = 0; i < 3; i++) { while((sb = producer.nextToDispatch()) == null); // busy spin long msgNumber = counter.getAndIncrement(); sb.setLength(0); sb.append("message ").append(msgNumber); System.out.println("sending message " + msgNumber + " from producer " + producer.getIndex()); producer.flush(); } } }; producers[i].start(); } for(int i = 0; i < producers.length; i++) producers[i].join(); Thread.sleep(4000); Producer<StringBuilder> p = mpmc.getProducer(0); for(int i = 0; i < consumers.length; i++) { StringBuilder sb; // send a message to stop consumers... // routing is being used here... while((sb = p.nextToDispatch(i)) == null); // busy spin sb.setLength(0); } p.flush(); for(int i = 0; i < consumers.length; i++) consumers[i].join(); } }