In this article we analyze how CoralQueue implements a multiplexer to allow multiple producers to send messages to a single consumer. We then present the throughput numbers for different set of configurations with different set of cpu cores.
The Multiplexer
Below an example of using the AtomicMux
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | package com.coralblocks.coralqueue.sample.mux; import com.coralblocks.coralbits.util.Builder; import com.coralblocks.coralqueue.mux.AtomicMux; import com.coralblocks.coralqueue.mux.Mux; import com.coralblocks.coralqueue.mux.Producer; public class SampleWithProducer { private static final int NUMBER_OF_PRODUCERS = 4 ; public static void main(String[] args) throws InterruptedException { Builder<StringBuilder> builder = new Builder<StringBuilder>() { @Override public StringBuilder newInstance() { return new StringBuilder( 1024 ); } }; final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>( 1024 , builder, NUMBER_OF_PRODUCERS); Thread[] producers = new Thread[NUMBER_OF_PRODUCERS]; for ( int i = 0 ; i < producers.length; i++) { producers[i] = new Thread( new Runnable() { private final Producer<StringBuilder> producer = mux.nextProducer(); @Override public void run() { StringBuilder sb; for ( int j = 0 ; j < 4 ; j++) { while ((sb = producer.nextToDispatch()) == null ); // busy spin sb.setLength( 0 ); sb.append( "message " ).append(j).append( " from producer " ).append(producer.getIndex()); producer.flush(); } // send final message while ((sb = producer.nextToDispatch()) == null ); // busy spin sb.setLength( 0 ); // empty string to signal we are done producer.flush(); } }, "Producer" + i); } Thread consumer = new Thread( new Runnable() { @Override public void run() { boolean running = true ; int finalMessages = 0 ; while (running) { long avail; while ((avail = mux.availableToPoll()) == 0 ); // busy spin for ( int i = 0 ; i < avail; i++) { StringBuilder sb = mux.poll(); if (sb.length() == 0 ) { if (++finalMessages == NUMBER_OF_PRODUCERS) { // and we are done! running = false ; break ; } } else { System.out.println(sb.toString()); } } mux.donePolling(); } } }, "Consumer" ); consumer.start(); for ( int i = 0 ; i < producers.length; i++) producers[i].start(); consumer.join(); for ( int i = 0 ; i < producers.length; i++) producers[i].join(); } } |
Throughput Numbers
The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.
Two producers pinned to their own cores sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 613.93 millis | Min Time: 583.798 millis | Max Time: 639.488 millis | Nano Timing Cost: 15.0 nanos Average time to send 20,000,000 messages per pass in 20 passes: 613,929,765 nanos Messages per second: 32,577,016
Two producers pinned to the same core with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 560.601 millis | Min Time: 535.936 millis | Max Time: 576.715 millis | Nano Timing Cost: 14.0 nanos Average time to send 20,000,000 messages per pass in 20 passes: 560,601,268 nanos Messages per second: 35,675,980
Four producers pinned to two cores with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 1.061 secs | Min Time: 1.03 secs | Max Time: 1.091 secs | Nano Timing Cost: 14.0 nanos Average time to send 40,000,000 messages per pass in 20 passes: 1,060,708,245 nanos Messages per second: 37,710,652
Conclusions
With CoralQueue you can easily send message from multiple producers to a single consumer. Its throughput numbers are between 30 to 40 million messages per second.