CoralFIX is fully integrated with CoralReactor so you can code your own ultra-low-latency, ultra-low-variance (i.e. no GC overhead) FIX network clients and servers. In this article we introduce the FixApplicationClient
and the FixApplicationServer
that you can use to code a FIX connection that takes care of all the low-level FIX session details for you, like logon, heartbeats, sequence reset, etc.
FixApplicationClient and FixApplicationServer
By inheriting from FixApplicationClient
you can code a fully functional FIX connection that will handle all the low-level session details of the FIX protocol like sequences, logon, resend requests, etc. All you have to do is handle the incoming application messages in any way you want. See an example below:
package com.coralblocks.coralfix.sample; import static com.coralblocks.coralfix.FixTags.*; import java.util.Iterator; import com.coralblocks.coralfix.FixConstants.FixVersions; import com.coralblocks.coralfix.FixConstants.MsgTypes; import com.coralblocks.coralfix.FixGroup; import com.coralblocks.coralfix.FixGroupElement; import com.coralblocks.coralfix.FixMessage; import com.coralblocks.coralfix.FixTag; import com.coralblocks.coralfix.FixTags; import com.coralblocks.coralfix.client.FixApplicationClient; import com.coralblocks.coralreactor.client.Client; import com.coralblocks.coralreactor.nio.NioReactor; import com.coralblocks.coralreactor.util.Configuration; import com.coralblocks.coralreactor.util.MapConfiguration; public class SimpleFixApplicationClient extends FixApplicationClient { // Add any extra / non-standard fix tag from your spec, so you can work with the name instead of the id private static FixTag MySpecialTag = FixTags.addTag("MySpecialTag", 3434); public SimpleFixApplicationClient(NioReactor nio, String localhost, int port, Configuration config) { super(nio, localhost, port, config); // Specify the repeating group you expect to parse, for each message type: addGroupTemplate(MsgTypes.NewOrderSingle, NoPartyIDs, PartyID, PartyIDSource, PartyRole); /* Explanation about addGroupTemplate above: * * MsgTypes.NewOrderSingle is the message type containing the repeating group * * NoPartyIDs is the main repeating group tag, meaning the repeating groups will * start after that tag. This tag contains the number (i.e. an integer) of repeating groups that follow. * * PartyID is the very first tag that every repeating group will have. That's important * to correctly denote the starting of a new repeating group and is enforced by the * FIX spec (i.e. you can't start a new repeating group with another tag) * * PartyIDSource, PartyRole, etc. are all the other tags (not included the very first * one above) that this repeating group can contain. 
They can be specified in any order * and can appear in the repeating group in any order. They are also all optional, in other words, * they do not need to appear in the FIX message BUT THEY DO HAVE TO BE SPECIFIED HERE, ALL OF THEM. */ } @Override protected void handleOpened() { // client was opened and will now try to connect to its destination // do anything you want to do here... } @Override protected void handleClosed() { // client was closed, its connection was terminated and it will now sit idle // do anything you want to do here... } @Override protected void handleConnectionEstablished() { // socket connection was established but no login handshake was done yet // do anything you want to do here... } @Override protected void handleConnectionOpened() { // the FIX session handshake through LOGON messages was performed... // the client is now ready to start sending/receiving messages // do anything you want to do here... } @Override protected void handleConnectionTerminated() { // the socket connection was broken/terminated // do anything you want to do here... } @Override protected void prepareLogon(FixMessage outFixMsg) { super.prepareLogon(outFixMsg); // add/remove any tag here... 
probably not necessary because the base // class is already adding all the required tags } @Override protected void handleFixApplicationMessage(FixMessage fixMsg, boolean possDupe) { // do what you have to do with this FIX application message if (fixMsg.checkType(MsgTypes.NewOrderSingle)) { double price = fixMsg.getDouble(Price); System.out.println("Read price: " + price); CharSequence clOrdID = fixMsg.getCharSequence(ClOrdID); System.out.println("Read ClOrdID: " + clOrdID); char mySpecialTag = fixMsg.getChar(MySpecialTag); System.out.println("Read MySpecialTag: " + mySpecialTag); FixGroup fixGroup = fixMsg.getGroup(NoPartyIDs); System.out.println(fixGroup.getNumberOfElements()); // you can also print the whole fix group for debugging purposes: System.out.println(fixGroup); Iterator<FixGroupElement> iter = fixGroup.iterator(); while(iter.hasNext()) { FixGroupElement elem = iter.next(); CharSequence partyID = elem.getCharSequence(PartyID); System.out.println(partyID); char partyIDSource = elem.getChar(PartyIDSource); System.out.println(partyIDSource); int partyRole = elem.getInt(PartyRole); System.out.println(partyRole); // you can also print the whole element for debugging purposes: System.out.println(elem); } } } public static void main(String[] args) { String senderComp = args.length > 0 ? 
args[0] : "testClient"; NioReactor nio = NioReactor.create(); MapConfiguration config = new MapConfiguration(); // REQUIRED config.add("fixVersion", FixVersions.VERSION_44); // the senderComp of the fix client // REQUIRED config.add("senderComp", senderComp); // NOT required: // config.add("targetComp", "testAcct"); // the TargetCompID tag // config.add("senderSub", "xxx"); // the SenderSubID tag // config.add("targetSub", "yyy"); // the TargetSubID tag // config.add("senderLoc", "aaa"); // the SenderLocationID tag // config.add("heartbeat", "20"); // the HeartBtInt tag (defaults to 10 seconds) // config.add("encryption", "1"); // the EncryptMethod tag (defaults to 0) // config.add("username", "myusername"); // sends the Username tag with the Logon message // config.add("password", "mypass"); // the RawData tag (the RawDataLength tag will also be included) // print all messages received and sent to STDOUT for debugging purposes // (default is false) config.add("debugMessages", true); // config.add("debugHeartbeats", true); // show heartbeats as well (default is true) // send a ResetSeqNumFlag=Y on Logon to force a sequence reset on both sides (inbound and outbound) // (default is false) config.add("forceSeqReset", true); // turn on asynchronous audit logging // (default is false) config.add("auditLog", false); // config.add("auditLogDir", "/mylogdir/"); // set the folder where to save the audit log file (default is "auditLog") // turn on asynchronous fix logging (human readable fix messages) // (default is false) config.add("fixLog", false); // config.add("fixLogDir", "/mylogdir"); // set the folder where to save the fix log file (default is "fixLog") // config.add("fixLogHeartbeats", true); // log heartbeats to the fix log (default is false) // persist sequences asynchronously to a file, so you don't lose them across restarts // (default is false) config.add("persistSequences", false); // config.add("persistDir", "mySeqFolder"); // the folder where to store the 
sequence files (default is current dir) // config.add("persistFilename, "mySeqFilename"); // the filename of the sequence file (default is the client name) // persists outbound fix messages asynchronously for message resends / gap fills // (default is false) config.add("supportPersistence", false); // open the admin server to accept telnet connections to monitor and control this fix client // (default is false) config.add("adminServer", false); // change the port for the admin server // (default is 45441) config.add("adminServerPort", 45441); // set heartbeat interval in seconds // (default is 10) config.add("heartbeat", 5); /* // More optional config parameters // IMPORTANT: The default values should be what you want most of the time! config.add("includeLastMsgSeqNumProcessedTag", false); // include this header tag on every fix message? (default is true) config.add("maxLogonAttempts", 10); // if Logon fail for whatever reason, retry (default is 5) config.add("secondsToReopenAfterClose", 60 * 10); // if can't logon, then client will be closed and re-open after N seconds (default is 300) config.add("sendLogoutOnClose", false); // on a client close, a logout message will be sent (default is false) config.add("sendLogonOnConnected", true); // when socket is estabilished send logon automatically (default is true) config.add("persistSequencesImmediately", false); // force the mmap file to commit the the sequences to disk immediately (default is false) config.add("sequence", 1); // the initial inbound sequence (default is 1 or whatever is persisted if persistence is on) config.add("outboundSeq", 1); // the initial outbound sequence (default is 1 or wahtever is persisted if persitence is on) config.add("acceptInboundSeqFromServer", false); // accept whatever sequence you get from server as your inbound sequence (default is false) config.add("disconnectOnGap", false); // disconnect on gap instead of sending a sequence reset / message retransmission (default is false) 
config.add("reconnectOnStartTimerEvent", true); // when using a start timer, reconnect if connected (default is true) config.add("resetSequencesOnStartTimerEvent", true); // when using a start timer, reset the sequences (default is true) */ /* // More optional config parameters related to the max size of FIX messages: config.add("fixMaxTags", 64); // max number of tags, not including tags from repeating groups (defaults to 64) config.add("fixMaxGroups", 8); // max number of different repeating groups a message can have (defaults to 8) config.add("fixMaxValueLength", 256); // max length of a tag value in characters (defaults to 256) config.add("fixMaxGroupElements", 64); // max number of repeating group elements (defaults to 64) config.add("fixMaxValuesPerGroupElement", 64); // max number of repeating group tags/values per group element (defaults to 64) */ Client client = new SimpleFixApplicationClient(nio, "localhost", 45451, config); client.open(); nio.start(); } }
The same applies on the server side, by inheriting from FixApplicationServer:
package com.coralblocks.coralfix.sample; import static com.coralblocks.coralfix.FixTags.*; import com.coralblocks.coralfix.FixConstants.FixVersions; import com.coralblocks.coralfix.FixConstants.MsgTypes; import com.coralblocks.coralfix.FixGroup; import com.coralblocks.coralfix.FixMessage; import com.coralblocks.coralfix.FixTag; import com.coralblocks.coralfix.FixTags; import com.coralblocks.coralfix.server.FixApplicationServer; import com.coralblocks.coralreactor.client.Client; import com.coralblocks.coralreactor.nio.NioReactor; import com.coralblocks.coralreactor.util.Configuration; import com.coralblocks.coralreactor.util.MapConfiguration; public class SimpleFixApplicationServer extends FixApplicationServer { // Add any extra / non-standard fix tag from your spec, so you can work with the name instead of the id private static FixTag MySpecialTag = FixTags.addTag("MySpecialTag", 3434); public SimpleFixApplicationServer(NioReactor nio, int port, Configuration config) { super(nio, port, config); } @Override protected void handleOpened() { // sever was opened and will now accept connections from clients... // do anything you want to do here... } @Override protected void handleClosed() { // server was closed, all clients were disconnected and it will not accept any connection... // do anything you want to do here... } @Override protected void handleConnectionEstablished(Client client) { // socket connection was established for a client but no login handshake was done yet // do anything you want to do here... } @Override protected void handleConnectionOpened(Client client) { // the FIX session handshake through LOGON messages was performed for this client... // the client is now ready to start sending/receiving messages // do anything you want to do here... } @Override protected void handleConnectionTerminated(Client client) { // the socket connection was broken/terminated for this client // do anything you want to do here... 
} @Override protected void handleFixApplicationMessage(Client client, FixMessage fixMsg, boolean possDupe) { // do whatever you want to do with the application message received from this client... FixMessage outFixMsg = getOutFixMessage(client, MsgTypes.NewOrderSingle); // Add the tags you want to outFixMsg // NOTE: there is no need to add any session-level tags like MsgSeqNum, SenderCompID, etc. outFixMsg.add(Price, 50.34); outFixMsg.addTimestamp(TransactTime, System.currentTimeMillis()); outFixMsg.add(ClOrdID, "A123"); outFixMsg.add(MySpecialTag, 'Y'); // Add a repeating group FixGroup partyIds = outFixMsg.createGroup(NoPartyIDs); partyIds.nextElement().add(PartyID, "BLAH").add(PartyIDSource, 'D').add(PartyRole, 54); partyIds.nextElement().add(PartyID, "TRD").add(PartyIDSource, 'D').add(PartyRole, 36); partyIds.nextElement().add(PartyID, "FOO").add(PartyIDSource, 'D').add(PartyRole, 7); // Send out the message to the client send(client, outFixMsg); } public static void main(String[] args) { NioReactor nio = NioReactor.create(); MapConfiguration config = new MapConfiguration(); // REQUIRED config.add("fixVersion", FixVersions.VERSION_44); // print all messages received and sent to STDOUT for debugging purposes // (default is false) config.add("debugMessages", true); // accept as the client inbound sequence whatever sequence I receive in the first message coming from the client // (default is false) config.add("acceptInboundSeqFromClient", false); // turn on asynchronous audit logging // (default is false) config.add("auditLog", false); // turn on asynchronous fix logging (human readable fix messages) // (default is false) config.add("fixLog", false); // persist all sequences from all clients (senderCompIDs) to disk // (default is false) config.add("persistSequences", false); // support persistence of outbound fix messages for message resends / gap fills // (default is false) config.add("supportPersistence", false); // enable the telnet admin server // (default is 
false) config.add("adminServer", false); // change the port for the admin server // (default is 45442) config.add("adminServerPort", 45442); FixApplicationServer server = new SimpleFixApplicationServer(nio, 45451, config); server.open(); nio.start(); } }
Conclusion
CoralFIX is fully integrated with CoralReactor so you can easily code your own FIX clients and servers. Furthermore, it takes care of all the low-level session details of the FIX protocol for you, so all you have to do is handle messages at the application level.