In this article we give an example of how to use the MpmcQueue to transfer messages between any number of producers and consumers through a lock-free concurrent queue.
The Sample Code
Without further ado, here is the sample code:
package com.coralblocks.coralqueue.sample.mpmc;

import java.util.concurrent.atomic.AtomicLong;

import com.coralblocks.coralqueue.mpmc.Consumer;
import com.coralblocks.coralqueue.mpmc.MpmcQueue;
import com.coralblocks.coralqueue.mpmc.Producer;

public class Sample {

    private static final int NUMBER_OF_PRODUCERS = 4;
    private static final int NUMBER_OF_CONSUMERS = 2;

    public static void main(String[] args) throws InterruptedException {

        final MpmcQueue<StringBuilder> mpmc = new MpmcQueue<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_PRODUCERS, NUMBER_OF_CONSUMERS);

        Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];

        for(int i = 0; i < consumers.length; i++) {

            consumers[i] = new Thread("Consumer-" + i) {

                private final Consumer<StringBuilder> consumer = mpmc.nextConsumer();

                @Override
                public void run() {

                    boolean running = true;

                    while(running) {
                        long avail;
                        while((avail = consumer.availableToPoll()) == 0); // busy spin
                        for(int i = 0; i < avail; i++) {
                            StringBuilder sb = consumer.poll();

                            if (sb == null) break; // mandatory for mpmc!

                            if (sb.length() == 0) {
                                running = false;
                                break; // exit immediately...
                            }
                            System.out.println(sb.toString() + " got by consumer " + consumer.getIndex());
                        }
                        consumer.donePolling();
                    }
                }
            };

            consumers[i].start();
        }

        Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];

        final AtomicLong counter = new AtomicLong(0);

        for(int i = 0; i < producers.length; i++) {

            producers[i] = new Thread("Producer-" + i) {

                private final Producer<StringBuilder> producer = mpmc.nextProducer();

                @Override
                public void run() {

                    StringBuilder sb;

                    for(int i = 0; i < 3; i++) {
                        while((sb = producer.nextToDispatch()) == null); // busy spin
                        long msgNumber = counter.getAndIncrement();
                        sb.setLength(0);
                        sb.append("message ").append(msgNumber);
                        System.out.println("sending message " + msgNumber + " from producer " + producer.getIndex());
                        producer.flush();
                    }
                }
            };

            producers[i].start();
        }

        for(int i = 0; i < producers.length; i++) producers[i].join();

        Thread.sleep(4000);

        Producer<StringBuilder> p = mpmc.getProducer(0);

        for(int i = 0; i < consumers.length; i++) {
            StringBuilder sb;
            // send a message to stop consumers...
            // routing is being used here...
            while((sb = p.nextToDispatch(i)) == null); // busy spin
            sb.setLength(0);
        }

        p.flush();

        for(int i = 0; i < consumers.length; i++) consumers[i].join();
    }
}
Routing Messages
You can also route a message to a specific consumer. To do that, call nextToDispatch(int consumerIndex) instead of nextToDispatch(), and the message will be processed by that consumer. This is useful for partitioning certain types of messages across specific consumers, so that related messages are not processed in parallel and out of order. If you pass a negative number as the consumer index, the mpmc queue falls back to sending the message to a random consumer.
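One possible way to pick the consumer index is to derive it from a message key, so that all messages sharing a key land on the same consumer. The sketch below is only an illustration of that idea: RoutingSketch, consumerIndexFor and ANY_CONSUMER are hypothetical names, not part of CoralQueue, and the hash-based partitioning is an assumption rather than something the library prescribes. It does not touch the queue at all; the comment marks where nextToDispatch(int consumerIndex) would be called.

```java
// Hypothetical helper, not part of CoralQueue: maps a message key to a consumer index.
final class RoutingSketch {

    // Any negative index lets the queue choose a random consumer (see the text above).
    static final int ANY_CONSUMER = -1;

    // Returns a stable index in [0, numberOfConsumers) for a non-null key,
    // or ANY_CONSUMER when the message has no key and ordering does not matter.
    static int consumerIndexFor(String key, int numberOfConsumers) {
        if (numberOfConsumers <= 0) {
            throw new IllegalArgumentException("numberOfConsumers must be positive");
        }
        if (key == null) {
            return ANY_CONSUMER;
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numberOfConsumers);
    }

    public static void main(String[] args) {
        int numberOfConsumers = 2;
        String[] keys = { "AAPL", "IBM", "AAPL", "MSFT", "IBM" };
        for (String key : keys) {
            int index = consumerIndexFor(key, numberOfConsumers);
            // With a real producer, this is where the routed dispatch would go:
            // while((sb = producer.nextToDispatch(index)) == null); // busy spin
            System.out.println("key " + key + " -> consumer " + index);
        }
    }
}
```

Because the index depends only on the key and the number of consumers, repeated keys always map to the same consumer, which is what keeps related messages in order.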
Conclusion
With CoralQueue you can easily build an architecture where multiple producers send messages to multiple consumers through a fast, lock-free queue.