In a distributed system, nodes are responsible for executing the application logic in a decentralized way. With CoralSequencer you can easily code a node that sends commands to the sequencer and listens to messages in the event-stream (i.e. the message-bus). Below is an example of a simple node that sends a TIME command to the sequencer and waits to see the corresponding message in the event-stream. When it receives the message, it waits 3 seconds and sends another command, repeating the process.
package com.coralblocks.coralsequencer.node;

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.ts.TimeUnit;
import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;

public class SampleNode extends Node {

    private final static int PERIOD = 3; // 3 seconds...

    public SampleNode(NioReactor nio, String name, Configuration config) {
        super(nio, name, config);
    }

    @Override
    protected void handleActivated() {
        // this method is called when the node becomes active
        sendCommand();
    }

    @Override
    protected void handleDeactivated() {
        // called when a node has been deactivated
        // once deactivated a node will not send commands
        removeEventTimeout(); // turn off event timeout if set
    }

    @Override
    protected void handleEventTimeout(long now, long period, TimeUnit unit) {
        // this method is triggered by the event timeout you are setting in the handleMessage method
        // Note: it is triggered only once so you must re-register the timeout if you want to do it again (it is not a loop timer)
        sendCommand();
    }

    private void sendCommand() {
        sendCommand("TIME-" + System.currentTimeMillis());
    }

    @Override
    protected void handleMessage(boolean isMine, Message msg) {

        if (!isMine || isRewinding()) return; // not interested, quickly ignore them...

        ByteBuffer data = msg.getData();

        System.out.println("Saw my message in the event-stream: " + ByteBufferUtils.parseString(data));

        setEventTimeout(PERIOD, TimeUnit.SECONDS); // set a trigger to send the command again after 3 seconds
    }
}
Note that every command sent to the sequencer by a node will make the sequencer send a corresponding message to the event-stream with the sender’s account. That’s how the node makes sure that its command was received and processed by the sequencer. You don’t need to worry about that or do anything, but under the hood CoralSequencer will resend the command if it does not see the corresponding message (i.e. the ack) in the event-stream after N milliseconds. Again, this is totally transparent for the developer coding the node, as you can see in the source code above.
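The resend-until-acknowledged behavior described above can be pictured with a small, self-contained simulation. This is only a sketch and not CoralSequencer's actual implementation: the class AckTracker, its methods and the 100 millisecond timeout are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: NOT the CoralSequencer implementation.
// Tracks one in-flight command and decides when it must be resent.
class AckTracker {

    private final long resendAfterMillis;                // the "N milliseconds" (assumed value)
    private final List<String> wire = new ArrayList<>(); // every command put on the wire so far
    private String pending;                              // command waiting for its ack, or null
    private long lastSentAt;                             // time of the last send or resend

    AckTracker(long resendAfterMillis) {
        this.resendAfterMillis = resendAfterMillis;
    }

    // Sends a command and remembers it until its message shows up in the event-stream.
    void send(String command, long now) {
        pending = command;
        lastSentAt = now;
        wire.add(command);
    }

    // Called for every message seen in the event-stream.
    void onMessage(boolean isMine, String payload) {
        if (isMine && payload.equals(pending)) pending = null; // this is the ack
    }

    // Called periodically. Resends if the ack is overdue and returns true if it did.
    boolean onTick(long now) {
        if (pending == null || now - lastSentAt < resendAfterMillis) return false;
        lastSentAt = now;
        wire.add(pending);
        return true;
    }

    boolean isPending() { return pending != null; }

    int sendCount() { return wire.size(); }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker(100);
        tracker.send("TIME-1", 0);
        tracker.onTick(50);                // too early, nothing happens
        tracker.onTick(120);               // ack overdue, the command is resent
        tracker.onMessage(true, "TIME-1"); // the ack arrives
        System.out.println("sends=" + tracker.sendCount() + " pending=" + tracker.isPending());
    }
}
```

In this toy model the node code never calls onTick itself, which mirrors the point made above: the retry logic lives below the node and the developer only sees the message once it arrives.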
A node can be read-only; in other words, it only listens to the event-stream and never sends any command to the sequencer. Our node above is not read-only: it sends a command to the sequencer using the sendCommand(String) method. Besides that method, you can also use sendCommand(byte[]), sendCommand(ByteBuffer) and sendCommand(Proto).
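To make the String, byte[] and ByteBuffer forms concrete, the sketch below builds the same payload in all three representations using only the standard Java library. It does not call CoralSequencer. The class PayloadForms and its helpers are hypothetical, and the use of US-ASCII is an assumption made for this example rather than a documented property of the library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: shows one payload in the three raw forms a command could take.
class PayloadForms {

    // String -> byte[] (US-ASCII is an assumption for this sketch)
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.US_ASCII);
    }

    // String -> ByteBuffer positioned at 0 and ready to be read
    static ByteBuffer toBuffer(String s) {
        return ByteBuffer.wrap(toBytes(s));
    }

    // ByteBuffer -> String, without disturbing the caller's position
    static String fromBuffer(ByteBuffer bb) {
        byte[] out = new byte[bb.remaining()];
        bb.duplicate().get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        String command = "TIME-" + System.currentTimeMillis();
        byte[] asBytes = toBytes(command);
        ByteBuffer asBuffer = toBuffer(command);
        System.out.println("same bytes: " + Arrays.equals(asBytes, toBytes(fromBuffer(asBuffer))));
    }
}
```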
The isMine flag passed to the handleMessage(boolean, Message) method is important as it tells you whether the message you are receiving belongs to this node. Recall that the sequencer always broadcasts all messages to all nodes, so you will see messages from other nodes in this method. If you are only interested in your own messages, you can quickly drop the others by checking the isMine boolean.
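The effect of the isMine check can be illustrated with a toy broadcast loop. The sketch below is not the CoralSequencer API: BroadcastDemo, ToyNode and broadcast are hypothetical names, and the flag is computed here by simply comparing account names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: NOT the CoralSequencer API.
class BroadcastDemo {

    // A toy node that keeps only the messages it originated.
    static class ToyNode {
        final String account;
        final List<String> mine = new ArrayList<>();
        int seen; // every message delivered to this node, mine or not

        ToyNode(String account) { this.account = account; }

        void handleMessage(boolean isMine, String payload) {
            seen++;
            if (!isMine) return; // not interested, drop quickly
            mine.add(payload);
        }
    }

    // A toy event-stream: every message is delivered to every node.
    static void broadcast(List<ToyNode> nodes, String senderAccount, String payload) {
        for (ToyNode n : nodes) {
            n.handleMessage(n.account.equals(senderAccount), payload);
        }
    }

    public static void main(String[] args) {
        ToyNode node1 = new ToyNode("NODE1");
        ToyNode node2 = new ToyNode("NODE2");
        List<ToyNode> nodes = Arrays.asList(node1, node2);
        broadcast(nodes, "NODE1", "TIME-1");
        broadcast(nodes, "NODE2", "TIME-2");
        System.out.println(node1.seen + " seen, mine=" + node1.mine); // prints "2 seen, mine=[TIME-1]"
    }
}
```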
Another important check is done with isRewinding(). The first time the node connects to the sequencer it receives a replay of all the previous messages from the current session, in a process called rewinding. You can use these messages to rebuild state if you need to. In our simple example we don’t want to do anything with past messages so we simply drop them.
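For a node that does want to rebuild state, one possible pattern is to update state for every message but perform side effects only once rewinding is over. The sketch below simulates that idea in plain Java. RewindDemo and its members are hypothetical and do not use the CoralSequencer classes, and the rewindDone() call stands in for whatever signals the end of the replay.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: NOT the CoralSequencer API.
class RewindDemo {

    private boolean rewinding = true;                       // true while past messages are replayed
    private long messageCount;                              // state rebuilt from the stream
    private String lastPayload;                             // state rebuilt from the stream
    private final List<String> actions = new ArrayList<>(); // side effects, live messages only

    boolean isRewinding() { return rewinding; }

    // Stand-in for the end of the replay (assumed for this sketch).
    void rewindDone() { rewinding = false; }

    void handleMessage(String payload) {
        // State is updated for replayed and live messages alike...
        messageCount++;
        lastPayload = payload;
        // ...but side effects only happen for live messages.
        if (isRewinding()) return;
        actions.add("reacted to " + payload);
    }

    long messageCount() { return messageCount; }
    String lastPayload() { return lastPayload; }
    List<String> actions() { return actions; }

    public static void main(String[] args) {
        RewindDemo node = new RewindDemo();
        node.handleMessage("A"); // replayed
        node.handleMessage("B"); // replayed
        node.rewindDone();
        node.handleMessage("C"); // live
        System.out.println("count=" + node.messageCount() + " last=" + node.lastPayload()
                + " actions=" + node.actions()); // prints "count=3 last=C actions=[reacted to C]"
    }
}
```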
Configuring the Node
Below is the CoralSequencer DSL to configure a node:
# allow this node to be managed by telnet
VM addAdmin telnet 51

# creates the node (account = NODE1)
VM newNode NODE1 com.coralblocks.coralsequencer.node.SampleNode

# the lines below can also be executed manually by admin
NODE1 open
NODE1 activate
Add the lines above to a file named time.mq and use the script ./bin/start.sh to execute the DSL and start the node.
Managing the Node
Because we configured the telnet admin in the DSL above, we can telnet to the admin port (i.e. 50000 + port id) to execute DSL commands on the node. For example, the last lines to open and activate the node can be executed manually through telnet, as the example below shows:
NOTE: The highlighted lines below are the commands executed
$ telnet localhost 50051
Trying ::1...
Connected to localhost (::1).
Escape character is '^]'.
Hi! What can I do for you? You can start by typing 'list'...
list
NODE1
NODE1 open
NODE1 close
NODE1 setMessageReceiver
NODE1 setCommandSender
NODE1 activate
NODE1 sendCommand
NODE1 status
NODE1-CommandSender-255.255.255.255:60010
NODE1-MessageReceiver-0.0.0.0:60066
NODE1 open
NODE1 was opened!
NODE1 activate true
activate called!
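The admin port rule used above (50000 plus the port id given to addAdmin) is simple arithmetic, shown here as a tiny helper. AdminPort and telnetPort are hypothetical names for illustration and are not part of CoralSequencer. The range check is an addition of this sketch.

```java
// Hypothetical helper: the class and method names are illustrative only.
class AdminPort {

    static final int ADMIN_PORT_BASE = 50000; // base stated in the text

    // Returns the telnet port for a given admin port id (e.g. 51 -> 50051).
    static int telnetPort(int portId) {
        int port = ADMIN_PORT_BASE + portId;
        if (portId < 0 || port > 65535) {
            throw new IllegalArgumentException("port id out of range: " + portId);
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println("telnet localhost " + telnetPort(51)); // prints "telnet localhost 50051"
    }
}
```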
Conclusion
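Writing a node with CoralSequencer takes little code: you extend Node and override a handful of callbacks such as handleActivated, handleMessage and handleEventTimeout. You can then configure and manage your nodes using CoralSequencer’s DSL.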