CoralSequencer uses its own binary and garbage-free serialization framework to read and write its internal messages. For your application messages, you are free to use any serialization library or binary data model you choose. The fact that CoralSequencer is message agnostic gives you total flexibility in that decision. But you can also consider using CoralSequencer’s native serialization framework described in this article.
To define the schema of a message you can use the simple and self-explanatory IDL described below. Note that it supports optional fields through the exclamation mark following the type and, most importantly, it also supports repeating groups and repeating groups of repeating groups (i.e. nested repeating groups).
TYPE = S SUBTYPE = D reqId: int exchangeSession: chars(10) comments: varchars(1024)! securities: secId: int secSymbol: varchars(64) legs: secId: int secSymbol: varchars(64) secDesc: varchars(1024)!
To generate the Java code for the IDL above, you can do:
package com.coralblocks.coralsequencer.node; import java.io.IOException; import com.coralblocks.coralsequencer.protocol.*; import com.coralblocks.coralsequencer.protocol.field.*; public class SampleProtoMessage extends AbstractProto { private static final String TEXT = String.join("\n", "TYPE = S", "SUBTYPE = D", "", "reqId: int", "exchangeSession: chars(10)", "comments: varchars(1024)!", "", "securities:", " secId: int", " secSymbol: varchars(64)", " legs:", " secId: int", " secSymbol: varchars(64)", " secDesc: varchars(1024)!", ""); public static void main(String[] args) throws IOException { IDL idl = new IDL(TEXT); String code = idl.getCode(); String pathToJavaCode = "src/main/java/com/coralblocks/coralsequencer/node/SampleProtoMessage.java"; IDL.replaceAutoGeneratedCode(pathToJavaCode, code); } // Auto-generated code. Do not edit or change anything below here // BEGIN_AUTO_GENERATED_CODE // (Auto-generated code will go here) // END_AUTO_GENERATED_CODE }
After you run the class above, the following source code will be automatically generated and added to the Java file so that you don’t need to type anything:
public final static char TYPE = 'S'; public final static char SUBTYPE = 'D'; public final TypeField typeField = new TypeField(this, TYPE); public final SubtypeField subtypeField = new SubtypeField(this, SUBTYPE); public final IntField reqId = new IntField(this); public final CharsField exchangeSession = new CharsField(this, 10); public final VarCharsField comments = new VarCharsField(this, 1024, true); public static class SecuritiesRepeatingGroup extends RepeatingGroupField { public IntField secId; public VarCharsField secSymbol; public static class LegsRepeatingGroup extends RepeatingGroupField { public IntField secId; public VarCharsField secSymbol; public LegsRepeatingGroup(AbstractProto proto) { this(proto, new IntField(), new VarCharsField(64)); } public LegsRepeatingGroup(AbstractProto proto, ProtoField ... protoFields) { super(proto, protoFields); } @Override public GroupField nextElement() { GroupField groupField = super.nextElement(); this.secId = (IntField) groupField.internalArray()[0]; this.secSymbol = (VarCharsField) groupField.internalArray()[1]; return groupField; } @Override public GroupField iterNext() { GroupField groupField = super.iterNext(); if (groupField != null) { this.secId = (IntField) groupField.internalArray()[0]; this.secSymbol = (VarCharsField) groupField.internalArray()[1]; } return groupField; } @Override protected final RepeatingGroupField newInstance(ProtoField[] protoFields) { return new SecuritiesRepeatingGroup.LegsRepeatingGroup(null, protoFields); } } SecuritiesRepeatingGroup.LegsRepeatingGroup legs; public VarCharsField secDesc; public SecuritiesRepeatingGroup(AbstractProto proto) { this(proto, new IntField(), new VarCharsField(64), new SecuritiesRepeatingGroup.LegsRepeatingGroup(null), new VarCharsField(1024)); } public SecuritiesRepeatingGroup(AbstractProto proto, ProtoField ... protoFields) { super(proto, protoFields); } @Override public GroupField nextElement() { GroupField groupField = super.nextElement(); this.secId = (IntField) groupField.internalArray()[0]; this.secSymbol = (VarCharsField) groupField.internalArray()[1]; this.legs = (SecuritiesRepeatingGroup.LegsRepeatingGroup) groupField.internalArray()[2]; this.secDesc = (VarCharsField) groupField.internalArray()[3]; return groupField; } @Override public GroupField iterNext() { GroupField groupField = super.iterNext(); if (groupField != null) { this.secId = (IntField) groupField.internalArray()[0]; this.secSymbol = (VarCharsField) groupField.internalArray()[1]; this.legs = (SecuritiesRepeatingGroup.LegsRepeatingGroup) groupField.internalArray()[2]; this.secDesc = (VarCharsField) groupField.internalArray()[3]; } return groupField; } @Override protected final RepeatingGroupField newInstance(ProtoField[] protoFields) { return new SecuritiesRepeatingGroup(null, protoFields); } } SecuritiesRepeatingGroup securities = new SecuritiesRepeatingGroup(this);
Below a simple node to send and receive the SampleProtoMessage defined above:
package com.coralblocks.coralsequencer.node; import static com.coralblocks.corallog.Log.*; import java.nio.ByteBuffer; import com.coralblocks.coralreactor.admin.AdminCommand; import com.coralblocks.coralreactor.nio.NioReactor; import com.coralblocks.coralreactor.util.Configuration; import com.coralblocks.coralsequencer.message.Message; import com.coralblocks.coralsequencer.mq.Node; import com.coralblocks.coralsequencer.protocol.Proto; public class SampleProtoNode extends Node { private final SampleProtoMessage sampleProtoMessage = new SampleProtoMessage(); private final SampleProtoParser protoParser = new SampleProtoParser(); private final ByteBuffer bb = ByteBuffer.allocate(2048); public SampleProtoNode(NioReactor nio, String name, Configuration config) { super(nio, name, config); } @Override protected void initAdminCommands() { super.initAdminCommands(); addAdminCommand(new AdminCommand("sendSampleProtoMessage") { @Override public boolean execute(CharSequence args, StringBuilder results) { try { sampleProtoMessage.reqId.set(23223); sampleProtoMessage.exchangeSession.set(getSession()); sampleProtoMessage.comments.markAsNotPresent(); sampleProtoMessage.securities.clear(); sampleProtoMessage.securities.nextElement(); sampleProtoMessage.securities.secId.set(1); sampleProtoMessage.securities.secSymbol.set("IBM2-IBM3"); sampleProtoMessage.securities.legs.nextElement(); sampleProtoMessage.securities.legs.secId.set(2222); sampleProtoMessage.securities.legs.secSymbol.set("IBM2"); sampleProtoMessage.securities.legs.nextElement(); sampleProtoMessage.securities.legs.secId.set(3333); sampleProtoMessage.securities.legs.secSymbol.set("IBM3"); sampleProtoMessage.securities.secDesc.set("This is a spread for IBM2-IBM3"); sampleProtoMessage.securities.nextElement(); sampleProtoMessage.securities.secId.set(2); sampleProtoMessage.securities.secSymbol.set("IBM4-IBM5"); sampleProtoMessage.securities.legs.nextElement(); sampleProtoMessage.securities.legs.secId.set(4444); sampleProtoMessage.securities.legs.secSymbol.set("IBM4"); sampleProtoMessage.securities.legs.nextElement(); sampleProtoMessage.securities.legs.secId.set(5555); sampleProtoMessage.securities.legs.secSymbol.set("IBM5"); sampleProtoMessage.securities.secDesc.set("This is a spread for IBM4-IBM5"); sendCommand(sampleProtoMessage); } catch(Exception e) { e.printStackTrace(); } results.append("SampleProtoMessage sent!"); return true; } }); } @Override protected void handleMessage(boolean isMine, Message msg) { Proto proto = protoParser.parse(msg); if (proto == null) { Error.log(name, "Could not parse message:", msg.toString()); return; } char type = proto.getType(); char subtype = proto.getSubtype(); if (type == SampleProtoMessage.TYPE && subtype == SampleProtoMessage.SUBTYPE) { SampleProtoMessage spm = (SampleProtoMessage) proto; Info.log(name, "reqId=", spm.reqId, "exchangeSession=", spm.exchangeSession, "comments=", spm.comments); // Get one by one... int reqId = spm.reqId.get(); CharSequence exchangeSession = spm.exchangeSession.get(); // Remember comments is an optional field... CharSequence comments = spm.comments.isPresent() ? spm.comments.get() : null; Info.log(name, "reqId=", reqId, "exchangeSession=", exchangeSession, "comments=", comments); // Iterator through repeating groups... spm.securities.beginIteration(); while(spm.securities.iterHasNext()) { spm.securities.iterNext(); Info.log(name, "secId=", spm.securities.secId, "secSymbol=", spm.securities.secSymbol, "secDesc=", spm.securities.secDesc); // Iterator through a repeating group inside a repeating group... spm.securities.legs.beginIteration(); while(spm.securities.legs.iterHasNext()) { spm.securities.legs.iterNext(); Info.log(name, "secId=", spm.securities.legs.secId, "secSymbol=", spm.securities.legs.secSymbol); } Info.log(name, "legs=", spm.securities.legs); } Info.log(name, "securities=", spm.securities); } else { Error.log(name, "Unreconizable proto message!", "type=", type, "subtype=", subtype); } bb.clear(); proto.writeAscii(bb); bb.flip(); Info.log(name, "Full ProtoMessage received:", bb); } }
Don’t forget to specify a parser to receive your proto messages:
public class SampleProtoParser extends AbstractMessageProtoParser { @Override protected Proto[] createProtoMessages() { return new Proto[] { new SampleProtoMessage() }; } }