SimSequencer is a framework that lets you simulate, in code, most aspects of the sequencer architecture without any networking involved. It can be very useful for prototyping, testing and learning purposes. With SimSequencer you can code your nodes to test all the moving parts of your application as if they were running and interacting in a real distributed system. It also has the same API as CoralSequencer.
Some of the main premises of the sequencer architecture are:
- All nodes receive all messages in the exact same order, always
- Nodes become deterministic finite-state machines
- Clusters for high-availability and failover become trivial
Below we demonstrate these premises through working SimSequencer code:
All nodes receive all messages in the exact same order, always
// Two nodes attached to one sequencer each record every message handed to them;
// afterwards the two recordings are compared element by element.
Sequencer seq = new PassThroughSequencer("SEQ");

final List<Message> seenByFirst = new LinkedList<Message>();
final List<Message> seenBySecond = new LinkedList<Message>();

// Each node simply appends whatever it receives to its own list.
Node first = new Node("NODE1") {
    @Override
    protected void handleMessage(boolean isMine, Message msg) {
        seenByFirst.add(msg);
    }
};

Node second = new Node("NODE2") {
    @Override
    protected void handleMessage(boolean isMine, Message msg) {
        seenBySecond.add(msg);
    }
};

seq.addNode(first, second);

seq.open().activate();
first.open().activate();
second.open().activate();

Random rand = new Random();

final int messagesToSend = 200;

// Pick the sending node at random for every command.
for (int sent = 0; sent < messagesToSend; sent++) {
    final Node sender;
    if (rand.nextBoolean()) {
        sender = first;
    } else {
        sender = second;
    }
    sender.sendCommand("Hi" + rand.nextInt(1000));
}

// Both nodes must have been handed every command, regardless of who sent it.
Assert.assertEquals(messagesToSend, seenByFirst.size());
Assert.assertEquals(messagesToSend, seenBySecond.size());

// Walk both recordings in lock-step: position k must hold equal messages on both nodes.
for (Iterator<Message> itFirst = seenByFirst.iterator(), itSecond = seenBySecond.iterator();
        itFirst.hasNext() && itSecond.hasNext(); ) {
    Message fromFirst = itFirst.next();
    Message fromSecond = itSecond.next();
    Assert.assertTrue(fromFirst.isEqualTo(fromSecond));
}

first.close();
second.close();
seq.close();
Nodes become deterministic finite-state machines
import static com.coralblocks.simsequencer.util.Log.*;

import com.coralblocks.simsequencer.Message;
import com.coralblocks.simsequencer.Node;

/**
 * A node whose only state is a running counter that is updated by the messages it handles.
 *
 * <p>Message data is read as a {@code |}-delimited string whose first token selects the action:
 * <ul>
 *   <li>{@code ADD_DETERMINISTIC_TIMESTAMP} adds the value of {@code currentSequencerTime()}</li>
 *   <li>{@code ADD_VALUE|<int>} adds the integer carried in the second token</li>
 * </ul>
 * Any other first token leaves the counter untouched.
 */
public class CounterNode extends Node {

    /** Running total; set to 1 every time {@link #handleOpened()} runs. */
    private long counter;

    /**
     * @param name passed straight through to the {@code Node} constructor
     */
    public CounterNode(String name) {
        super(name);
    }

    /**
     * @return the current value of the counter
     */
    public long getCounter() {
        return counter;
    }

    /**
     * Puts the counter back to its starting value of 1 and logs that fact.
     */
    @Override
    protected void handleOpened() {
        counter = 1;
        // NOTE(review): 'name' is not declared in this class; presumably inherited from Node.
        Info.log(name, "Counter was reset to 1");
    }

    /**
     * Applies one message to the counter.
     *
     * @param isMine not used by this implementation
     * @param msg    message whose string data is parsed as {@code TYPE[|ARG]}
     * @throws NumberFormatException if an {@code ADD_VALUE} argument is not a valid int
     */
    @Override
    protected void handleMessage(boolean isMine, Message msg) {
        final String[] parts = msg.getDataAsString().split("\\|");
        final String kind = parts[0];

        if ("ADD_DETERMINISTIC_TIMESTAMP".equals(kind)) {
            // NOTE(review): currentSequencerTime() is presumably supplied by Node; confirm.
            counter += currentSequencerTime();
            return;
        }

        if ("ADD_VALUE".equals(kind)) {
            counter += Integer.parseInt(parts[1]);
        }
    }
}
// Sends a random mix of commands through a single CounterNode, remembers the resulting
// counter, then closes and reopens the node several times and checks that it always
// arrives back at the same counter value.
Sequencer sequencer = new PassThroughSequencer("SEQ");

