Inter-thread communication within CoralReactor

CoralReactor was deliberately built, from the ground up, to be single-threaded. That means that no thread other than the reactor thread should execute any code or access any data belonging to servers and clients. This not only provides super-fast performance but also allows for much simpler code that does not have to worry about thread synchronization, lock contention, race conditions, deadlocks, thread starvation and many other pitfalls of multithreaded programming. However, there are common scenarios where other threads must interact with the reactor thread. In this article, we explore how this can be achieved while preserving the single-threaded principle, avoiding synchronization or locks, and generating zero garbage.


The Scenario

Below we list the source code of a simple reactor client that increments a counter based on messages it receives from a server.

package com.coralblocks.coralreactor.client.callback;

import static com.coralblocks.corallog.Log.*;

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.AbstractLineTcpClient;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

public class CounterClient extends AbstractLineTcpClient {

	private long total;
	
	public CounterClient(NioReactor nio, String host, int port, Configuration config) {
		super(nio, host, port, config);
	}
	
	@Override
	protected void handleOpened() {
		total = 0;
	}
	
	@Override
	protected void handleMessage(ByteBuffer msg) {
		
		if (!msg.hasRemaining()) return;
		
		char c = (char) msg.get();
		
		if (c == '+') increment();
		if (c == '-') decrement();
	}
	
	public void increment() {
		total++;
		Info.log("Total was incremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public void decrement() {
		total--;
		Info.log("Total was decremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.start();
	}
}

As you can see, when the client gets '+' from the server the counter is incremented. When it gets '-' the counter is decremented. You can easily simulate this server using netcat, as the screenshot below shows:

[Screenshot: netcat simulating the server]

After typing some pluses and minuses on the server, you can see the following log messages in the client console:

17:28:47.517089-INFO CounterClient-localhost:45151 Client opened! sequence=1 session=null
17:28:47.566073-INFO NioReactor Reactor started! type=OptimumNioReactor impl=KQueueSelectorImpl
17:28:47.567366-INFO CounterClient-localhost:45151 Connection established!
17:28:47.567400-INFO CounterClient-localhost:45151 Connection opened!
17:28:53.740893-INFO Total was incremented: 1 thread=NioReactor
17:28:55.044887-INFO Total was decremented: 0 thread=NioReactor
17:28:56.169573-INFO Total was incremented: 1 thread=NioReactor
17:28:56.946586-INFO Total was incremented: 2 thread=NioReactor
17:28:58.161256-INFO Total was decremented: 1 thread=NioReactor
17:28:59.229615-INFO Total was decremented: 0 thread=NioReactor

So far so good. Note that each log message includes the name of the thread performing the increment or decrement on the counter, in this case the NioReactor thread.

Incrementing from Another Thread

Now, suppose you need another thread, besides the reactor thread, to increment and decrement the counter. A common scenario is when actions originate from a GUI running on a separate thread. For example, when a user clicks a button, the action must be communicated to the reactor thread. Below, we modify our main method to simulate another thread interacting with the counter by incrementing and decrementing it:

	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.start();
		
		for(int i = 0; i < 100; i++) {
			
			Thread.sleep(2000);
			
			final boolean check = (i % 3 == 0);
			
			if (check) client.decrement();
			else client.increment();
		}
	}

When we run this client and type some pluses and minuses on the server, we get the following log output:

17:45:38.069983-INFO CounterClient-localhost:45151 Client opened! sequence=1 session=null
17:45:38.126212-INFO NioReactor Reactor started! type=OptimumNioReactor impl=KQueueSelectorImpl
17:45:38.127543-INFO CounterClient-localhost:45151 Connection established!
17:45:38.127578-INFO CounterClient-localhost:45151 Connection opened!
17:45:39.686658-INFO Total was incremented: 1 thread=NioReactor
17:45:40.127196-INFO Total was decremented: 0 thread=main
17:45:41.035442-INFO Total was decremented: -1 thread=NioReactor
17:45:42.128232-INFO Total was incremented: 0 thread=main
17:45:42.408007-INFO Total was decremented: -1 thread=NioReactor

Note that we have just broken the single-threaded principle of CoralReactor and we now have two threads (main and NioReactor) calling the same piece of code and accessing the same variable. From a multithreading point of view, one might be tempted to fix this problem using the synchronized keyword, as below:

	public synchronized void increment() {
		total++;
		Info.log("Total was incremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public synchronized void decrement() {
		total--;
		Info.log("Total was decremented:", total, "thread=", Thread.currentThread().getName());
	}

Please don’t. That would introduce lock contention on the critical reactor thread and abandon the single-threaded design principle. Fortunately, CoralReactor can easily restore the single-threaded principle through the use of callbacks.


Synchronous (blocking) Callbacks

Instead of having the external thread directly call code that belongs to the reactor thread, notify the reactor thread that some code execution is pending by passing it a CallbackListener. The reactor thread will then call you back by executing the provided callback listener. See the example below:

public static class CounterCallback implements CallbackListener {

	private final CounterClient client;
	public volatile boolean check;

	public CounterCallback(CounterClient client) {
		this.client = client;
	}

	@Override
	public void onCallback(long now) {
		// this will be executed by the reactor thread
		if (check) client.decrement();
		else client.increment();
	}
}

public static void main(String[] args) throws InterruptedException {
     
    NioReactor nio = NioReactor.create();
     
    MapConfiguration config = new MapConfiguration();
     
    final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
    client.open();
     
    nio.start();

    CounterCallback callback = new CounterCallback(client); // re-using the same instance
     
    for(int i = 0; i < 100; i++) {
         
        Thread.sleep(2000);
         
        callback.check = (i % 3 == 0);
         
        nio.call(callback); // synchronous call => will block until reactor executes the callback
    }
}

Now when we run our client we don’t see the main thread executing the client code anymore, just the reactor thread. We have successfully restored the single-threaded design principle. Also note that we are not generating any garbage because we are re-using the same instance of our CounterCallback object over and over again. That’s possible because our nio.call method is synchronous, in other words, it will block the main thread until the reactor thread is able to execute the callback. As we’ll see below, a garbage-free approach for an asynchronous (non-blocking) call is a bit trickier.
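To make the blocking behaviour concrete, here is a minimal sketch of how a synchronous call can hand work to a single event-loop thread using only the JDK. MiniReactor, its Callback interface and SyncCallDemo are hypothetical names made up for this illustration; this is not how CoralReactor implements nio.call. For simplicity the sketch allocates a small latch object per call, so unlike the real API described above it is not garbage-free.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical miniature event loop, for illustration only (not CoralReactor code).
class MiniReactor implements Runnable {

    interface Callback { void onCallback(long now); }

    // Pairs a callback with a latch the caller waits on (allocated per call for simplicity).
    private static final class Pending {
        final Callback callback;
        final CountDownLatch done = new CountDownLatch(1);
        Pending(Callback callback) { this.callback = callback; }
    }

    private final ConcurrentLinkedQueue<Pending> pending = new ConcurrentLinkedQueue<>();
    private final Thread thread = new Thread(this, "MiniReactor");
    private volatile boolean running = true;

    void start() { thread.start(); }

    void stop() {
        running = false;
        try { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // Blocks the calling thread until the loop thread has executed the callback.
    void call(Callback callback) {
        Pending p = new Pending(callback);
        pending.add(p);
        try {
            p.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void run() {
        while (running) {
            Pending p = pending.poll();
            if (p == null) { Thread.yield(); continue; }
            p.callback.onCallback(System.currentTimeMillis()); // always runs on the loop thread
            p.done.countDown();                                // releases the blocked caller
        }
    }
}

class SyncCallDemo {

    static final class Counter implements MiniReactor.Callback {
        boolean check;     // set by the caller before each call
        long total;        // mutated only on the loop thread
        String lastThread; // name of the thread that executed the last callback

        @Override
        public void onCallback(long now) {
            total += check ? -1 : 1;
            lastThread = Thread.currentThread().getName();
        }
    }

    static Counter run() {
        MiniReactor reactor = new MiniReactor();
        reactor.start();
        Counter counter = new Counter(); // one instance, safely reused because call() blocks
        for (int i = 0; i < 100; i++) {
            counter.check = (i % 3 == 0);
            reactor.call(counter);
        }
        reactor.stop();
        return counter;
    }

    public static void main(String[] args) {
        Counter c = run();
        System.out.println("total=" + c.total + " thread=" + c.lastThread); // total=32 thread=MiniReactor
    }
}
```

Because call() does not return until the loop thread has finished with the callback, the caller can safely change check and submit the same instance again, which is the property the article relies on for reuse.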

Asynchronous (non-blocking) Callbacks

You can also push callback listeners to the reactor thread without waiting for their execution: the method nio.callAsync pushes the callback listener to the reactor thread and returns immediately to the calling thread. Internally, the callback listener goes to a concurrent queue and is executed asynchronously by the reactor thread as soon as possible. See the example below:

public static void main(String[] args) throws InterruptedException {
     
    NioReactor nio = NioReactor.create();
     
    MapConfiguration config = new MapConfiguration();
     
    final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
    client.open();
     
    nio.start();

    for(int i = 0; i < 100; i++) {
         
        Thread.sleep(2000);

        CounterCallback callback = new CounterCallback(client); // garbage created here
         
        callback.check = (i % 3 == 0);
         
        nio.callAsync(callback); // asynchronous call => will return immediately
    }
}

Now, because we don’t block anymore, we must pass a different instance of our callback listener each time we call nio.callAsync; otherwise we could overwrite an instance that the reactor thread has not executed yet. This creates garbage for the GC, as these instances are discarded after the reactor thread executes them. To eliminate this garbage, we can use an internal garbage-free queue offered by CoralReactor.
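The need for a fresh instance per asynchronous call can be shown with a small, deterministic sketch. It uses a plain JDK queue drained on the same thread to stand in for "the reactor has not run yet"; ReuseHazardDemo and its Callback class are hypothetical names for this illustration and are not part of CoralReactor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ReuseHazardDemo {

    // Hypothetical stand-in for the article's CounterCallback.
    static final class Callback {
        boolean check;
    }

    // Enqueues two requests (check=true, then check=false) before anything is executed,
    // and returns the values the consumer observes when it finally drains the queue.
    static List<Boolean> observe(boolean reuseInstance) {
        Queue<Callback> queue = new ArrayDeque<>();
        Callback shared = new Callback();

        Callback first = reuseInstance ? shared : new Callback();
        first.check = true;
        queue.add(first);     // returns immediately, nothing has been executed yet

        Callback second = reuseInstance ? shared : new Callback();
        second.check = false; // with reuse, this overwrites the still-pending request
        queue.add(second);

        // The consumer only gets around to draining the queue now.
        List<Boolean> seen = new ArrayList<>();
        Callback c;
        while ((c = queue.poll()) != null) seen.add(c.check);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("reused instance: " + observe(true));  // [false, false]
        System.out.println("fresh instances: " + observe(false)); // [true, false]
    }
}
```

With a single reused instance the first request is lost, because its state was overwritten before it was executed. Fresh instances preserve both requests, at the cost of one allocation per call.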

Asynchronous Callbacks without Garbage

CoralReactor provides a lock-free, ultra-fast, and garbage-free internal queue for handling callbacks. You can create multiple underlying queues, each dedicated to a specific callback listener class. This approach eliminates garbage generation and minimizes inter-thread communication latency. Below is an example:

public static void main(String[] args) throws InterruptedException {
     
    NioReactor nio = NioReactor.create();
     
    MapConfiguration config = new MapConfiguration();
     
    final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
    client.open();

    Builder<CounterCallback> builder = new Builder<CounterCallback>() {
        @Override
        public CounterCallback newInstance() {
            return new CounterCallback(client);
        }
    };

    nio.initCallbackQueue(CounterCallback.class, builder); // init internal queue

    nio.start(); // only start reactor after initializing queues

    for(int i = 0; i < 100; i++) {
         
        Thread.sleep(2000);

        CounterCallback callback = nio.nextCallback(CounterCallback.class); // get from queue
         
        callback.check = (i % 3 == 0);
         
        nio.flushCallbacks(CounterCallback.class); // flush to consumer
    }
}

Note that you must initialize the queue before the reactor is started. Then all you have to do is call nio.nextCallback with your callback listener class to get a reusable callback object, set its state, and call nio.flushCallbacks to dispatch it to the reactor thread for execution. For more information about lock-free and garbage-free queues for inter-thread communication, you can check our open-source CoralQueue project.
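The general idea behind such a queue can be sketched as a ring of preallocated objects shared by one producer and one consumer. The sketch below is an assumption-laden illustration using only the JDK: PreallocatedRing, its methods (nextToDispatch, flush, drain) and RingDemo are hypothetical names, and this is not the implementation used by CoralReactor or CoralQueue.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical single-producer/single-consumer ring of preallocated objects (illustration only).
final class PreallocatedRing<E> {

    private final Object[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(); // advanced only by the producer
    private final AtomicLong consumed = new AtomicLong();  // advanced only by the consumer
    private long claimed;                                   // producer-local claim counter

    PreallocatedRing(int capacity, Supplier<E> builder) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two");
        slots = new Object[capacity];
        mask = capacity - 1;
        for (int i = 0; i < capacity; i++) slots[i] = builder.get(); // all allocation happens here
    }

    // Producer: next reusable instance, or null while the ring is full.
    @SuppressWarnings("unchecked")
    E nextToDispatch() {
        if (claimed - consumed.get() == slots.length) return null;
        return (E) slots[(int) (claimed++ & mask)];
    }

    // Producer: make every claimed instance visible to the consumer.
    void flush() {
        published.set(claimed);
    }

    // Consumer: handle everything published so far, then release those slots for reuse.
    @SuppressWarnings("unchecked")
    int drain(Consumer<E> handler) {
        long from = consumed.get();
        long to = published.get();
        for (long seq = from; seq < to; seq++) handler.accept((E) slots[(int) (seq & mask)]);
        consumed.set(to);
        return (int) (to - from);
    }
}

class RingDemo {

    static final class CounterMsg { boolean decrement; }

    static int created; // number of CounterMsg instances ever allocated
    static long total;  // written only by the consumer thread

    static long run() {
        created = 0;
        total = 0;
        final int messages = 100;
        final PreallocatedRing<CounterMsg> ring =
            new PreallocatedRing<CounterMsg>(4, () -> { created++; return new CounterMsg(); });

        Thread consumer = new Thread(() -> {
            int seen = 0;
            while (seen < messages) {
                int n = ring.drain(m -> total += m.decrement ? -1 : 1);
                if (n == 0) Thread.yield();
                seen += n;
            }
        }, "Consumer");
        consumer.start();

        for (int i = 0; i < messages; i++) {
            CounterMsg m;
            while ((m = ring.nextToDispatch()) == null) Thread.yield(); // ring full, wait for consumer
            m.decrement = (i % 3 == 0);
            ring.flush();
        }
        try { consumer.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("total=" + run() + " instances=" + created); // total=32 instances=4
    }
}
```

All 100 messages travel through the same four preallocated objects, so nothing is allocated after construction. A slot is handed back to the producer only after the consumer has finished with it, which is what makes reuse safe without blocking the producer on every call.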


Conclusion

CoralReactor was designed from the ground up to be strictly single-threaded. This means other threads must never share state with the reactor thread, as doing so would lead to unpredictable errors caused by race conditions. When inter-thread communication is necessary, callbacks must be used to ensure that only the reactor thread executes the relevant code. CoralReactor supports both synchronous (blocking) and asynchronous (non-blocking) callbacks, and both can be used without generating any garbage.