package com.coralblocks.coralqueue.bench; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; import com.coralblocks.coralqueue.waitstrategy.WaitStrategy; import com.coralblocks.coralqueue.waitstrategy.WaitStrategyFactory; import com.coralblocks.coralthreads.Affinity; import com.coralblocks.coralutils.SystemUtils; import com.coralblocks.coralutils.bench.Benchmarker; /** * Different cores: (no hyper-threading) * * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 -Dtimestamper=nanos com.coralblocks.coralqueue.bench.TestMessageThroughput * * Results: Iterations: 20 | Avg Time: 234.781 millis | Min Time: 233.626 millis | Max Time: 236.014 millis | Nano Timing Cost: 14.0 nanos * Average time to send 10,000,000 messages: 234,781,092 nanos * Messages per second: 42,592,867 * * Same core: (with hyper-threaading) * * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 -Dtimestamper=nanos com.coralblocks.coralqueue.bench.TestMessageThroughput * * Results: Iterations: 20 | Avg Time: 117.343 millis | Min Time: 117.273 millis | Max Time: 117.435 millis | Nano Timing Cost: 14.0 nanos * Average time to send 10,000,000 messages: 117,342,956 nanos * Messages per second: 85,220,283 */ public class TestMessageThroughput { public static void main(String[] args) throws InterruptedException { final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000); final int capacity = SystemUtils.getInt("queueCapacity", 1024); final int passes = SystemUtils.getInt("passes", 20); final int warmupPasses = SystemUtils.getInt("warmupPasses", 2); int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2); int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3); final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(capacity, MutableLong.BUILDER); final Benchmarker bench = Benchmarker.create(warmupPasses); final WaitStrategy waitStrategy = WaitStrategyFactory.getWaitStrategy("park-backoff"); final Queue<MutableLong> backQueue = new AtomicQueue<MutableLong>(capacity, MutableLong.BUILDER); System.out.println("messagesToTest: " + messagesToTest); System.out.println("queueCapacity: " + capacity); System.out.println("timestamper: " + bench.getTimestamper()); Thread producer = new Thread(new Runnable() { @Override public void run() { Affinity.bind(); MutableLong ml = null; for(int i = 0; i < passes + warmupPasses; i++) { // one for warmup long count = 0; bench.mark(); while(count < messagesToTest) { while((ml = queue.nextToDispatch()) == null); ml.set(count); queue.flush(true); count++; } while(backQueue.availableToPoll() != 1) { waitStrategy.block(); } backQueue.poll().get(); backQueue.donePolling(false); waitStrategy.reset(); } while((ml = queue.nextToDispatch()) == null); ml.set(-1); queue.flush(true); Affinity.unbind(); System.out.println("producer exiting..."); } }, "Producer"); Thread consumer = new Thread(new Runnable() { @Override public void run() { Affinity.bind(); boolean running = true; int pass = 0; while (running) { long avail = queue.availableToPoll(); if (avail > 0) { for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); long x = ml.get(); if (x == -1) running = false; else if (x == messagesToTest - 1) { long t = bench.measure(); System.out.println("Pass " + pass + "... " + (pass < warmupPasses ? "(warmup)" : "(" + Benchmarker.convertNanoTime(t) + ")")); pass++; // notify: while((ml = backQueue.nextToDispatch()) == null); ml.set(0); backQueue.flush(false); } } queue.donePolling(false); // no lazy, tell producer immediately in case queue was full... } } Affinity.unbind(); System.out.println("consumer exiting..."); } }, "Consumer"); if (Affinity.isAvailable()) { Affinity.assignToProcessor(prodProcToBind, producer); Affinity.assignToProcessor(consProcToBind, consumer); } else { System.err.println("Thread affinity not available!"); } consumer.start(); producer.start(); consumer.join(); producer.join(); long time = Math.round(bench.getAverage()); long mps = messagesToTest * 1000000000L / time; System.out.println("Results: " + bench.results()); System.out.println("Average time to send " + messagesToTest + " messages: " + time + " nanos"); System.out.println("Messages per second: " + mps); } }