CoralQueue Throughput Test Explained

In this article we will present the benchmark test used by CoralQueue that shows a throughput between 55 and 65 million messages per second without hyper-threading and between 85 and 95 million messages per second with hyper-threading. If you are interested in the CoralQueue Getting Started article, you can check it here first.

Test Mechanics

To calculate throughput we run 20 different passes of 10 million messages each. Then the average time of these 20 passes are calculated to reach the average ops (operations per second) number. Note that a pass only ends after the consumer has received all messages sent by the producer. In order to receive feedback from the consumer, the producer uses an AtomicInteger so it can be notified when the consumer has processed all the messages. Then the producer proceeds to the next pass.

The test flow is described below:

  • The producer sends 10 million messages to the consumer through the queue. Once it is done sending, it blocks on the AtomicInteger waiting for an acknowledgment from the consumer that it has received and processed all the messages.
  • The producer proceeds to the next pass and the cycle repeats.
  • Once the producer has executed 20 passes it sends a final message to the consumer to signal that we are done and the consumer can now die.
  • The results are then presented.
  • Note that we ignore the first 4 passes as warmup passes.
  • Note that the pass total time is calculated in the consumer side when it receives and processes the last message of the pass.

Test Source Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.coralblocks.coralqueue.bench;
 
import java.util.concurrent.atomic.AtomicInteger;
 
import com.coralblocks.coralbits.MutableLong;
import com.coralblocks.coralbits.bench.Benchmarker;
import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.waitstrategy.ParkWaitStrategy;
import com.coralblocks.coralqueue.waitstrategy.WaitStrategy;
import com.coralblocks.coralthreads.Affinity;
 
public class Throughput {
     
    public static void main(String[] args) throws InterruptedException {
         
        final int messagesToSend = 10000000;
        final int queueSize = 1024;
        final int warmupPasses = 4;
        final int passes = 20;
         
        final int prodProcToBind = SystemUtils.getInt("producerProcToBind", -1);
        final int consProcToBind = SystemUtils.getInt("consumerProcToBind", -1);
        final boolean flushLazySet = SystemUtils.getBoolean("flushLazySet", true);
        final boolean donePollingLazySet = SystemUtils.getBoolean("donePollingLazySet", false);
         
        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(queueSize, MutableLong.class);
         
        final AtomicInteger countdown = new AtomicInteger();
         
        final WaitStrategy waitStrategy = new ParkWaitStrategy(true); // true => back off
         
        final Benchmarker bench = Benchmarker.create(warmupPasses);    
         
        Thread producer = new Thread(new Runnable() {
             
            @Override
            public void run() {
                 
                Affinity.bind();
                 
                MutableLong ml = null;
                 
                for(int i = 0; i < passes + warmupPasses; i++) {
 
                    long count = 0;
                     
                    countdown.set(1);
                     
                    bench.mark();
                 
                    while(count < messagesToSend) {
                        while((ml = queue.nextToDispatch()) == null); // busy spin
                        ml.set(count++);
                        // we are not batching here so flush() is called many times...
                        // therefore it is much better to use lazySet here...
                        // change it to false and you will see the difference
                        queue.flush(flushLazySet);
                    }
                     
                    while(countdown.get() != 0) { // wait for consumer to finish...
                        waitStrategy.block();
                    }
                     
                    waitStrategy.reset();
                }
                 
                // send the very last message signaling that we are done!
                while((ml = queue.nextToDispatch()) == null);
                ml.set(-1);
                queue.flush();
                 
                Affinity.unbind();
                 
                System.out.println("producer exiting...");             
            }
        }, "Producer");
         
        Thread consumer = new Thread(new Runnable() {
 
            @Override
            public void run() {
                 
                Affinity.bind();
                 
                boolean running = true;
                 
                int pass = 0;
                 
                while (running) {
                     
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        long x = ml.get();
                        if (x == -1) {
                            // the last message sent by the producer to indicate that we should die
                            running = false;
                        } else if (x == messagesToSend - 1) {
                            // the last message of a pass... print some results and notify the producer...
                            long t = bench.measure();
                            System.out.println("Pass " + pass + "... " + (pass < warmupPasses ? "(warmup)" : "(" + Benchmarker.convertNanoTime(t) + ")"));
                            pass++;
 
                            countdown.decrementAndGet(); // let the producer know!
                        }
                    }
                    // we are batching in the consumer side so let the producer
                    // know asap that it can send more messages to the queue
                    // therefore we do NOT use lazySet here...
                    // using lazySet here decreases throughput but not much
                    // change it to see the difference
                    queue.donePolling(donePollingLazySet);
                }
                 
                Affinity.unbind();
                 
                System.out.println("consumer exiting...");
            }
        }, "Consumer");
         
