CoralQueue is an ultra-low-latency, lock-free and garbage-free queue for inter-thread communication. It can be described as a batching queue backed by a circular array (i.e. a ring buffer) filled with pre-allocated transfer objects, which uses memory barriers to synchronize producers and consumers through sequences. Fortunately, you don’t have to understand all of its internal details to use it. In this article we show how to use CoralQueue to send messages from a producer thread to a consumer thread very quickly and without producing any garbage.
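For readers who do want a feel for the description above, here is a minimal single-producer/single-consumer sketch of a ring buffer with pre-allocated slots and two sequences. It is not CoralQueue's implementation: the class name SketchRingQueue, its fields and the use of AtomicLong are all assumptions made for illustration, and only the method names mirror the ones used later in this article.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch, NOT CoralQueue's code: one producer thread, one consumer thread.
final class SketchRingQueue<E> {

    private final E[] slots;   // circular array of pre-allocated transfer objects
    private final int mask;    // capacity - 1, valid because capacity is a power of two
    private final AtomicLong published = new AtomicLong(0); // written by the producer in flush()
    private final AtomicLong consumed = new AtomicLong(0);  // written by the consumer in donePolling()
    private long claimed = 0;  // producer-local count of slots handed out
    private long polled = 0;   // consumer-local count of slots read

    @SuppressWarnings("unchecked")
    SketchRingQueue(int capacity, Supplier<E> factory) {
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = (E[]) new Object[capacity];
        for (int i = 0; i < capacity; i++) {
            slots[i] = factory.get(); // allocate every transfer object up front
        }
        mask = capacity - 1;
    }

    // Producer: next free transfer object, or null if the ring is full.
    E nextToDispatch() {
        if (claimed - consumed.get() == slots.length) return null;
        return slots[(int) (claimed++ & mask)];
    }

    // Producer: publish everything claimed since the last flush (a batch).
    void flush() {
        published.set(claimed); // volatile write acts as the memory barrier
    }

    // Consumer: number of published messages not yet polled.
    long availableToPoll() {
        return published.get() - polled;
    }

    // Consumer: call at most availableToPoll() times per batch.
    E poll() {
        return slots[(int) (polled++ & mask)];
    }

    // Consumer: hand the polled slots back to the producer.
    void donePolling() {
        consumed.set(polled);
    }
}
```

In this sketch the producer and the consumer only ever communicate through the two sequence counters, so no locks are needed and no objects are allocated after construction.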
The Queue
The queue is a circular array with pre-allocated transfer objects. For the example below we use a StringBuilder as the transfer object.
Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, StringBuilder.class);
The code above creates a queue with 1024 pre-allocated StringBuilders. Note that it uses the default constructor of StringBuilder, which creates a StringBuilder with an initial capacity of 16 characters. That may be too small for our transfer objects, and we don’t want the StringBuilder resizing itself at runtime. So to create a bigger StringBuilder we can use a com.coralblocks.coralbits.util.Builder class like below:
Builder<StringBuilder> builder = new Builder<StringBuilder>() {
    @Override
    public StringBuilder newInstance() {
        return new StringBuilder(1024);
    }
};

final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, builder);
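The capacity concern can be checked with the JDK alone. The sketch below uses a hypothetical helper class (CapacityCheck is not part of CoralQueue) to show that a default StringBuilder has to grow when a longer message is appended, while one created with a capacity of 1024 does not.

```java
// Hypothetical helper for illustration only; uses nothing but java.lang.StringBuilder.
final class CapacityCheck {

    // Returns true if appending `chars` characters forced the builder to grow its internal array.
    static boolean growsWhenAppending(StringBuilder sb, int chars) {
        int before = sb.capacity();
        for (int i = 0; i < chars; i++) {
            sb.append('x');
        }
        return sb.capacity() != before;
    }

    public static void main(String[] args) {
        System.out.println(new StringBuilder().capacity());      // default initial capacity
        System.out.println(new StringBuilder(1024).capacity());  // explicit initial capacity
        System.out.println(growsWhenAppending(new StringBuilder(), 100));     // had to resize
        System.out.println(growsWhenAppending(new StringBuilder(1024), 100)); // did not resize
    }
}
```

Each resize allocates a new internal array and leaves the old one as garbage, which is what a pre-sized transfer object avoids.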
Sending Messages
To send a message to the queue, you grab a transfer object from the queue, fill it with your data and call flush() as the code below illustrates:
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");
queue.flush();
If the queue is full we just busy spin until a transfer object becomes available. Later we will see how we can also use a WaitStrategy instead of busy spinning.
You can also send messages in batches:
StringBuilder sb;

while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");

while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello again!");

queue.flush();
Polling Messages
To read messages from the queue, you poll them from a consumer thread, as the code below shows:
long avail;
while((avail = queue.availableToPoll()) == 0); // busy spin

for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.poll();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char-by-char instead
}

queue.donePolling();
Again we busy spin if the queue is empty. Later we will see how we can also use a WaitStrategy instead of busy spinning.
Note that we poll in batches, reducing the number of times we have to check for an empty queue through availableToPoll().
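The comments in the polling loop ask you to copy char-by-char rather than create garbage. One way to do that, sketched below with a hypothetical class (GarbageFreeReader is an illustration, not part of CoralQueue), is to copy the contents of the polled StringBuilder into a char array that is allocated once and reused. Calling sb.toString() instead would allocate a new String for every message.

```java
// Hypothetical consumer-side helper: copies a polled StringBuilder without allocating.
final class GarbageFreeReader {

    private final char[] scratch; // allocated once, reused for every message
    private int length;           // number of valid chars currently in scratch

    GarbageFreeReader(int maxChars) {
        this.scratch = new char[maxChars];
    }

    // Copies up to scratch.length chars from the transfer object; creates no objects.
    void copyFrom(StringBuilder sb) {
        int n = Math.min(sb.length(), scratch.length);
        for (int i = 0; i < n; i++) {
            scratch[i] = sb.charAt(i);
        }
        length = n;
    }

    int length() {
        return length;
    }

    char charAt(int index) {
        if (index < 0 || index >= length) throw new IndexOutOfBoundsException();
        return scratch[index];
    }
}
```

Copying matters because the transfer object belongs to the queue: once donePolling() has been called, the producer is free to overwrite it.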
Wait Strategies
By default, you busy spin when the queue is full (on the producer side) or empty (on the consumer side). That’s usually the fastest approach but not always the best, as you might want to save energy or allow other threads to use the idle processor. CoralQueue comes with many wait strategies that you can use instead of busy spinning, or you can create your own by implementing the WaitStrategy interface. Below are some examples of wait strategies that come with CoralQueue:
ParkWaitStrategy: parks (i.e. sleeps) for 1 nanosecond, with the option to back off up to a maximum of N nanoseconds. N defaults to 1 million nanoseconds (1 millisecond) if not specified.
SpinParkWaitStrategy: first busy spins for C cycles (default 1 million cycles), then starts to park (i.e. sleep) for 1 nanosecond, with the option to back off up to a maximum of N nanoseconds (default 1 million nanoseconds).
SpinYieldParkWaitStrategy: busy spins for some cycles, yields for some cycles, then starts to park for 1 nanosecond, with the option to back off up to a maximum of N nanoseconds (default 1 million nanoseconds).
To use a wait strategy, all you have to do is call its block and reset methods instead of busy spinning:
WaitStrategy waitStrategy = new ParkWaitStrategy();

StringBuilder sb;

while((sb = queue.nextToDispatch()) == null) {
    waitStrategy.block();
}

sb.setLength(0);
sb.append("Hello there!");
queue.flush();

waitStrategy.reset(); // you can reset here to save some nanoseconds...
Same thing when polling:
WaitStrategy waitStrategy = new SpinParkWaitStrategy();

long avail;

while((avail = queue.availableToPoll()) == 0) {
    waitStrategy.block();
}

for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.poll();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char-by-char instead
}

queue.donePolling();

waitStrategy.reset(); // you can reset here to save some nanoseconds...
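If you want to write your own strategy, a parking strategy with back-off could look roughly like the sketch below. The interface SketchWaitStrategy is a stand-in declared here for illustration; the real WaitStrategy interface may declare more than the block and reset methods named in this article. Doubling the park time on every call is also an assumption, since the article only says that the built-in strategies can back off up to a maximum.

```java
import java.util.concurrent.locks.LockSupport;

// Stand-in for CoralQueue's WaitStrategy; only block() and reset() are taken from the article.
interface SketchWaitStrategy {
    void block();
    void reset();
}

// Hypothetical strategy: parks for 1ns, then doubles the park time up to a maximum.
final class BackoffParkStrategy implements SketchWaitStrategy {

    private final long maxParkNanos;
    private long parkNanos = 1; // start by parking for 1 nanosecond

    BackoffParkStrategy(long maxParkNanos) {
        if (maxParkNanos < 1) throw new IllegalArgumentException("maxParkNanos must be >= 1");
        this.maxParkNanos = maxParkNanos;
    }

    @Override
    public void block() {
        LockSupport.parkNanos(parkNanos);                  // give up the CPU for a short while
        parkNanos = Math.min(parkNanos * 2, maxParkNanos); // back off (doubling is an assumption)
    }

    @Override
    public void reset() {
        parkNanos = 1; // the queue made progress, so start over with the shortest park
    }

    long currentParkNanos() {
        return parkNanos;
    }
}
```

Calling reset() after a successful send or poll, as in the examples above, keeps the next wait short instead of resuming at the longest park time.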
Semi-volatile writes (lazySet)
To squeeze every bit of performance out of CoralQueue, you can use semi-volatile writes when sending and polling messages. Basically, a semi-volatile write is done through the lazySet method of java.util.concurrent.atomic.AtomicLong. It is a faster operation for the thread that’s modifying the variable, at the expense of the thread that’s interested in knowing about updates to the variable. For example, if you want to minimize the latency in the producer, you should use lazySet. If you want to minimize the message transit time, you should not use lazySet, so the consumer is notified as soon as possible about a new message in the queue.
By default, CoralQueue does not use lazySet, but you can easily take control of that by using the methods below:
queue.flush(); // no lazySet by default
queue.flush(true); // use lazySet

queue.donePolling(); // no lazySet by default
queue.donePolling(true); // use lazySet
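The two kinds of write can be seen directly on AtomicLong. The sketch below is a hypothetical sequence holder (SequenceSketch is not a CoralQueue class). It assumes that a boolean flag like the one passed to flush(true) simply selects between set and lazySet, which this article implies but does not show.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sequence holder illustrating set() versus lazySet() on an AtomicLong.
final class SequenceSketch {

    private final AtomicLong sequence = new AtomicLong(0);

    // Publishes a new sequence value, optionally with a semi-volatile (lazySet) write.
    void publish(long value, boolean lazySet) {
        if (lazySet) {
            sequence.lazySet(value); // cheaper for the writer; other threads may see it slightly later
        } else {
            sequence.set(value);     // full volatile write
        }
    }

    // Volatile read, as a consumer checking for new messages would do.
    long read() {
        return sequence.get();
    }
}
```

Either way the writing thread itself always sees its own latest value; the trade-off is only about how soon the other thread observes it.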
Complete Example
To put all the parts together, we write a simple program that sends 10 timestamps to a consumer thread and then exits:
package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) { // message to flag exit...
                            running = false;
                            break;
                        }
                        System.out.println(ml.get());
                    }
                    queue.donePolling();
                }
            }
        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
Conclusion
CoralQueue makes the development of ultra-low-latency, lock-free and garbage-free multithreaded applications easy by pipelining messages among threads. It also offers batching, semi-volatile writes and wait strategies through a simple API. CoralQueue also provides a multiplexer (multiple producers to one consumer), a demultiplexer (one producer to multiple consumers) and an MPMC queue (multiple producers to multiple consumers).