Blazing Fast Throughput with CoralFIX + CoralReactor

To test the performance of CoralFIX + CoralReactor we have developed a simple test with a FIX server and a FIX client. The client connects to the server, the standard FIX handshake through LOGON messages takes place, and the client proceeds to send (i.e. push) 5 million FIX messages to the server as fast as it can. The server receives and processes all the messages and then calculates the throughput. So we are measuring the one-way throughput over loopback. For a latency benchmark instead, you can check this article. Below we present the throughput results and the source code.

NOTE: If you want to check the benchmark of just the CoralFIX parser without any network I/O code (i.e. without CoralReactor) you can check this article.

The Results

Messages: 5,000,000 (one-way)
Throughput: 279,314 msgs/sec

The machine used for the benchmarks was an Intel i7 (4 x 3.5GHz) Ubuntu box overclocked to 4.5GHz.

The Source Code

The server:

package com.coralblocks.coralfix.bench.throughput;

import com.coralblocks.coralfix.FixMessage;
import com.coralblocks.coralfix.server.FixApplicationServer;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.Server;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

/**
 * Server side of the one-way FIX throughput benchmark.
 *
 * <p>Every application message handed to {@link #handleFixApplicationMessage}
 * bumps a counter. When the connection terminates, the time elapsed since
 * {@link #handleConnectionOpened} is used to print the average time per
 * message and the number of messages received per second.
 */
public class FixThroughputServer extends FixApplicationServer {

	// java -Xms6g -Xmx8g -server -verbose:gc -Xbootclasspath/p:../CoralReactor-boot-jdk7/target/coralreactor-boot-jdk7.jar -cp target/coralfix-all.jar:lib/jna-3.5.1.jar -DpersistSequences=false -DnioReactorProcToBind=2 com.coralblocks.coralfix.bench.throughput.FixThroughputServer

	/** Number of nanoseconds in one second, used to scale the throughput figure. */
	private static final long NANOS_PER_SECOND = 1000000000L;

	/** Application messages counted since the last call to handleConnectionEstablished. */
	private long msgCount;

	/** System.nanoTime() reading taken in handleConnectionOpened. */
	private long start;

	/**
	 * Creates the benchmark server; all arguments are passed straight to the superclass.
	 *
	 * @param nio    the reactor this server runs on
	 * @param port   the port to listen on
	 * @param config the server configuration
	 */
	public FixThroughputServer(NioReactor nio, int port, Configuration config) {
		super(nio, port, config);
	}

	/** Resets the message counter so each connection is measured from zero. */
	@Override
	protected void handleConnectionEstablished(Client client) {
		msgCount = 0;
	}

	/** Records the timestamp from which the elapsed time is measured. */
	@Override
	protected void handleConnectionOpened(Client client) {
		start = System.nanoTime();
	}

	/** Counts one received application message; the message content is not inspected. */
	@Override
	protected void handleFixApplicationMessage(Client client, FixMessage fixMsg, boolean possDupe) {
		msgCount++;
	}

	/**
	 * Prints the benchmark figures for the connection that just ended.
	 * Nothing is printed when no application message was counted.
	 */
	@Override
	protected void handleConnectionTerminated(Client client) {
		if (msgCount <= 0) {
			return; // nothing was received, so there is nothing to report
		}

		final long elapsedNanos = System.nanoTime() - start;
		final long nanosPerMsg = elapsedNanos / msgCount;
		final long msgsPerSec = msgCount * NANOS_PER_SECOND / elapsedNanos;

		final StringBuilder report = new StringBuilder();
		report.append("Done receiving messages! messagesReceived=").append(msgCount);
		report.append(" avgLatencyPerMsg=").append(nanosPerMsg);
		report.append(" nanos throughput=").append(msgsPerSec);
		report.append(" msgs/sec");
		System.out.println(report.toString());
	}

	/**
	 * Starts the benchmark server on port 45451 with FIX version 44.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		final NioReactor reactor = NioReactor.create();

		final MapConfiguration serverConfig = new MapConfiguration();
		serverConfig.add("fixVersion", 44);

		final Server benchServer = new FixThroughputServer(reactor, 45451, serverConfig);
		benchServer.open();
		reactor.start();
	}
}




The client:

package com.coralblocks.coralfix.bench.throughput;

import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralfix.FixConstants;
import com.coralblocks.coralfix.FixMessage;
import com.coralblocks.coralfix.FixTags;
import com.coralblocks.coralfix.client.FixApplicationClient;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

/**
 * Client side of the one-way FIX throughput benchmark.
 *
 * <p>Once the connection is opened the client registers interest in
 * {@code OP_WRITE} and, on every {@link #onWrite} callback, pushes up to
 * {@code batchSize} ExecutionReport messages until {@code messagesToSend}
 * messages have been sent. It then flushes any pending data, prints the
 * sender-side figures and disconnects.
 */
public class FixThroughputClient extends FixApplicationClient {
	
	// java -Xms6g -Xmx8g -server -verbose:gc -Xbootclasspath/p:../CoralReactor-boot-jdk7/target/coralreactor-boot-jdk7.jar -cp target/coralfix-all.jar:lib/jna-3.5.1.jar -DpersistSequences=false -DnioReactorProcToBind=3 com.coralblocks.coralfix.bench.throughput.FixThroughputClient
	