        if (Affinity.isAvailable()) {
            Affinity.assignToProcessor(prodProcToBind, producer);
            Affinity.assignToProcessor(consProcToBind, consumer);
        } else {
            System.err.println("Thread affinity not available!");
        }
         
        consumer.start();
        producer.start();
         
        consumer.join();
        producer.join();
         
        long time = Math.round(bench.getAverage());
         
        long mps = messagesToSend * 1000000000L / time;
         
        System.out.println("Results: " + bench.results());
        System.out.println("Average time to send " + messagesToSend + " messages per pass in " + passes + " passes: " + time + " nanos");
        System.out.println("Messages per second: " + mps);
    }
}

Test Results

The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.

Results without hyper-threading:

$ java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DproducerProcToBind=2 -DconsumerProcToBind=3 -DexcludeNanoTimeCost=true  com.coralblocks.coralqueue.bench.Throughput
Pass 0... (warmup)
Pass 1... (warmup)
Pass 2... (warmup)
Pass 3... (warmup)
Pass 4... (168.334 millis)
Pass 5... (168.546 millis)
Pass 6... (165.904 millis)
Pass 7... (168.469 millis)
Pass 8... (158.65 millis)
Pass 9... (166.946 millis)
Pass 10... (168.114 millis)
Pass 11... (160.557 millis)
Pass 12... (163.021 millis)
Pass 13... (168.204 millis)
Pass 14... (164.229 millis)
Pass 15... (168.085 millis)
Pass 16... (164.91 millis)
Pass 17... (165.532 millis)
Pass 18... (166.758 millis)
Pass 19... (164.743 millis)
Pass 20... (163.74 millis)
Pass 21... (164.291 millis)
Pass 22... (165.269 millis)
Pass 23... (158.166 millis)
producer exiting...
consumer exiting...
Results: Iterations: 20 | Avg Time: 165.123 millis | Min Time: 158.166 millis | Max Time: 168.546 millis | Nano Timing Cost: 16.0 nanos
Average time to send 10000000 messages per pass in 20 passes: 165123448 nanos
Messages per second: 60,560,750

Results with hyper-threading:

$ java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DproducerProcToBind=2 -DconsumerProcToBind=6 -DexcludeNanoTimeCost=true   com.coralblocks.coralqueue.bench.Throughput
Pass 0... (warmup)
Pass 1... (warmup)
Pass 2... (warmup)
Pass 3... (warmup)
Pass 4... (110.678 millis)
Pass 5... (110.653 millis)
Pass 6... (110.82 millis)
Pass 7... (110.67 millis)
Pass 8... (110.612 millis)
Pass 9... (110.648 millis)
Pass 10... (110.668 millis)
Pass 11... (110.727 millis)
Pass 12... (110.643 millis)
Pass 13... (110.69 millis)
Pass 14... (110.594 millis)
Pass 15... (110.654 millis)
Pass 16... (110.672 millis)
Pass 17... (110.776 millis)
Pass 18... (110.647 millis)
Pass 19... (110.724 millis)
Pass 20... (110.764 millis)
Pass 21... (110.677 millis)
Pass 22... (110.753 millis)
Pass 23... (110.645 millis)
producer exiting...
consumer exiting...
Results: Iterations: 20 | Avg Time: 110.686 millis | Min Time: 110.594 millis | Max Time: 110.82 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10000000 messages per pass in 20 passes: 110685691 nanos
Messages per second: 90,345,914

Conclusion

CoralQueue can send up to 65 million messages per second without hyper-threading and up to 95 million messages per second with hyper-threading.



Article
Name
CoralQueue Throughput Test Explained
Company
Summary
In this article we will present the benchmark test used by CoralQueue that shows a throughput between 55 and 65 million messages per second without hyper-threading and between 85 and 95 million messages per second with hyper-threading.