CoralSequencer (Straight to the Point)

Below is an example of how you can implement a CoralSequencer node using either inheritance or composition, so that you can send messages to the sequencer and receive messages from it. Messages sent to the sequencer will appear in the event stream (i.e., the message bus).

Inheritance:

package com.coralblocks.coralsequencer.node;

import static com.coralblocks.corallog.Log.*;

import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.command.Command;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;

// Here we use INHERITANCE to extend a Node. For COMPOSITION see the other example further below...
/**
 * Example node built by extending {@link Node}.
 *
 * <p>Every lifecycle callback, and every message delivered from the sequencer event stream,
 * is simply written to {@code Sysout} together with the node's next expected sequence.
 */
public class SimpleNode extends Node {

	/**
	 * Creates the node by delegating to the {@code Node} constructor.
	 *
	 * @param nio the reactor handed through to {@code Node}
	 * @param name the node name handed through to {@code Node}
	 * @param config the configuration handed through to {@code Node}
	 */
	public SimpleNode(NioReactor nio, String name, Configuration config) {
		super(nio, name, config);
	}

	/**
	 * Called when the node is opened. An opened node will try to connect to the sequencer
	 * in order to receive messages from the event stream.
	 */
	@Override
	protected void handleOpened() {
		Sysout.log(name, "Node has been opened!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called when the node is closed. A closed node disconnects from the sequencer and
	 * receives no further messages from the event stream.
	 */
	@Override
	protected void handleClosed() {
		Sysout.log(name, "Node has been closed!", "sequence=", getNextExpectedSequence());
	}

	/** Called when the node was able to establish a connection to the sequencer. */
	@Override
	protected void handleConnectionEstablished() {
		Sysout.log(name, "Node has established a connection with the sequencer!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called when the node has started to receive <em>live</em> messages from the sequencer,
	 * i.e. after rewinding, or after a disconnect followed by a gap fill.
	 */
	@Override
	protected void handleConnectionOpened() {
		Sysout.log(name, "Node is receiving live messages from the sequencer!", "sequence=", getNextExpectedSequence());
	}

	/** Called when the node is connected and is about to start rewinding. */
	@Override
	protected void handleRewindingStarted() {
		Sysout.log(name, "Node is starting to rewind!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called when the node has finished rewinding all past messages from the current session.
	 *
	 * <p>This is called only once in the lifecycle of a node, because a node rewinds only once,
	 * when its {@code open()} method is called.
	 */
	@Override
	protected void handleRewinded() {
		Sysout.log(name, "Node has finished rewinding!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called when the node has lost messages and starts gap filling.
	 *
	 * @param seq logged as {@code seqRequested}; presumably the first sequence being re-requested
	 * @param count logged as {@code count}; presumably the number of messages being re-requested
	 */
	@Override
	protected void handleGapFillingStarted(long seq, int count) {
		Sysout.log(name, "Node lost messages and is gap filling!", "sequence=", getNextExpectedSequence(), "seqRequested=", seq, "count=", count);
	}

	/** Called when the node has finished gap filling. */
	@Override
	protected void handleGapFillingEnded() {
		Sysout.log(name, "Node has finished gap filling!", "sequence=", getNextExpectedSequence());
	}

	/** Called when the node has lost its connection to the sequencer. */
	@Override
	protected void handleConnectionTerminated() {
		Sysout.log(name, "Node has lost connection with the sequencer!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called when a sequencer session has started.
	 *
	 * @param sessionName the name of the session that started
	 */
	@Override
	protected void handleSessionStarted(String sessionName) {
		Sysout.log(name, "Sequencer session has started!", "sequence=", getNextExpectedSequence(), "session=", sessionName);
	}

	/**
	 * Called when a sequencer session has ended.
	 *
	 * @param sessionName the name of the session that ended
	 */
	@Override
	protected void handleSessionEnded(String sessionName) {
		Sysout.log(name, "Sequencer session has ended!", "sequence=", getNextExpectedSequence(), "session=", sessionName);
	}

	/** Called when the node is activated. An activated node is able to send commands to the sequencer. */
	@Override
	protected void handleActivated() {
		Sysout.log(name, "Node has been activated!", "accountSeq=", getAccountSequence(), "sequence=", getNextExpectedSequence());
	}

	/** Called when the node is deactivated. A deactivated node will not send any commands to the sequencer. */
	@Override
	protected void handleDeactivated() {
		Sysout.log(name, "Node has been deactivated!", "sequence=", getNextExpectedSequence());
	}

	/**
	 * Called after a batch of event-stream messages has been processed; the node processes
	 * messages from the event stream in batches.
	 *
	 * @param numberOfMessagesProcessed the number of messages in the batch just processed
	 */
	@Override
	protected void handleBatchProcessed(int numberOfMessagesProcessed) {
		Sysout.log(name, "Node has processed a batch of messages from the sequencer!", "sequence=", getNextExpectedSequence(), "messages=", numberOfMessagesProcessed);
	}

	/**
	 * Called when the given command was acked by the given message.
	 *
	 * <p>NOTE(review): this callback is declared {@code public} while every other callback in
	 * this class is {@code protected}. The visibility is left untouched because the declaration
	 * in {@code Node} is not visible here - confirm against the base class.
	 *
	 * @param cmd the command that was acked
	 * @param msg the message that acked the command
	 */
	@Override
	public void handleAcked(Command cmd, Message msg) {
		Sysout.log(name, "Command acked by message!", "cmd=", cmd.toCharSequence(), "msg=", msg.toCharSequence());
	}

	/**
	 * Called for every message received from the event stream. Logs the current sequencer time,
	 * whether the node is rewinding, and the session, timestamp, sender account, sequence and
	 * payload of the message.
	 *
	 * @param isMine logged as {@code isMine}; presumably {@code true} when this node sent the message - confirm
	 * @param msg the message received from the sequencer
	 */
	@Override
	protected void handleMessage(boolean isMine, Message msg) {
		Sysout.log(name, "Got message!",
				"now=", currentSequencerTime(),
				"isRewinding=", isRewinding(),
				"session=", msg.getSession(),
				"msgTimestamp=", msg.getTimestamp(),
				"from=", msg.getAccountName(),
				"isMine=", isMine,
				"seq=", msg.getSequence(),
				"dataSize=", msg.getData().remaining(),
				"data=", msg.getData()
			);
	}

}

Composition:

package com.coralblocks.coralsequencer.node;

import static com.coralblocks.corallog.Log.*;

import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.VM;
import com.coralblocks.coralsequencer.command.Command;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;
import com.coralblocks.coralsequencer.mq.NodeComposition;
import com.coralblocks.coralsequencer.mq.NodeListener;

/**
 * Example that uses COMPOSITION instead of inheritance: it owns a {@link Node}, registers itself
 * as that node's {@link NodeListener}, and logs every callback it receives.
 *
 * <p>Implementing {@link NodeComposition} together with the
 * {@code (NioReactor, String, Configuration)} constructor is what allows this class to be
 * started from a mq file.
 */
public class SimpleNodeComposition implements NodeListener, NodeComposition {

	/** The wrapped node whose events this object listens to. */
	private final Node node;

	/**
	 * Standard constructor, needed if the node is to be started from a mq file.
	 *
	 * @param nio the reactor handed to the new {@code Node}
	 * @param name the name handed to the new {@code Node}
	 * @param config the configuration handed to the new {@code Node}
	 */
	public SimpleNodeComposition(NioReactor nio, String name, Configuration config) {
		node = new Node(nio, name, config);
		node.addListener(this);
	}

	/**
	 * Convenience constructor that obtains the node from the {@link VM} singleton, created with
	 * the {@code noRewind=true} setting.
	 *
	 * @param name the name of the node to create
	 */
	public SimpleNodeComposition(String name) {
		node = VM.getInstance().newNode(name, "noRewind=true");
		node.addListener(this);
	}

	/** Returns the node wrapped by this object. */
	@Override
	public Node getNode() {
		return node;
	}

	/**
	 * Starts the VM, then creates, opens and activates a node named {@code NODE6}.
	 *
	 * @param args optional; when present, {@code args[0]} is parsed as the telnet index for the VM
	 */
	public static void main(String[] args) {

		// -1 means "no telnet index supplied", in which case the VM setting is left untouched
		final int telnetIndex = (args.length > 0) ? Integer.parseInt(args[0]) : -1;

		final VM vm = VM.getInstance();
		if (telnetIndex != -1) {
			vm.setTelnetIndex(telnetIndex);
		}
		vm.start();

		final SimpleNodeComposition composition = new SimpleNodeComposition("NODE6");
		final Node created = composition.getNode();
		created.open();
		created.activate();
	}

	/** Logs an event description followed by the node's next expected sequence. */
	private static void logEvent(Node node, String event) {
		Sysout.log(node, event, "sequence=", node.getNextExpectedSequence());
	}

	/** An opened node will try to connect to the sequencer to receive messages from the event stream. */
	@Override
	public void onOpened(Node node) {
		logEvent(node, "Node has been opened!");
	}

	/** A closed node disconnects from the sequencer and receives no more messages from the event stream. */
	@Override
	public void onClosed(Node node) {
		logEvent(node, "Node has been closed!");
	}

	/** The node was able to establish a connection to the sequencer. */
	@Override
	public void onConnectionEstablished(Node node) {
		logEvent(node, "Node has established a connection with the sequencer!");
	}

	/** The node now receives *live* messages (after rewinding, or after a disconnect followed by a gap fill). */
	@Override
	public void onConnectionOpened(Node node) {
		logEvent(node, "Node is receiving live messages from the sequencer!");
	}

	/** The node is connected and is about to rewind. */
	@Override
	public void onRewindingStarted(Node node) {
		logEvent(node, "Node is starting to rewind!");
	}

	/**
	 * The node has finished rewinding all past messages from the current session. Happens only
	 * once per node lifecycle, since a node rewinds only once, when its open() method is called.
	 */
	@Override
	public void onRewinded(Node node) {
		logEvent(node, "Node has finished rewinding!");
	}

	/** The node lost messages and has started gap filling; logs the requested sequence and count. */
	@Override
	public void onGapFillingStarted(Node node, long seq, int count) {
		Sysout.log(node, "Node lost messages and is gap filling!",
				"sequence=", node.getNextExpectedSequence(),
				"seqRequested=", seq,
				"count=", count);
	}

	/** The node has finished gap filling. */
	@Override
	public void onGapFillingEnded(Node node) {
		logEvent(node, "Node has finished gap filling!");
	}

	/** The node has lost its connection to the sequencer. */
	@Override
	public void onConnectionTerminated(Node node) {
		logEvent(node, "Node has lost connection with the sequencer!");
	}

	/** A sequencer session with the given name has started. */
	@Override
	public void onSessionStarted(Node node, String sessionName) {
		Sysout.log(node, "Sequencer session has started!",
				"sequence=", node.getNextExpectedSequence(),
				"session=", sessionName);
	}

	/** A sequencer session with the given name has ended. */
	@Override
	public void onSessionEnded(Node node, String sessionName) {
		Sysout.log(node, "Sequencer session has ended!",
				"sequence=", node.getNextExpectedSequence(),
				"session=", sessionName);
	}

	/** Once activated, the node is able to send commands to the sequencer. */
	@Override
	public void onActivated(Node node) {
		Sysout.log(node, "Node has been activated!",
				"accountSeq=", node.getAccountSequence(),
				"sequence=", node.getNextExpectedSequence());
	}

	/** Once deactivated, the node will not send any commands to the sequencer. */
	@Override
	public void onDeactivated(Node node) {
		logEvent(node, "Node has been deactivated!");
	}

	/** The node processes event-stream messages in batches; this fires after each batch. */
	@Override
	public void onBatchProcessed(Node node, int numberOfMessagesProcessed) {
		Sysout.log(node, "Node has processed a batch of messages from the sequencer!",
				"sequence=", node.getNextExpectedSequence(),
				"messages=", numberOfMessagesProcessed);
	}

	/** The given command was acked by the given message. */
	@Override
	public void onAcked(Node node, Command cmd, Message msg) {
		Sysout.log(node, "Command acked by message!",
				"cmd=", cmd.toCharSequence(),
				"msg=", msg.toCharSequence());
	}

	/**
	 * A message arrived from the event stream. Logs the sequencer time, the rewinding state and
	 * the session, timestamp, sender account, sequence and payload of the message.
	 */
	@Override
	public void onMessage(Node node, boolean isMine, Message msg) {
		Sysout.log(node, "Got message!",
				"now=", node.currentSequencerTime(),
				"isRewinding=", node.isRewinding(),
				"session=", msg.getSession(),
				"msgTimestamp=", msg.getTimestamp(),
				"from=", msg.getAccountName(),
				"isMine=", isMine,
				"seq=", msg.getSequence(),
				"dataSize=", msg.getData().remaining(),
				"data=", msg.getData());
	}

}