In this article we present the DiamondQueue. Thread A sends a batch of requests through a demultiplexer to a fixed set of worker threads, and those worker threads then send their results through a multiplexer to thread B. Note that thread A and thread B can be the same thread. The DiamondQueue also supports lanes to enforce message order when needed.
Below is a simple example of how to use the DiamondQueue:
package com.coralblocks.coralqueue.test;

import java.util.Random;

import com.coralblocks.coralbits.util.ThreadUtils;
import com.coralblocks.coralqueue.diamond.AtomicDiamondQueue;
import com.coralblocks.coralqueue.diamond.DiamondQueue;
import com.coralblocks.coralqueue.diamond.Input;
import com.coralblocks.coralqueue.diamond.Output;
import com.coralblocks.coralqueue.diamond.Worker;

public class TestDiamond {

    private static final Random RAND = new Random();

    public static class AddWorker extends Worker {

        public int x;
        public int y;
        public int result;

        @Override
        public boolean execute() {
            ThreadUtils.sleep(RAND.nextInt(10));
            this.result = x + y;
            return true; // successful!
        }
    }

    public static void main(String[] args) {

        DiamondQueue<AddWorker> diamond = new AtomicDiamondQueue<AddWorker>(
                512 /* capacity per worker thread lane */,
                AddWorker.class, /* the worker class being used */
                2 /* number of worker threads */,
                new int[] { 1, 2 } /* proc_ids to bind worker threads */ );

        Input<AddWorker> input = diamond.getInput();
        Output<AddWorker> output = diamond.getOutput();

        // start all worker threads...
        diamond.start(false); // false = non-daemon thread...

        final int TOTAL_NUMBER_OF_MSG_TO_SEND = 8;

        // FIRST SCENARIO: No order

        for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
            AddWorker aw = null;
            final int lane = -1; // choose a random free lane...
            while((aw = input.nextToDispatch(lane)) == null); // busy spin wait strategy...
            aw.x = RAND.nextInt(50);
            aw.y = RAND.nextInt(50);
            System.out.println("===> Sending: lane=" + lane + " x=" + aw.x + " y=" + aw.y);
        }

        input.flush(true); // send the messages down the diamond...

        int messagesReceived = 0;

        while(messagesReceived < TOTAL_NUMBER_OF_MSG_TO_SEND) {
            long avail = output.availableToPoll();
            if (avail > 0) {
                for(long x = 0; x < avail; x++) {
                    AddWorker aw = output.poll();
                    if (aw.wasSuccessful()) {
                        System.out.println("===> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.result);
                    } else {
                        System.out.println("XXXX> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.getException());
                        if (aw.getException() != null) aw.getException().printStackTrace();
                    }
                    messagesReceived++;
                }
                output.donePolling(true);
            } else {
                // busy spin wait strategy...
            }
        }

        System.out.println();

        // SECOND SCENARIO: Ordered

        for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
            AddWorker aw = null;
            // lanes guarantee order... (same lane => same order)
            final int lane = (i % diamond.getNumberOfWorkerThreads());
            while((aw = input.nextToDispatch(lane)) == null); // busy spin wait strategy...
            aw.x = RAND.nextInt(50);
            aw.y = RAND.nextInt(50);
            System.out.println("===> Sending: lane=" + lane + " x=" + aw.x + " y=" + aw.y);
        }

        input.flush(true); // send the messages down the diamond...

        messagesReceived = 0;

        while(messagesReceived < TOTAL_NUMBER_OF_MSG_TO_SEND) {
            long avail = output.availableToPoll();
            if (avail > 0) {
                for(long x = 0; x < avail; x++) {
                    AddWorker aw = output.poll();
                    if (aw.wasSuccessful()) {
                        System.out.println("===> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.result);
                    } else {
                        System.out.println("XXXX> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.getException());
                        if (aw.getException() != null) aw.getException().printStackTrace();
                    }
                    messagesReceived++;
                }
                output.donePolling(true);
            } else {
                // busy spin wait strategy...
            }
        }

        diamond.stop(); // stop all worker threads...
    }
}
Output:
===> Sending: lane=-1 x=18 y=30
===> Sending: lane=-1 x=49 y=5
===> Sending: lane=-1 x=47 y=15
===> Sending: lane=-1 x=35 y=38
===> Sending: lane=-1 x=26 y=6
===> Sending: lane=-1 x=17 y=34
===> Sending: lane=-1 x=15 y=8
===> Sending: lane=-1 x=22 y=10
===> Receiving: x=49 y=5 result=54
===> Receiving: x=35 y=38 result=73
===> Receiving: x=17 y=34 result=51
===> Receiving: x=22 y=10 result=32
===> Receiving: x=18 y=30 result=48
===> Receiving: x=47 y=15 result=62
===> Receiving: x=26 y=6 result=32
===> Receiving: x=15 y=8 result=23

===> Sending: lane=0 x=43 y=44
===> Sending: lane=1 x=8 y=19
===> Sending: lane=0 x=14 y=25
===> Sending: lane=1 x=6 y=7
===> Sending: lane=0 x=28 y=11
===> Sending: lane=1 x=8 y=12
===> Sending: lane=0 x=40 y=26
===> Sending: lane=1 x=34 y=3
===> Receiving: x=43 y=44 result=87
===> Receiving: x=14 y=25 result=39
===> Receiving: x=28 y=11 result=39
===> Receiving: x=40 y=26 result=66
===> Receiving: x=8 y=19 result=27
===> Receiving: x=6 y=7 result=13
===> Receiving: x=8 y=12 result=20
===> Receiving: x=34 y=3 result=37
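In the second scenario the lane is chosen as i % diamond.getNumberOfWorkerThreads(), so with two worker threads the even-numbered messages travel on lane 0 and the odd-numbered ones on lane 1. Within each lane the results above come back in the order they were sent. The small sketch below reproduces only that lane arithmetic, without any CoralQueue dependency; the class name LaneAssignment and its methods are hypothetical helpers introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LaneAssignment {

    /** Same round-robin rule as the second scenario: lane = i % numberOfWorkerThreads. */
    public static int laneFor(int messageIndex, int numberOfWorkerThreads) {
        return messageIndex % numberOfWorkerThreads;
    }

    /** Groups message indices by lane, preserving the order in which they were sent. */
    public static List<List<Integer>> groupByLane(int totalMessages, int numberOfWorkerThreads) {
        List<List<Integer>> lanes = new ArrayList<>();
        for (int lane = 0; lane < numberOfWorkerThreads; lane++) {
            lanes.add(new ArrayList<>());
        }
        for (int i = 0; i < totalMessages; i++) {
            lanes.get(laneFor(i, numberOfWorkerThreads)).add(i);
        }
        return lanes;
    }

    public static void main(String[] args) {
        // 8 messages and 2 worker threads, as in the example above
        List<List<Integer>> lanes = groupByLane(8, 2);
        for (int lane = 0; lane < lanes.size(); lane++) {
            System.out.println("lane " + lane + " carries messages " + lanes.get(lane));
        }
        // prints:
        // lane 0 carries messages [0, 2, 4, 6]
        // lane 1 carries messages [1, 3, 5, 7]
    }
}
```

Messages that must stay ordered relative to each other should therefore share a lane, while unrelated messages can be spread across lanes or sent with lane -1.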
The DiamondQueue interface for reference:
package com.coralblocks.coralqueue.diamond;

public interface DiamondQueue<E extends Worker> {

    public Input<E> getInput();

    public Output<E> getOutput();

    public int getNumberOfWorkerThreads();

    public void start(boolean daemon);

    public void stop();
}
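To make the diamond shape itself concrete, here is a rough analog built only from standard java.util.concurrent classes. It is not how the DiamondQueue is implemented and makes no attempt to reproduce its performance characteristics. It only mirrors the topology: one single-thread executor per lane plays the role of a worker thread, and a shared BlockingQueue plays the role of the multiplexer. The names DiamondSketch, AddTask and run are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class DiamondSketch {

    /** A request that also carries its result, loosely analogous to AddWorker. */
    public static final class AddTask {
        public final int lane;
        public final int x;
        public final int y;
        public int result;

        public AddTask(int lane, int x, int y) {
            this.lane = lane;
            this.x = x;
            this.y = y;
        }
    }

    /** Fans the tasks out to one thread per lane and returns them in arrival order. */
    public static List<AddTask> run(List<AddTask> tasks, int lanes) {
        // "demultiplexer": one single-thread executor per lane (FIFO within a lane)
        ExecutorService[] workers = new ExecutorService[lanes];
        for (int i = 0; i < lanes; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
        // "multiplexer": every worker publishes into the same output queue
        BlockingQueue<AddTask> output = new LinkedBlockingQueue<>();
        List<AddTask> received = new ArrayList<>();
        try {
            for (AddTask t : tasks) {
                workers[t.lane].execute(() -> {
                    t.result = t.x + t.y;
                    output.add(t);
                });
            }
            for (int i = 0; i < tasks.size(); i++) {
                received.add(output.take()); // blocks instead of busy spinning
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            for (ExecutorService w : workers) {
                w.shutdown();
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<AddTask> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            tasks.add(new AddTask(i % 2, i, 10 * i)); // alternate between two lanes
        }
        for (AddTask t : run(tasks, 2)) {
            System.out.println("lane=" + t.lane + " x=" + t.x + " y=" + t.y + " result=" + t.result);
        }
    }
}
```

Because each lane is served by exactly one thread that processes its tasks in submission order, results from the same lane arrive in the order they were sent, while results from different lanes may interleave arbitrarily. This is the same ordering behavior the lanes provide in the example above.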