Nodes (CoralSequencer article series)

In a distributed system, Nodes are responsible for executing the application logic in a decentralized/distributed way. With CoralSequencer you can easily code a node that will send commands to the sequencer and listen to messages in the event-stream (i.e. message-bus). Below we show an example of a simple node that sends a TIME command to the sequencer and waits to see the corresponding message in the event-stream. When it receives the message, it waits 3 seconds and sends another command, repeating the process.

package com.coralblocks.coralsequencer.node;

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.ts.TimeUnit;
import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;

public class SampleNode extends Node {
	
	private final static int PERIOD = 3; // 3 seconds...
	
	public SampleNode(NioReactor nio, String name, Configuration config) {
	    super(nio, name, config);
    }
	
	@Override
	protected void handleActivated() {
		// this method is called when the node becomes active
		sendCommand();
	}
	
	@Override
	protected void handleDeactivated() {
		// called when a node has been deactivated
		// once deactivated a node will not send commands
		removeEventTimeout(); // turn off event timeout if set
	}
	
	@Override
	protected void handleEventTimeout(long now, long period, TimeUnit unit) {
		// this method is triggered by the event timeout you are setting in the handleMessage method
		// Note: it is triggered only once so you must re-register the timeout if you want to do it again (it is not a loop timer)
		sendCommand();
	}
	
	private void sendCommand() {
		sendCommand("TIME-" + System.currentTimeMillis());
	}

	@Override
    protected void handleMessage(boolean isMine, Message msg) {
		
		if (!isMine || isRewinding()) return; // not interested, quickly ignore them...
		
		ByteBuffer data = msg.getData();
		
		System.out.println("Saw my message in the event-stream: " + ByteBufferUtils.parseString(data));
		
		setEventTimeout(PERIOD, TimeUnit.SECONDS); // set a trigger to send the command again after 3 seconds
    }
}

Note that every command sent to the sequencer by a node will make the sequencer send a corresponding message to the event-stream with the sender’s account. That’s how the node makes sure that its command was received and processed by the sequencer. You don’t need to worry about that or do anything, but under the hood CoralSequencer will resend the command if it does not see the corresponding message (i.e. the ack) in the event-stream after N milliseconds. Again, this is totally transparent for the developer coding the node, as you can see in the source code above.

A node can be read-only, in other words, it will only listen to the event-stream and never send any command to the sequencer. Our node above does send a command to the sequencer (i.e. it is not read-only) using the sendCommand(String) method. Besides that method, you can also use sendCommand(byte[]), sendCommand(ByteBuffer) and sendCommand(Proto).

The isMine flag passed by the handleMessage(boolean, Message) method is important as it tells you if you are receiving a message that belongs to this node or not. Recall that the sequencer always broadcasts all messages to all nodes, so you will be seeing messages from other nodes in this method. If you are only interested in your messages you can quickly drop them by checking the isMine boolean.

Another important check is done with isRewinding(). The first time the node connects to the sequencer it will receive a replay of all the previous messages from the current session, in a process called rewinding. You can use this messages to rebuild state if you need to. In our simple example we don’t want to do anything with past messages so we simply drop them.


Configuring the Node

Below the CoralSequencer DSL to configure a node:

# allow this node to be managed by telnet
VM addAdmin telnet 51

# creates the node (account = NODE1)
VM newNode NODE1 com.coralblocks.coralsequencer.node.SampleNode

# the lines below can also be executed manually by admin
NODE1 open
NODE1 activate

Add the lines above to a file time.mq and use the script ./bin/start.sh to execute the DSL and start the node:

Screen Shot 2016-02-23 at 4.52.56 PM

Managing the Node

Because we configured the telnet admin in the DSL above, we can telnet to the admin port (i.e. 50000 + port id) to execute DSL commands on the node. For example, the last lines to open and activate the node can be executed manually through telnet, as the example below shows:

NOTE: The highlighted lines below are the commands executed

$ telnet localhost 50051
Trying ::1...
Connected to localhost (::1).
Escape character is '^]'.

Hi! What can I do for you? You can start by typing 'list'...

list NODE1

NODE1 open
NODE1 close
NODE1 setMessageReceiver
NODE1 setCommandSender
NODE1 activate
NODE1 sendCommand
NODE1 status

NODE1-CommandSender-255.255.255.255:60010
NODE1-MessageReceiver-0.0.0.0:60066

NODE1 open

NODE1 was opened!

NODE1 activate true

activate called!


Conclusion

Writing a Node using CoralSequencer is extremely easy. Moreover you can configure and manage your nodes using CoralSequencer’s DSL.