In this article we present the DiamondQueue, which is nothing more than thread A sending a batch of requests through a demultiplexer to a fixed set of worker threads, which then send their results through a multiplexer to thread B. Note that thread A and thread B can be the same thread. The DiamondQueue also supports lanes to enforce message order when needed.
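To make the diamond shape concrete before looking at the real API, here is a minimal conceptual sketch built only from standard JDK classes. It is not how CoralQueue is implemented: blocking queues stand in for the demultiplexer and multiplexer, and the names `DiamondSketch`, `Task`, `run` and `work` are hypothetical. Each lane is assumed to map to exactly one worker thread, which is why messages sent on the same lane come back in the order they were sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class DiamondSketch {

    /** Hypothetical message type: a request that also carries its result. */
    static final class Task {
        final int lane, x, y;
        int result;
        Task(int lane, int x, int y) { this.lane = lane; this.x = x; this.y = y; }
    }

    /** Sentinel that tells a worker thread to exit. */
    private static final Task STOP = new Task(0, 0, 0);

    /** Fans tasks out to one queue per worker, then collects every result. */
    static List<Task> run(List<Task> tasks, int workers) {
        int capacity = tasks.size() + 1; // room for every task plus the STOP sentinel
        List<BlockingQueue<Task>> lanes = new ArrayList<>();
        BlockingQueue<Task> results = new ArrayBlockingQueue<>(capacity);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            BlockingQueue<Task> lane = new ArrayBlockingQueue<>(capacity);
            lanes.add(lane);
            Thread t = new Thread(() -> work(lane, results));
            t.start();
            threads.add(t);
        }
        try {
            // "Demultiplexer": thread A routes each task to the queue of its lane.
            for (Task task : tasks) lanes.get(Math.floorMod(task.lane, workers)).put(task);
            // "Multiplexer": thread B (here the same thread) drains the shared result queue.
            List<Task> out = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) out.add(results.take());
            for (BlockingQueue<Task> lane : lanes) lane.put(STOP);
            for (Thread t : threads) t.join();
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Worker loop: take a task from its lane, compute, publish the result. */
    private static void work(BlockingQueue<Task> lane, BlockingQueue<Task> results) {
        try {
            for (Task task = lane.take(); task != STOP; task = lane.take()) {
                task.result = task.x + task.y;
                results.put(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) tasks.add(new Task(i % 2, i, 10));
        for (Task t : run(tasks, 2)) {
            System.out.println("lane=" + t.lane + " x=" + t.x + " y=" + t.y + " result=" + t.result);
        }
    }
}
```

Results from different lanes can interleave in any order, but within a lane the order is preserved because a single thread consumes that lane's queue in FIFO order.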
Below is a simple example of how to use the DiamondQueue:
    package com.coralblocks.coralqueue.test;

    import java.util.Random;

    import com.coralblocks.coralbits.util.ThreadUtils;
    import com.coralblocks.coralqueue.diamond.AtomicDiamondQueue;
    import com.coralblocks.coralqueue.diamond.DiamondQueue;
    import com.coralblocks.coralqueue.diamond.Input;
    import com.coralblocks.coralqueue.diamond.Output;
    import com.coralblocks.coralqueue.diamond.Worker;

    public class TestDiamond {

        private static final Random RAND = new Random();

        public static class AddWorker extends Worker {

            public int x;
            public int y;
            public int result;

            @Override
            public boolean execute() {
                ThreadUtils.sleep(RAND.nextInt(10));
                this.result = x + y;
                return true; // successful!
            }
        }

        public static void main(String[] args) {

            DiamondQueue<AddWorker> diamond = new AtomicDiamondQueue<AddWorker>(
                    512 /* capacity per worker thread lane */,
                    AddWorker.class, /* the worker class being used */
                    2 /* number of worker threads */,
                    new int[] { 1, 2 } /* proc_ids to bind worker threads */
            );

            Input<AddWorker> input = diamond.getInput();
            Output<AddWorker> output = diamond.getOutput();

            // start all worker threads...
            diamond.start(false); // false = non-daemon thread...

            final int TOTAL_NUMBER_OF_MSG_TO_SEND = 8;

            // FIRST SCENARIO: No order

            for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
                AddWorker aw = null;
                final int lane = -1; // choose a random free lane...
                while((aw = input.nextToDispatch(lane)) == null); // busy spin wait strategy...
                aw.x = RAND.nextInt(50);
                aw.y = RAND.nextInt(50);
                System.out.println("===> Sending: lane=" + lane + " x=" + aw.x + " y=" + aw.y);
            }

            input.flush(true); // send the messages down the diamond...

            int messagesReceived = 0;

            while(messagesReceived < TOTAL_NUMBER_OF_MSG_TO_SEND) {
                long avail = output.availableToPoll();
                if (avail > 0) {
                    for(long x = 0; x < avail; x++) {
                        AddWorker aw = output.poll();
                        if (aw.wasSuccessful()) {
                            System.out.println("===> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.result);
                        } else {
                            System.out.println("XXXX> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.getException());
                            if (aw.getException() != null) aw.getException().printStackTrace();
                        }
                        messagesReceived++;
                    }
                    output.donePolling(true);
                } else {
                    // busy spin wait strategy...
                }
            }

            System.out.println();

            // SECOND SCENARIO: Ordered

            for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
                AddWorker aw = null;
                // lanes guarantee order... (same lane => same order)
                final int lane = (i % diamond.getNumberOfWorkerThreads());
                while((aw = input.nextToDispatch(lane)) == null); // busy spin wait strategy...
                aw.x = RAND.nextInt(50);
                aw.y = RAND.nextInt(50);
                System.out.println("===> Sending: lane=" + lane + " x=" + aw.x + " y=" + aw.y);
            }

            input.flush(true); // send the messages down the diamond...

            messagesReceived = 0;

            while(messagesReceived < TOTAL_NUMBER_OF_MSG_TO_SEND) {
                long avail = output.availableToPoll();
                if (avail > 0) {
                    for(long x = 0; x < avail; x++) {
                        AddWorker aw = output.poll();
                        if (aw.wasSuccessful()) {
                            System.out.println("===> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.result);
                        } else {
                            System.out.println("XXXX> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.getException());
                            if (aw.getException() != null) aw.getException().printStackTrace();
                        }
                        messagesReceived++;
                    }
                    output.donePolling(true);
                } else {
                    // busy spin wait strategy...
                }
            }

            diamond.stop(); // stop all worker threads...
        }
    }
Output:
    ===> Sending: lane=-1 x=18 y=30
    ===> Sending: lane=-1 x=49 y=5
    ===> Sending: lane=-1 x=47 y=15
    ===> Sending: lane=-1 x=35 y=38
    ===> Sending: lane=-1 x=26 y=6
    ===> Sending: lane=-1 x=17 y=34
    ===> Sending: lane=-1 x=15 y=8
    ===> Sending: lane=-1 x=22 y=10
    ===> Receiving: x=49 y=5 result=54
    ===> Receiving: x=35 y=38 result=73
    ===> Receiving: x=17 y=34 result=51
    ===> Receiving: x=22 y=10 result=32
    ===> Receiving: x=18 y=30 result=48
    ===> Receiving: x=47 y=15 result=62
    ===> Receiving: x=26 y=6 result=32
    ===> Receiving: x=15 y=8 result=23

    ===> Sending: lane=0 x=43 y=44
    ===> Sending: lane=1 x=8 y=19
    ===> Sending: lane=0 x=14 y=25
    ===> Sending: lane=1 x=6 y=7
    ===> Sending: lane=0 x=28 y=11
    ===> Sending: lane=1 x=8 y=12
    ===> Sending: lane=0 x=40 y=26
    ===> Sending: lane=1 x=34 y=3
    ===> Receiving: x=43 y=44 result=87
    ===> Receiving: x=14 y=25 result=39
    ===> Receiving: x=28 y=11 result=39
    ===> Receiving: x=40 y=26 result=66
    ===> Receiving: x=8 y=19 result=27
    ===> Receiving: x=6 y=7 result=13
    ===> Receiving: x=8 y=12 result=20
    ===> Receiving: x=34 y=3 result=37
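In the second scenario the lane is derived from the loop counter, so every other message shares a lane and keeps its relative order. In a real application you would more likely want all messages for the same entity (an account, a symbol, a session) to stay ordered. One possible approach, sketched below, is to derive the lane from a key. The `LaneChooser` class and `laneFor` method are hypothetical helpers and not part of CoralQueue; the returned value would be passed to `input.nextToDispatch(lane)` as in the example above, with `diamond.getNumberOfWorkerThreads()` as the number of lanes.

```java
final class LaneChooser {

    /**
     * Maps a key to a lane in the range [0, numberOfLanes).
     * The same key always yields the same lane, so its messages stay ordered.
     */
    static int laneFor(Object key, int numberOfLanes) {
        if (numberOfLanes <= 0) {
            throw new IllegalArgumentException("numberOfLanes must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numberOfLanes);
    }

    public static void main(String[] args) {
        String[] accounts = { "ACC-1", "ACC-2", "ACC-1", "ACC-3", "ACC-2" };
        for (String account : accounts) {
            // Repeated accounts print the same lane every time
            System.out.println(account + " -> lane " + laneFor(account, 2));
        }
    }
}
```

The trade-off is load balance: keys that hash unevenly can leave one worker busier than the others, whereas `lane = -1` in the first scenario lets the queue pick any free lane at the cost of ordering.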
The DiamondQueue interface for reference:
    package com.coralblocks.coralqueue.diamond;

    public interface DiamondQueue<E extends Worker> {

        public Input<E> getInput();

        public Output<E> getOutput();

        public int getNumberOfWorkerThreads();

        public void start(boolean daemon);

        public void stop();
    }