Below is the code for the HTTP client that performs the stress test against the CoralReactor and Vert.x HTTP servers. You can refer to this article for more details.
package com.coralblocks.coralreactor.client.bench.client;

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.bench.Benchmarker;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.client.http.HttpClient;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

/**
 * HTTP client used to stress-test an HTTP server.
 *
 * <p>Each call to {@link #sendRequest()} requests {@code /plaintext} and records the send time.
 * When a message arrives, the elapsed time is reported to the registered listener and, until
 * {@code totalToReceive} messages have been received, the next request is sent immediately.
 *
 * <p>{@link #main(String[])} opens a configurable number of these clients on one reactor and
 * prints throughput and latency figures once every client has received all of its messages.
 */
public class StressHttpClient extends HttpClient {

    /** Callbacks through which a {@link StressHttpClient} reports its progress. */
    public static interface StressHttpClientListener {

        /**
         * Called from {@link StressHttpClient#handleConnectionOpened()}.
         *
         * @param client the client whose connection was opened
         */
        public void onConnectionOpened(Client client);

        /**
         * Called from {@link StressHttpClient#handleMessage(ByteBuffer)} for every message.
         *
         * @param client  the client that received the message
         * @param latency nanoseconds elapsed between the last {@link StressHttpClient#sendRequest()}
         *                call on that client and the arrival of the message
         */
        public void onMessageReceived(Client client, long latency);
    }

    /** Number of messages received since the connection was last opened. */
    private int totalReceived;

    /** Optional listener; {@code null} means no notifications are delivered. */
    private StressHttpClientListener listener = null;

    /** {@link System#nanoTime()} reading taken when the in-flight request was sent. */
    private long start;

    /** Number of messages after which this client stops sending new requests. */
    private final int totalToReceive;

    /**
     * Creates a client; the number of messages to receive is read from the
     * {@code "totalToReceive"} configuration entry.
     *
     * @param nio    reactor passed through to the {@link HttpClient} constructor
     * @param host   server host
     * @param port   server port
     * @param config configuration; must provide an int under {@code "totalToReceive"}
     */
    public StressHttpClient(NioReactor nio, String host, int port, Configuration config) {
        super(nio, host, port, config);
        this.totalToReceive = config.getInt("totalToReceive");
    }

    /**
     * Registers the listener that receives connection and message notifications.
     *
     * @param listener the listener, or {@code null} to disable notifications
     */
    public void setListener(StressHttpClientListener listener) {
        this.listener = listener;
    }

    /**
     * Resets the received-message counter and notifies the listener.
     * NOTE(review): presumably invoked by the base class once the connection is up - confirm.
     */
    @Override
    protected void handleConnectionOpened() {
        totalReceived = 0;
        if (listener != null) {
            listener.onConnectionOpened(this);
        }
    }

    /** Records the send time and requests {@code /plaintext}. */
    public void sendRequest() {
        start = System.nanoTime();
        sendRequest("/plaintext");
    }

    /**
     * Measures the latency of the request just answered, notifies the listener and sends the
     * next request unless {@code totalToReceive} messages have already been received.
     * NOTE(review): assumes the base class calls this once per complete response - confirm.
     *
     * @param msg the received message (its content is not inspected)
     */
    @Override
    protected void handleMessage(ByteBuffer msg) {
        // Take the timestamp first so the bookkeeping below is not counted as latency.
        long latency = System.nanoTime() - start;
        totalReceived++;
        if (listener != null) {
            listener.onMessageReceived(this, latency);
        }
        if (totalReceived < totalToReceive) {
            sendRequest();
        }
    }

    /**
     * Runs the stress test.
     *
     * @param args {@code host port totalToReceive warmup connections}, where
     *             {@code totalToReceive} is the number of messages per connection and
     *             {@code warmup} is the number of initial messages (across all connections)
     *             excluded from the throughput figure
     * @throws Exception propagated from argument parsing or from the reactor/client calls
     */
    public static void main(String[] args) throws Exception {

        if (args.length < 5) {
            System.err.println("Usage: StressHttpClient <host> <port> <totalToReceive> <warmup> <connections>");
            return;
        }

        NioReactor nio = NioReactor.create();

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final int totalToReceive = Integer.parseInt(args[2]);
        final int warmup = Integer.parseInt(args[3]);
        final int connections = Integer.parseInt(args[4]);

        // Computed in long arithmetic: totalToReceive * connections can exceed Integer.MAX_VALUE.
        final long expectedTotal = (long) totalToReceive * connections;

        final StressHttpClient[] shc = new StressHttpClient[connections];

        StressHttpClientListener listener = new StressHttpClientListener() {

            private final Benchmarker bench = Benchmarker.create(warmup, true);
            // long, so the counter can reach expectedTotal without wrapping
            private long totalRequests = 0;
            private int totalConnections = 0;
            private long start;

            @Override
            public void onConnectionOpened(Client client) {
                if (++totalConnections % 1000 == 0) {
                    System.out.println("Total connections: " + totalConnections);
                }
                // Only start firing requests once every client is connected.
                if (totalConnections == connections) {
                    // Baseline for the throughput timer. It is restarted below once the warmup
                    // messages have been received; without this baseline a warmup of 0 would
                    // leave the timer unset, because the counter is already 1 when first checked.
                    start = System.nanoTime();
                    for (StressHttpClient s : shc) {
                        s.sendRequest();
                    }
                }
            }

            @Override
            public void onMessageReceived(Client client, long latency) {
                bench.measure(latency);
                totalRequests++;
                if (totalRequests % 10000 == 0) {
                    System.out.println("Total requests: " + totalRequests);
                }
                if (totalRequests == warmup) {
                    // don't take warmup into account for throughput
                    start = System.nanoTime();
                }
                if (totalRequests == expectedTotal) {
                    long totalTime = System.nanoTime() - start;
                    // don't take warmup into account for throughput
                    long totalResponses = expectedTotal - warmup;
                    System.out.println("Total time to receive " + totalResponses + " responses with " + connections + " clients: " + Benchmarker.convertNanoTime(totalTime));
                    double mps = totalResponses / ((double) totalTime / 1000000000L);
                    System.out.println("Messages per second: " + mps);
                    bench.printResults();
                }
            }
        };

        MapConfiguration config = new MapConfiguration();
        config.add("totalToReceive", totalToReceive);
        config.add("connectTimeout", -1); // disable

        for (int i = 0; i < shc.length; i++) {
            shc[i] = new StressHttpClient(nio, host, port, config);
            shc[i].setListener(listener);
            shc[i].open();
        }

        nio.start();
    }
}