CounterNode node = new CounterNode("NODE1");

sequencer.addNode(node);

sequencer.open().activate();
node.open().activate();

Random rand = new Random();

final int messagesToSend = 200;

for (int i = 0; i < messagesToSend; i++) {
    int type = rand.nextInt(2);
    if (type == 0) {
        node.sendCommand("ADD_DETERMINISTIC_TIMESTAMP");
    } else if (type == 1) {
        node.sendCommand("ADD_VALUE|" + rand.nextInt(300000));
    }
}

// Reference value that every later replay must reproduce.
final long counter = node.getCounter();

for (int i = 0; i < 5; i++) {
    node.close();
    node.open(); // resets the counter to 1...
    // node then rewinds, receiving all messages again...
    Assert.assertEquals(counter, node.getCounter());
}

node.close();
sequencer.close();
Clusters for high-availability and failover become trivial
// Hot-Warm (Active-Passive) Cluster Sequencer sequencer = new PassThroughSequencer("SEQ"); // Two nodes with the same account "NODE1" for a cluster CounterNode nodeA = new CounterNode("NODE1"); CounterNode nodeB = new CounterNode("NODE1"); sequencer.addNode(nodeA, nodeB); sequencer.open().activate(); nodeA.open().activate(); // hot (active) nodeB.open(); // warm (passive) Random rand = new Random(); final int messagesToSend = 200; for(int i = 0; i < messagesToSend; i++) { int type = rand.nextInt(2); if (type == 0) { nodeA.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); } else if (type == 1) { nodeA.sendCommand("ADD_VALUE|" + rand.nextInt(300000)); } } Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter()); // fail over to the warm node nodeA.deactivate(); // now it is warm nodeB.activate(); // now it is hot for(int i = 0; i < messagesToSend; i++) { int type = rand.nextInt(2); if (type == 0) { nodeB.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); } else if (type == 1) { nodeB.sendCommand("ADD_VALUE|" + rand.nextInt(300000)); } } Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter()); nodeA.close(); nodeB.close(); sequencer.close();
// Hot-Hot (Active-Active) Cluster Sequencer sequencer = new PassThroughSequencer("SEQ"); // Two nodes with the same account "NODE1" for a cluster CounterNode nodeA = new CounterNode("NODE1"); CounterNode nodeB = new CounterNode("NODE1"); sequencer.addNode(nodeA, nodeB); sequencer.open().activate(); nodeA.open().activate(); // hot (active) nodeB.open().activate(); // hot (active) Random rand = new Random(); final int messagesToSend = 200; for(int i = 0; i < messagesToSend; i++) { int type = rand.nextInt(2); if (type == 0) { if (rand.nextBoolean()) { // order does not matter nodeA.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); nodeB.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); } else { nodeB.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); nodeA.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); } } else if (type == 1) { int value = rand.nextInt(300000); if (rand.nextBoolean()) { // order does not matter nodeA.sendCommand("ADD_VALUE|" + value); nodeB.sendCommand("ADD_VALUE|" + value); } else { nodeB.sendCommand("ADD_VALUE|" + value); nodeA.sendCommand("ADD_VALUE|" + value); } } } Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter()); // pull the plug from one of the nodes nodeA.close(); // now it is dead nodeA = null; for(int i = 0; i < messagesToSend; i++) { int type = rand.nextInt(2); if (type == 0) { nodeB.sendCommand("ADD_DETERMINISTIC_TIMESTAMP"); } else if (type == 1) { nodeB.sendCommand("ADD_VALUE|" + rand.nextInt(300000)); } } // bring another node to the cluster... CounterNode nodeC = new CounterNode("NODE1"); sequencer.addNode(nodeC); nodeC.open().activate(); // hot (active) Assert.assertEquals(nodeB.getCounter(), nodeC.getCounter()); nodeB.close(); nodeC.close(); sequencer.close();