Below an example of how you can implement a CoralSequencer node using inheritance and composition so that you can send and receive messages to and from the sequencer. Messages sent to the sequencer will appear in the event stream (i.e. message bus).
Inheritance:
package com.coralblocks.coralsequencer.node; import static com.coralblocks.corallog.Log.*; import com.coralblocks.coralreactor.nio.NioReactor; import com.coralblocks.coralreactor.util.Configuration; import com.coralblocks.coralsequencer.message.Message; import com.coralblocks.coralsequencer.mq.Node; // Here we use INHERITANCE to extend a Node. For COMPOSITION see the other example further below... public class SimpleNode extends Node { public SimpleNode(NioReactor nio, String name, Configuration config) { super(nio, name, config); } @Override protected void handleOpened() { // when a node is opened, it will try to connect to the sequencer, to receive messages from the event stream Sysout.log(name, "Node has been opened!", "sequence=", getNextExpectedSequence()); } @Override protected void handleClosed() { // when a node is closed, it will disconnect from the sequencer, and it will not receive any more messages from the event stream Sysout.log(name, "Node has been closed!", "sequence=", getNextExpectedSequence()); } @Override protected void handleConnectionEstablished() { // the node was able to establish a connection to the sequencer Sysout.log(name, "Node has established a connection with the sequencer!", "sequence=", getNextExpectedSequence()); } @Override protected void handleConnectionOpened() { // the node has started to receive *live* messages from the sequencer (after rewinding or after a disconnect followed by a gap fill) Sysout.log(name, "Node is receiving live messages from the sequencer!", "sequence=", getNextExpectedSequence()); } @Override protected void handleRewindingStarted() { // node is connected and will start to rewind Sysout.log(name, "Node is starting to rewind!", "sequence=", getNextExpectedSequence()); } @Override protected void handleRewinded() { // the node has finished rewinding all past messages from the current session // note that this is called only once in the lifecycle of a node, because a node rewinds only once, when its open() method is called Sysout.log(name, "Node has finished rewinding!", "sequence=", getNextExpectedSequence()); } @Override protected void handleGapFillingStarted(long seq, int count) { Sysout.log(name, "Node lost messages and is gap filling!", "sequence=", getNextExpectedSequence(), "seqRequested=", seq, "count=", count); } @Override protected void handleGapFillingEnded() { Sysout.log(name, "Node has finished gap filling!", "sequence=", getNextExpectedSequence()); } @Override protected void handleConnectionTerminated() { // the node has lost connection to the sequencer Sysout.log(name, "Node has lost connection with the sequencer!", "sequence=", getNextExpectedSequence()); } @Override protected void handleSessionStarted(String sessionName) { Sysout.log(name, "Sequencer session has started!", "sequence=", getNextExpectedSequence(), "session=", sessionName); } @Override protected void handleSessionEnded(String sessionName) { Sysout.log(name, "Sequencer session has ended!", "sequence=", getNextExpectedSequence(), "session=", sessionName); } @Override protected void handleActivated() { // when activated, the node is able to send commands to the sequencer Sysout.log(name, "Node has been activated!", "accountSeq=", getAccountSequence(), "sequence=", getNextExpectedSequence()); } @Override protected void handleDeactivated() { // when deactivated, the node will not send any commands to the sequencer Sysout.log(name, "Node has been deactivated!", "sequence=", getNextExpectedSequence()); } @Override protected void handleBatchProcessed(int numberOfMessagesProcessed) { // node processes messages from the event stream in batches Sysout.log(name, "Node has processed a batch of messages from the sequencer!", "sequence=", getNextExpectedSequence(), "messages=", numberOfMessagesProcessed); } @Override public void handleAcked(Command cmd, Message msg) { // the given command was acked by the given msg Sysout.log(name, "Command acked by message!", "cmd=", cmd.toCharSequence(), "msg=", msg.toCharSequence()); } @Override protected void handleMessage(boolean isMine, Message msg) { Sysout.log(name, "Got message!", "now=", currentSequencerTime(), "isRewinding=", isRewinding(), "session=", msg.getSession(), "msgTimestamp=", msg.getTimestamp(), "from=", msg.getAccountName(), "isMine=", isMine, "seq=", msg.getSequence(), "dataSize=", msg.getData().remaining(), "data=", msg.getData() ); } }
Composition:
package com.coralblocks.coralsequencer.node; import static com.coralblocks.corallog.Log.*; import com.coralblocks.coralreactor.nio.NioReactor; import com.coralblocks.coralreactor.util.Configuration; import com.coralblocks.coralsequencer.VM; import com.coralblocks.coralsequencer.command.Command; import com.coralblocks.coralsequencer.message.Message; import com.coralblocks.coralsequencer.mq.Node; import com.coralblocks.coralsequencer.mq.NodeComposition; import com.coralblocks.coralsequencer.mq.NodeListener; public class SimpleNodeComposition implements NodeListener, NodeComposition { private final Node node; // You can also define the standard constructor below so you can start this node from a mq file if you want to... // In order to do that you must also make the class implement NodeComposition... public SimpleNodeComposition(NioReactor nio, String name, Configuration config) { this.node = new Node(nio, name, config); this.node.addListener(this); } public SimpleNodeComposition(String name) { this.node = VM.getInstance().newNode(name, "noRewind=true"); this.node.addListener(this); } @Override public Node getNode() { return node; } public static void main(String[] args) { int telnetIndex = -1; if (args.length > 0) telnetIndex = Integer.parseInt(args[0]); VM vm = VM.getInstance(); if (telnetIndex != -1) vm.setTelnetIndex(telnetIndex); vm.start(); SimpleNodeComposition simple = new SimpleNodeComposition("NODE6"); simple.getNode().open(); simple.getNode().activate(); } @Override public void onOpened(Node node) { // when a node is opened, it will try to connect to the sequencer, to receive messages from the event stream Sysout.log(node, "Node has been opened!", "sequence=", node.getNextExpectedSequence()); } @Override public void onClosed(Node node) { // when a node is closed, it will disconnect from the sequencer, and it will not receive any more messages from the event stream Sysout.log(node, "Node has been closed!", "sequence=", node.getNextExpectedSequence()); } @Override public void onConnectionEstablished(Node node) { // the node was able to establish a connection to the sequencer Sysout.log(node, "Node has established a connection with the sequencer!", "sequence=", node.getNextExpectedSequence()); } @Override public void onConnectionOpened(Node node) { // the node has started to receive *live* messages from the sequencer (after rewinding or after a disconnect followed by a gap fill) Sysout.log(node, "Node is receiving live messages from the sequencer!", "sequence=", node.getNextExpectedSequence()); } @Override public void onRewindingStarted(Node node) { // node is connected and will start to rewind Sysout.log(node, "Node is starting to rewind!", "sequence=", node.getNextExpectedSequence()); } @Override public void onRewinded(Node node) { // the node has finished rewinding all past messages from the current session // note that this is called only once in the lifecycle of a node, because a node rewinds only once, when its open() method is called Sysout.log(node, "Node has finished rewinding!", "sequence=", node.getNextExpectedSequence()); } @Override public void onGapFillingStarted(Node node, long seq, int count) { Sysout.log(node, "Node lost messages and is gap filling!", "sequence=", node.getNextExpectedSequence(), "seqRequested=", seq, "count=", count); } @Override public void onGapFillingEnded(Node node) { Sysout.log(node, "Node has finished gap filling!", "sequence=", node.getNextExpectedSequence()); } @Override public void onConnectionTerminated(Node node) { // the node has lost connection to the sequencer Sysout.log(node, "Node has lost connection with the sequencer!", "sequence=", node.getNextExpectedSequence()); } @Override public void onSessionStarted(Node node, String sessionName) { Sysout.log(node, "Sequencer session has started!", "sequence=", node.getNextExpectedSequence(), "session=", sessionName); } @Override public void onSessionEnded(Node node, String sessionName) { Sysout.log(node, "Sequencer session has ended!", "sequence=", node.getNextExpectedSequence(), "session=", sessionName); } @Override public void onActivated(Node node) { // when activated, the node is able to send commands to the sequencer Sysout.log(node, "Node has been activated!", "accountSeq=", node.getAccountSequence(), "sequence=", node.getNextExpectedSequence()); } @Override public void onDeactivated(Node node) { // when deactivated, the node will not send any commands to the sequencer Sysout.log(node, "Node has been deactivated!", "sequence=", node.getNextExpectedSequence()); } @Override public void onBatchProcessed(Node node, int numberOfMessagesProcessed) { // node processes messages from the event stream in batches Sysout.log(node, "Node has processed a batch of messages from the sequencer!", "sequence=", node.getNextExpectedSequence(), "messages=", numberOfMessagesProcessed); } @Override public void onAcked(Node node, Command cmd, Message msg) { // the given command was acked by the given msg Sysout.log(node, "Command acked by message!", "cmd=", cmd.toCharSequence(), "msg=", msg.toCharSequence()); } @Override public void onMessage(Node node, boolean isMine, Message msg) { Sysout.log(node, "Got message!", "now=", node.currentSequencerTime(), "isRewinding=", node.isRewinding(), "session=", msg.getSession(), "msgTimestamp=", msg.getTimestamp(), "from=", msg.getAccountName(), "isMine=", isMine, "seq=", msg.getSequence(), "dataSize=", msg.getData().remaining(), "data=", msg.getData() ); } }