Below an example of how you can implement a CoralSequencer node using inheritance and composition so that you can send and receive messages to and from the sequencer. Messages sent to the sequencer will appear in the event stream (i.e. message bus).
Inheritance:
package com.coralblocks.coralsequencer.node;
import static com.coralblocks.corallog.Log.*;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;
// Here we use INHERITANCE to extend a Node. For COMPOSITION see the other example further below...
public class SimpleNode extends Node {
public SimpleNode(NioReactor nio, String name, Configuration config) {
super(nio, name, config);
}
@Override
protected void handleOpened() {
// when a node is opened, it will try to connect to the sequencer, to receive messages from the event stream
Sysout.log(name, "Node has been opened!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleClosed() {
// when a node is closed, it will disconnect from the sequencer, and it will not receive any more messages from the event stream
Sysout.log(name, "Node has been closed!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleConnectionEstablished() {
// the node was able to establish a connection to the sequencer
Sysout.log(name, "Node has established a connection with the sequencer!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleConnectionOpened() {
// the node has started to receive *live* messages from the sequencer (after rewinding or after a disconnect followed by a gap fill)
Sysout.log(name, "Node is receiving live messages from the sequencer!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleRewindingStarted() {
// node is connected and will start to rewind
Sysout.log(name, "Node is starting to rewind!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleRewinded() {
// the node has finished rewinding all past messages from the current session
// note that this is called only once in the lifecycle of a node, because a node rewinds only once, when its open() method is called
Sysout.log(name, "Node has finished rewinding!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleGapFillingStarted(long seq, int count) {
Sysout.log(name, "Node lost messages and is gap filling!", "sequence=", getNextExpectedSequence(), "seqRequested=", seq, "count=", count);
}
@Override
protected void handleGapFillingEnded() {
Sysout.log(name, "Node has finished gap filling!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleConnectionTerminated() {
// the node has lost connection to the sequencer
Sysout.log(name, "Node has lost connection with the sequencer!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleSessionStarted(String sessionName) {
Sysout.log(name, "Sequencer session has started!", "sequence=", getNextExpectedSequence(), "session=", sessionName);
}
@Override
protected void handleSessionEnded(String sessionName) {
Sysout.log(name, "Sequencer session has ended!", "sequence=", getNextExpectedSequence(), "session=", sessionName);
}
@Override
protected void handleActivated() {
// when activated, the node is able to send commands to the sequencer
Sysout.log(name, "Node has been activated!", "accountSeq=", getAccountSequence(), "sequence=", getNextExpectedSequence());
}
@Override
protected void handleDeactivated() {
// when deactivated, the node will not send any commands to the sequencer
Sysout.log(name, "Node has been deactivated!", "sequence=", getNextExpectedSequence());
}
@Override
protected void handleBatchProcessed(int numberOfMessagesProcessed) {
// node processes messages from the event stream in batches
Sysout.log(name, "Node has processed a batch of messages from the sequencer!", "sequence=", getNextExpectedSequence(), "messages=", numberOfMessagesProcessed);
}
@Override
public void handleAcked(Command cmd, Message msg) {
// the given command was acked by the given msg
Sysout.log(name, "Command acked by message!", "cmd=", cmd.toCharSequence(), "msg=", msg.toCharSequence());
}
@Override
protected void handleMessage(boolean isMine, Message msg) {
Sysout.log(name, "Got message!",
"now=", currentSequencerTime(),
"isRewinding=", isRewinding(),
"session=", msg.getSession(),
"msgTimestamp=", msg.getTimestamp(),
"from=", msg.getAccountName(),
"isMine=", isMine,
"seq=", msg.getSequence(),
"dataSize=", msg.getData().remaining(),
"data=", msg.getData()
);
}
}
Composition:
package com.coralblocks.coralsequencer.node;
import static com.coralblocks.corallog.Log.*;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.VM;
import com.coralblocks.coralsequencer.command.Command;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;
import com.coralblocks.coralsequencer.mq.NodeComposition;
import com.coralblocks.coralsequencer.mq.NodeListener;
public class SimpleNodeComposition implements NodeListener, NodeComposition {
private final Node node;
// You can also define the standard constructor below so you can start this node from a mq file if you want to...
// In order to do that you must also make the class implement NodeComposition...
public SimpleNodeComposition(NioReactor nio, String name, Configuration config) {
this.node = new Node(nio, name, config);
this.node.addListener(this);
}
public SimpleNodeComposition(String name) {
this.node = VM.getInstance().newNode(name, "noRewind=true");
this.node.addListener(this);
}
@Override
public Node getNode() {
return node;
}
public static void main(String[] args) {
int telnetIndex = -1;
if (args.length > 0) telnetIndex = Integer.parseInt(args[0]);
VM vm = VM.getInstance();
if (telnetIndex != -1) vm.setTelnetIndex(telnetIndex);
vm.start();
SimpleNodeComposition simple = new SimpleNodeComposition("NODE6");
simple.getNode().open();
simple.getNode().activate();
}
@Override
public void onOpened(Node node) {
// when a node is opened, it will try to connect to the sequencer, to receive messages from the event stream
Sysout.log(node, "Node has been opened!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onClosed(Node node) {
// when a node is closed, it will disconnect from the sequencer, and it will not receive any more messages from the event stream
Sysout.log(node, "Node has been closed!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onConnectionEstablished(Node node) {
// the node was able to establish a connection to the sequencer
Sysout.log(node, "Node has established a connection with the sequencer!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onConnectionOpened(Node node) {
// the node has started to receive *live* messages from the sequencer (after rewinding or after a disconnect followed by a gap fill)
Sysout.log(node, "Node is receiving live messages from the sequencer!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onRewindingStarted(Node node) {
// node is connected and will start to rewind
Sysout.log(node, "Node is starting to rewind!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onRewinded(Node node) {
// the node has finished rewinding all past messages from the current session
// note that this is called only once in the lifecycle of a node, because a node rewinds only once, when its open() method is called
Sysout.log(node, "Node has finished rewinding!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onGapFillingStarted(Node node, long seq, int count) {
Sysout.log(node, "Node lost messages and is gap filling!", "sequence=", node.getNextExpectedSequence(), "seqRequested=", seq, "count=", count);
}
@Override
public void onGapFillingEnded(Node node) {
Sysout.log(node, "Node has finished gap filling!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onConnectionTerminated(Node node) {
// the node has lost connection to the sequencer
Sysout.log(node, "Node has lost connection with the sequencer!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onSessionStarted(Node node, String sessionName) {
Sysout.log(node, "Sequencer session has started!", "sequence=", node.getNextExpectedSequence(), "session=", sessionName);
}
@Override
public void onSessionEnded(Node node, String sessionName) {
Sysout.log(node, "Sequencer session has ended!", "sequence=", node.getNextExpectedSequence(), "session=", sessionName);
}
@Override
public void onActivated(Node node) {
// when activated, the node is able to send commands to the sequencer
Sysout.log(node, "Node has been activated!", "accountSeq=", node.getAccountSequence(), "sequence=", node.getNextExpectedSequence());
}
@Override
public void onDeactivated(Node node) {
// when deactivated, the node will not send any commands to the sequencer
Sysout.log(node, "Node has been deactivated!", "sequence=", node.getNextExpectedSequence());
}
@Override
public void onBatchProcessed(Node node, int numberOfMessagesProcessed) {
// node processes messages from the event stream in batches
Sysout.log(node, "Node has processed a batch of messages from the sequencer!", "sequence=", node.getNextExpectedSequence(), "messages=", numberOfMessagesProcessed);
}
@Override
public void onAcked(Node node, Command cmd, Message msg) {
// the given command was acked by the given msg
Sysout.log(node, "Command acked by message!", "cmd=", cmd.toCharSequence(), "msg=", msg.toCharSequence());
}
@Override
public void onMessage(Node node, boolean isMine, Message msg) {
Sysout.log(node, "Got message!",
"now=", node.currentSequencerTime(),
"isRewinding=", node.isRewinding(),
"session=", msg.getSession(),
"msgTimestamp=", msg.getTimestamp(),
"from=", msg.getAccountName(),
"isMine=", isMine,
"seq=", msg.getSequence(),
"dataSize=", msg.getData().remaining(),
"data=", msg.getData()
);
}
}