In this article we analyze how CoralQueue implements a multiplexer to allow multiple producers to send messages to a single consumer. We then present the throughput numbers for different set of configurations with different set of cpu cores.
The Multiplexer
Below an example of using the AtomicMux:
package com.coralblocks.coralqueue.sample.mux;
import com.coralblocks.coralbits.util.Builder;
import com.coralblocks.coralqueue.mux.AtomicMux;
import com.coralblocks.coralqueue.mux.Mux;
import com.coralblocks.coralqueue.mux.Producer;
public class SampleWithProducer {
private static final int NUMBER_OF_PRODUCERS = 4;
public static void main(String[] args) throws InterruptedException {
Builder<StringBuilder> builder = new Builder<StringBuilder>() {
@Override
public StringBuilder newInstance() {
return new StringBuilder(1024);
}
};
final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>(1024, builder, NUMBER_OF_PRODUCERS);
Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];
for(int i = 0; i < producers.length; i++) {
producers[i] = new Thread(new Runnable() {
private final Producer<StringBuilder> producer = mux.nextProducer();
@Override
public void run() {
StringBuilder sb;
for(int j = 0; j < 4; j++) {
while((sb = producer.nextToDispatch()) == null); // busy spin
sb.setLength(0);
sb.append("message ").append(j).append(" from producer ").append(producer.getIndex());
producer.flush();
}
// send final message
while((sb = producer.nextToDispatch()) == null); // busy spin
sb.setLength(0); // empty string to signal we are done
producer.flush();
}
}, "Producer" + i);
}
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
boolean running = true;
int finalMessages = 0;
while(running) {
long avail;
while((avail = mux.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
StringBuilder sb = mux.poll();
if (sb.length() == 0) {
if (++finalMessages == NUMBER_OF_PRODUCERS) {
// and we are done!
running = false;
break;
}
} else {
System.out.println(sb.toString());
}
}
mux.donePolling();
}
}
}, "Consumer");
consumer.start();
for(int i = 0; i < producers.length; i++) producers[i].start();
consumer.join();
for(int i = 0; i < producers.length; i++) producers[i].join();
}
}
Throughput Numbers
The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.
Two producers pinned to their own cores sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 613.93 millis | Min Time: 583.798 millis | Max Time: 639.488 millis | Nano Timing Cost: 15.0 nanos Average time to send 20,000,000 messages per pass in 20 passes: 613,929,765 nanos Messages per second: 32,577,016
Two producers pinned to the same core with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 560.601 millis | Min Time: 535.936 millis | Max Time: 576.715 millis | Nano Timing Cost: 14.0 nanos Average time to send 20,000,000 messages per pass in 20 passes: 560,601,268 nanos Messages per second: 35,675,980
Four producers pinned to two cores with hyper-threading sending messages to one consumer pinned to its own core:
Results: Iterations: 20 | Avg Time: 1.061 secs | Min Time: 1.03 secs | Max Time: 1.091 secs | Nano Timing Cost: 14.0 nanos Average time to send 40,000,000 messages per pass in 20 passes: 1,060,708,245 nanos Messages per second: 37,710,652
Conclusions
With CoralQueue you can easily send message from multiple producers to a single consumer. Its throughput numbers are between 30 to 40 million messages per second.