In this article we show how to use CoralQueue to multicast/broadcast the same message to multiple consumers, so that each consumer receives and processes all messages. We also present throughput numbers for different configurations, each one using a different set of CPU cores.
The Splitter
Below is an example of how to use the Splitter:
    package com.coralblocks.coralqueue.sample.splitter;

    import com.coralblocks.coralqueue.splitter.AtomicSplitter;
    import com.coralblocks.coralqueue.splitter.Consumer;
    import com.coralblocks.coralqueue.splitter.Splitter;

    public class SampleWithConsumer {

        private static final int NUMBER_OF_CONSUMERS = 4;

        public static void main(String[] args) throws InterruptedException {

            final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);

            Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];

            for(int i = 0; i < consumers.length; i++) {

                consumers[i] = new Thread("Consumer-" + i) {

                    private final Consumer<StringBuilder> consumer = splitter.nextConsumer();

                    @Override
                    public void run() {

                        boolean running = true;

                        while(running) {
                            long avail;
                            while((avail = consumer.availableToPoll()) == 0); // busy spin
                            for(int i = 0; i < avail; i++) {
                                StringBuilder sb = consumer.poll();
                                if (sb == null) break; // mandatory for splitters!
                                if (sb.length() == 0) {
                                    running = false;
                                    break; // exit immediately...
                                }
                                System.out.println("got " + sb.toString() + " at consumer " + consumer.getIndex());
                            }
                            consumer.donePolling();
                        }
                    }
                };

                consumers[i].start();
            }

            StringBuilder sb;

            for(int i = 0; i < 3; i++) {
                while((sb = splitter.nextToDispatch()) == null); // busy spin
                sb.setLength(0);
                sb.append("message ").append(i);
                splitter.flush();
            }

            // send a message to stop consumers...
            while((sb = splitter.nextToDispatch()) == null); // busy spin
            sb.setLength(0);
            splitter.flush();

            for(int i = 0; i < consumers.length; i++) consumers[i].join();
        }
    }
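To make the delivery semantics concrete (every consumer sees every message, in order), here is a minimal, self-contained sketch of a single-producer broadcast ring. It is an illustration only and makes no claim about how CoralQueue's Splitter is implemented: the names BroadcastRing, BroadcastDemo, offer and poll are hypothetical, the payload is a plain long, and waiting is done with Thread.yield() rather than a tuned busy spin.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer broadcast ring; NOT CoralQueue's implementation.
final class BroadcastRing {
    private final long[] slots;                             // ring of message payloads
    private final AtomicLong published = new AtomicLong();  // messages published so far
    private final AtomicLong[] consumed;                    // messages consumed, per consumer

    BroadcastRing(int capacity, int consumers) {
        slots = new long[capacity];
        consumed = new AtomicLong[consumers];
        for (int i = 0; i < consumers; i++) consumed[i] = new AtomicLong();
    }

    // Producer side (one thread only). Returns false while the slowest
    // consumer is a full ring behind, so no unread slot is overwritten.
    boolean offer(long value) {
        long next = published.get();
        long slowest = Long.MAX_VALUE;
        for (AtomicLong c : consumed) slowest = Math.min(slowest, c.get());
        if (next - slowest >= slots.length) return false;
        slots[(int) (next % slots.length)] = value;
        published.set(next + 1); // volatile write makes the slot visible
        return true;
    }

    // Consumer side. Copies the next unread message into out[0] and returns
    // true, or returns false when this consumer has caught up.
    boolean poll(int consumer, long[] out) {
        long seq = consumed[consumer].get();
        if (seq == published.get()) return false;
        out[0] = slots[(int) (seq % slots.length)];
        consumed[consumer].set(seq + 1); // slot is reusable once every consumer has passed it
        return true;
    }
}

final class BroadcastDemo {
    // Sends 1..messages to every consumer and returns the sum each one saw.
    static long[] run(int consumers, int messages) {
        BroadcastRing ring = new BroadcastRing(8, consumers);
        long[] sums = new long[consumers];
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            threads[c] = new Thread(() -> {
                long[] out = new long[1];
                long sum = 0;
                int received = 0;
                while (received < messages) {
                    if (ring.poll(id, out)) { sum += out[0]; received++; }
                    else Thread.yield();
                }
                sums[id] = sum;
            });
            threads[c].start();
        }
        for (long m = 1; m <= messages; m++) {
            while (!ring.offer(m)) Thread.yield(); // ring full: wait for consumers
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] sums = run(2, 1000);
        System.out.println(sums[0] + " " + sums[1]);
    }
}
```

Because each consumer keeps its own read position, a message is never removed by one consumer before another has seen it. The cost is that the producer can only advance as fast as the slowest consumer allows.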
Throughput Numbers
The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50GHz.
One producer pinned to its own core sending to two consumers, each pinned to its own core:
    Results: Iterations: 20 | Avg Time: 206.235 millis | Min Time: 203.929 millis | Max Time: 207.908 millis | Nano Timing Cost: 14.0 nanos
    Average time to send 10,000,000 messages per pass in 20 passes: 206,235,222 nanos
    Messages per second: 48,488,322
One producer pinned to its own core sending to two consumers, both pinned to the same core through hyper-threading:
    Results: Iterations: 20 | Avg Time: 217.354 millis | Min Time: 216.239 millis | Max Time: 218.286 millis | Nano Timing Cost: 14.0 nanos
    Average time to send 10,000,000 messages per pass in 20 passes: 217,353,789 nanos
    Messages per second: 46,007,939
One producer pinned to its own core sending to four consumers, with each pair of consumers sharing one core through hyper-threading:
    Results: Iterations: 20 | Avg Time: 225.742 millis | Min Time: 224.252 millis | Max Time: 228.717 millis | Nano Timing Cost: 14.0 nanos
    Average time to send 10,000,000 messages per pass in 20 passes: 225,742,165 nanos
    Messages per second: 44,298,325
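The "Messages per second" figures reported above follow from the average pass time: messages per pass divided by the pass time in seconds. The short sketch below reproduces that arithmetic for the three configurations. The class and method names are hypothetical and are not part of CoralQueue or its benchmark harness, and the integer division truncates the result rather than rounding it.

```java
// Illustrative helper; the class and method names are hypothetical.
final class ThroughputMath {
    // Messages per second, truncated to a whole number.
    // Note: messagesPerPass * 1e9 must fit in a long (fine for 10,000,000).
    static long messagesPerSecond(long messagesPerPass, long avgPassNanos) {
        return messagesPerPass * 1_000_000_000L / avgPassNanos;
    }

    // Average cost of one message in nanoseconds.
    static double nanosPerMessage(long messagesPerPass, long avgPassNanos) {
        return (double) avgPassNanos / messagesPerPass;
    }

    public static void main(String[] args) {
        long messages = 10_000_000L;
        // Average pass times taken from the three result blocks above.
        long[] avgNanos = {206_235_222L, 217_353_789L, 225_742_165L};
        for (long nanos : avgNanos) {
            System.out.println(messagesPerSecond(messages, nanos) + " msgs/sec, "
                + nanosPerMessage(messages, nanos) + " nanos/msg");
        }
    }
}
```

Read the other way around, the same numbers put the average cost per message at roughly 21 to 23 nanoseconds across the three configurations.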
Conclusions
CoralQueue can multicast messages to a set of consumers at roughly 44 to 48 million messages per second in the configurations tested above. The Splitter makes it simple to broadcast messages from a single producer to multiple consumers.