In this article we look at how CoralQueue implements a multiplexer that allows multiple producers to send messages to a single consumer. We then present throughput numbers for different configurations of producers and CPU cores.
The Multiplexer
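Below is an example of using the AtomicMux. Each of the four producers sends four messages followed by an empty message to signal that it is done, and the consumer stops once it has received the final message from every producer: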
    package com.coralblocks.coralqueue.sample.mux;

    import com.coralblocks.coralbits.util.Builder;
    import com.coralblocks.coralqueue.mux.AtomicMux;
    import com.coralblocks.coralqueue.mux.Mux;
    import com.coralblocks.coralqueue.mux.Producer;

    public class SampleWithProducer {

        private static final int NUMBER_OF_PRODUCERS = 4;

        public static void main(String[] args) throws InterruptedException {

            Builder<StringBuilder> builder = new Builder<StringBuilder>() {
                @Override
                public StringBuilder newInstance() {
                    return new StringBuilder(1024);
                }
            };

            final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>(1024, builder, NUMBER_OF_PRODUCERS);

            Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];

            for(int i = 0; i < producers.length; i++) {

                producers[i] = new Thread(new Runnable() {

                    private final Producer<StringBuilder> producer = mux.nextProducer();

                    @Override
                    public void run() {
                        StringBuilder sb;
                        for(int j = 0; j < 4; j++) {
                            while((sb = producer.nextToDispatch()) == null); // busy spin
                            sb.setLength(0);
                            sb.append("message ").append(j).append(" from producer ").append(producer.getIndex());
                            producer.flush();
                        }
                        // send final message
                        while((sb = producer.nextToDispatch()) == null); // busy spin
                        sb.setLength(0); // empty string to signal we are done
                        producer.flush();
                    }
                }, "Producer" + i);
            }

            Thread consumer = new Thread(new Runnable() {

                @Override
                public void run() {
                    boolean running = true;
                    int finalMessages = 0;
                    while(running) {
                        long avail;
                        while((avail = mux.availableToPoll()) == 0); // busy spin
                        for(int i = 0; i < avail; i++) {
                            StringBuilder sb = mux.poll();
                            if (sb.length() == 0) {
                                if (++finalMessages == NUMBER_OF_PRODUCERS) {
                                    // and we are done!
                                    running = false;
                                    break;
                                }
                            } else {
                                System.out.println(sb.toString());
                            }
                        }
                        mux.donePolling();
                    }
                }
            }, "Consumer");

            consumer.start();
            for(int i = 0; i < producers.length; i++) producers[i].start();

            consumer.join();
            for(int i = 0; i < producers.length; i++) producers[i].join();
        }
    }
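The sample above treats AtomicMux as a black box. To give a feel for the general idea of multiplexing, here is a minimal sketch that uses only the JDK: each producer gets its own bounded single-producer/single-consumer ring, and the one consumer scans the rings round-robin. This is an assumption made for illustration, not CoralQueue's actual implementation. Every name in it (MuxSketch, SpscRing, the inner Mux, run) is hypothetical, and it allocates a String per message where the CoralQueue sample reuses pre-built StringBuilder instances.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: NOT CoralQueue's implementation. All names are hypothetical. */
final class MuxSketch {

    /** Bounded ring buffer for exactly one producer thread and one consumer thread. */
    static final class SpscRing<E> {
        private final Object[] slots;
        private final int mask;
        private final AtomicLong head = new AtomicLong(); // next sequence to read, advanced only by the consumer
        private final AtomicLong tail = new AtomicLong(); // next sequence to write, advanced only by the producer

        SpscRing(int capacity) {
            if (Integer.bitCount(capacity) != 1) throw new IllegalArgumentException("capacity must be a power of two");
            slots = new Object[capacity];
            mask = capacity - 1;
        }

        boolean offer(E e) {
            long t = tail.get();
            if (t - head.get() == slots.length) return false; // ring is full
            slots[(int) (t & mask)] = e;
            tail.set(t + 1); // volatile store publishes the slot to the consumer
            return true;
        }

        @SuppressWarnings("unchecked")
        E poll() {
            long h = head.get();
            if (h == tail.get()) return null; // ring is empty
            int i = (int) (h & mask);
            E e = (E) slots[i];
            slots[i] = null;
            head.set(h + 1); // volatile store frees the slot for the producer
            return e;
        }
    }

    /** One ring per producer; the single consumer scans the rings round-robin. */
    static final class Mux<E> {
        private final SpscRing<E>[] rings;
        private int next; // touched only by the consumer thread

        @SuppressWarnings("unchecked")
        Mux(int capacity, int producers) {
            rings = new SpscRing[producers];
            for (int i = 0; i < producers; i++) rings[i] = new SpscRing<>(capacity);
        }

        SpscRing<E> ring(int producerIndex) { return rings[producerIndex]; }

        /** Returns the next available message from any producer, or null if all rings are empty. */
        E poll() {
            for (int i = 0; i < rings.length; i++) {
                E e = rings[next].poll();
                next = (next + 1) % rings.length;
                if (e != null) return e;
            }
            return null;
        }
    }

    /** Starts the producers, consumes on the calling thread, and returns the number of messages received. */
    static int run(int producers, int messagesPerProducer) {
        Mux<String> mux = new Mux<>(64, producers);
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final SpscRing<String> ring = mux.ring(p);
            final int id = p;
            threads[p] = new Thread(() -> {
                for (int j = 0; j < messagesPerProducer; j++) {
                    String msg = "message " + j + " from producer " + id;
                    while (!ring.offer(msg)) Thread.yield(); // ring full: wait for the consumer
                }
            }, "Producer" + p);
            threads[p].start();
        }
        int received = 0;
        int expected = producers * messagesPerProducer;
        while (received < expected) {
            if (mux.poll() != null) received++; else Thread.yield();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("received " + run(4, 1000) + " messages");
    }
}
```

Because each ring has a single writer and a single reader, producers never contend with one another in this sketch. The trade-off is that the consumer must scan every ring, and ordering is only guaranteed per producer, not across producers.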
Throughput Numbers
The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50GHz.
Two producers pinned to their own cores sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 613.93 millis | Min Time: 583.798 millis | Max Time: 639.488 millis | Nano Timing Cost: 15.0 nanos
Average time to send 20,000,000 messages per pass in 20 passes: 613,929,765 nanos
Messages per second: 32,577,016
Two producers pinned to the same core with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 560.601 millis | Min Time: 535.936 millis | Max Time: 576.715 millis | Nano Timing Cost: 14.0 nanos
Average time to send 20,000,000 messages per pass in 20 passes: 560,601,268 nanos
Messages per second: 35,675,980
Four producers pinned to two cores with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 1.061 secs | Min Time: 1.03 secs | Max Time: 1.091 secs | Nano Timing Cost: 14.0 nanos
Average time to send 40,000,000 messages per pass in 20 passes: 1,060,708,245 nanos
Messages per second: 37,710,652
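Each "Messages per second" figure follows from the two numbers on the line before it: messages per pass divided by the average time per pass, scaled from nanoseconds to seconds. The short sketch below recomputes the three figures from the reported averages. The class and method names are hypothetical, and the use of truncating integer division is an assumption about how the benchmark rounds.

```java
/** Recomputes the reported throughput figures. Names are hypothetical, not part of CoralQueue. */
final class ThroughputCheck {

    /** messages / (elapsedNanos / 1e9), using truncating integer division. */
    static long messagesPerSecond(long messages, long elapsedNanos) {
        // multiplyExact throws ArithmeticException instead of silently overflowing
        return Math.multiplyExact(messages, 1_000_000_000L) / elapsedNanos;
    }

    public static void main(String[] args) {
        // Two producers on their own cores
        System.out.println(messagesPerSecond(20_000_000L, 613_929_765L));
        // Two producers on one hyper-threaded core
        System.out.println(messagesPerSecond(20_000_000L, 560_601_268L));
        // Four producers on two hyper-threaded cores
        System.out.println(messagesPerSecond(40_000_000L, 1_060_708_245L));
    }
}
```

The three printed values are 32577016, 35675980 and 37710652, matching the figures reported above.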
Conclusions
With CoralQueue you can easily send messages from multiple producers to a single consumer. In the configurations tested, throughput ranged from roughly 32.6 to 37.7 million messages per second.