	/** Total number of messages to push before disconnecting. */
	private final int messagesToSend;

	/** Maximum number of loop iterations (message sends) per onWrite callback. */
	private final int batchSize;
	
	/** Messages successfully handed to send() since handleConnectionEstablished. */
	private int msgCount;

	/** System.nanoTime() reading taken in handleConnectionOpened. */
	private long start;

	/**
	 * Creates the benchmark client.
	 *
	 * @param nio    the reactor this client runs on
	 * @param host   the server host
	 * @param port   the server port
	 * @param config the configuration; must contain the integer keys
	 *               {@code messagesToSend} and {@code batchSize}
	 * @throws IllegalArgumentException if {@code batchSize} is not positive
	 */
	public FixThroughputClient(NioReactor nio, String host, int port, Configuration config) {
		super(nio, host, port, config);
		this.messagesToSend = config.getInt("messagesToSend");
		this.batchSize = config.getInt("batchSize");
		if (batchSize <= 0) {
			// With a non-positive batch size the loop in onWrite never runs:
			// nothing is sent and disconnect() is never reached.
			throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
		}
	}
	
	/** Resets the sent-message counter so each connection is measured from zero. */
	@Override
	protected void handleConnectionEstablished() {
		msgCount = 0;
	}

	/** Records the start timestamp and asks to be called back through onWrite. */
	@Override
	protected void handleConnectionOpened() {
		start = System.nanoTime();
		addInterestOp(SelectionKey.OP_WRITE);
	}
	
	/**
	 * Sends up to {@code batchSize} messages; once all messages are sent,
	 * flushes pending data and then prints the results and disconnects.
	 * Presumably invoked by the reactor whenever the socket is writable
	 * (OP_WRITE interest is registered in handleConnectionOpened).
	 *
	 * @param channel the writable channel (not used directly here)
	 */
	@Override
	public void onWrite(WritableByteChannel channel) {
		
		for (int i = 0; i < batchSize; i++) {
		
			if (msgCount < messagesToSend) {
				
				FixMessage fixOutMsg = getOutFixMessage(FixConstants.MsgTypes.ExecutionReport);
				fixOutMsg.addTimestamp(FixTags.TransactTime, nio.currentTimeMillis());
				fixOutMsg.add(FixTags.Account, "01234567");
				fixOutMsg.add(FixTags.OrderQty, 50);
				fixOutMsg.add(FixTags.Price, 400.5);
				fixOutMsg.add(FixTags.ClOrdID, "4");
				fixOutMsg.add(FixTags.HandlInst, '1');
				fixOutMsg.add(FixTags.OrdType, '2');
				fixOutMsg.add(FixTags.Side, '1');
				fixOutMsg.add(FixTags.Symbol, "OC");
				fixOutMsg.add(FixTags.Text, "NIGEL");
				fixOutMsg.add(FixTags.TimeInForce, '0');
				fixOutMsg.add(FixTags.SecurityDesc, "AOZ3 C02000");
				fixOutMsg.add(FixTags.SecurityType, 'O');
			
				if (!send(fixOutMsg)) {
					// underlying kernel write socket buffer is full
					// sending too fast and/or the other side is lagging
					return; // abort and wait for OP_WRITE
				}
				// only count messages that send() accepted
				msgCount++;
			} else {
				if (hasPendingDataToFlush()) {
					flush();
				} else {
					// everything has been sent and flushed: report and stop
					printResults();
					disconnect();
					return;
				}
			}
		}
	}
	
	/**
	 * Prints the number of messages sent, the average time per message and
	 * the send throughput. Both ratios fall back to 0 when their divisor is
	 * zero (e.g. messagesToSend=0) instead of throwing ArithmeticException.
	 */
	private void printResults() {
		long totalTime = System.nanoTime() - start;
		long latency = msgCount > 0 ? totalTime / msgCount : 0;
		long ops = totalTime > 0 ? msgCount * 1000000000L / totalTime : 0;
		System.out.println("Done sending messages! messagesSent=" + msgCount 
		+ " avgLatencyPerMsg=" + latency + " nanos throughput=" + ops + " msgs/sec");
	}
	
	/**
	 * Starts the benchmark client against localhost:45451.
	 * {@code messagesToSend} (default 5000000) and {@code batchSize}
	 * (default 128) are read through SystemUtils.getInt.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();
		
		int messagesToSend = SystemUtils.getInt("messagesToSend", 5000000);
		int batchSize = SystemUtils.getInt("batchSize", 128);
		
		MapConfiguration config = new MapConfiguration();
		config.add("fixVersion", 44);
		config.add("senderComp", "testSenderCompID");
		config.add("messagesToSend", messagesToSend);
		config.add("batchSize", batchSize);
		
		Client client = new FixThroughputClient(nio, "localhost", 45451, config);
		client.open();
		nio.start();
	}
}


Conclusion

CoralFIX + CoralReactor can easily process more than 250,000 FIX messages per second.