package com.coralblocks.coralqueue.bench;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
import com.coralblocks.coralqueue.waitstrategy.WaitStrategy;
import com.coralblocks.coralqueue.waitstrategy.WaitStrategyFactory;
import com.coralblocks.coralthreads.Affinity;
import com.coralblocks.coralutils.SystemUtils;
import com.coralblocks.coralutils.bench.Benchmarker;

/**
 * Different cores: (no hyper-threading)
 * 
 * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 -Dtimestamper=nanos com.coralblocks.coralqueue.bench.TestMessageThroughput 
 * 
 * Results: Iterations: 20 | Avg Time: 234.781 millis | Min Time: 233.626 millis | Max Time: 236.014 millis | Nano Timing Cost: 14.0 nanos
 * Average time to send 10,000,000 messages: 234,781,092 nanos
 * Messages per second: 42,592,867
 * 
 * Same core: (with hyper-threading)
 * 
 * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 -Dtimestamper=nanos com.coralblocks.coralqueue.bench.TestMessageThroughput
 * 
 * Results: Iterations: 20 | Avg Time: 117.343 millis | Min Time: 117.273 millis | Max Time: 117.435 millis | Nano Timing Cost: 14.0 nanos
 * Average time to send 10,000,000 messages: 117,342,956 nanos
 * Messages per second: 85,220,283
 */
public class TestMessageThroughput {
	
	/**
	 * Runs the throughput benchmark and prints the results to standard output.
	 * <p>
	 * A producer thread pushes {@code messagesToTest} sequence numbers (0 .. messagesToTest - 1)
	 * through {@code queue} for each pass, and a consumer thread drains them. When the consumer
	 * sees the last sequence number of a pass it takes the time measurement and acknowledges the
	 * pass through {@code backQueue}; the producer does not start the next pass until it has
	 * received that acknowledgement. After the final pass the producer sends {@code -1}, which
	 * makes the consumer stop.
	 * <p>
	 * Settings are read through {@code SystemUtils.getInt} using the names
	 * {@code messagesToTest}, {@code queueCapacity}, {@code passes}, {@code warmupPasses},
	 * {@code producerProcToBind} and {@code consumerProcToBind}.
	 * 
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining the worker threads
	 * @throws IllegalArgumentException if {@code messagesToTest} is not positive
	 */
	public static void main(String[] args) throws InterruptedException {
		
		final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000);
		final int capacity = SystemUtils.getInt("queueCapacity", 1024);
		final int passes = SystemUtils.getInt("passes", 20);
		final int warmupPasses = SystemUtils.getInt("warmupPasses", 2);
		
		// Fail fast: with no messages in a pass the producer would wait forever for an
		// acknowledgement the consumer never sends, and with 0 the end-of-pass marker
		// (messagesToTest - 1) would be indistinguishable from the -1 shutdown sentinel.
		if (messagesToTest <= 0) {
			throw new IllegalArgumentException("messagesToTest must be positive: " + messagesToTest);
		}
		
		int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2);
		int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3);
		
		// producer -> consumer: carries the messages being measured
		final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(capacity, MutableLong.BUILDER);
		final Benchmarker bench = Benchmarker.create(warmupPasses);
		final WaitStrategy waitStrategy = WaitStrategyFactory.getWaitStrategy("park-backoff");
		
		// consumer -> producer: carries one acknowledgement per completed pass
		final Queue<MutableLong> backQueue = new AtomicQueue<MutableLong>(capacity, MutableLong.BUILDER);
		
		System.out.println("messagesToTest: " + messagesToTest);
		System.out.println("queueCapacity: " + capacity);
		System.out.println("timestamper: " + bench.getTimestamper());
		
		Thread producer = new Thread(new Runnable() {
			
			@Override
			public void run() {
				
				Affinity.bind();
				
				MutableLong ml = null;
				
				// the consumer reports the first warmupPasses passes as warmup, the rest are measured
				for(int i = 0; i < passes + warmupPasses; i++) {

					long count = 0;
					
					// start of the timed region; the consumer calls bench.measure() on the last message
					bench.mark();
				
					while(count < messagesToTest) {
						
						// busy-spin until the queue hands out a slot
						while((ml = queue.nextToDispatch()) == null);
						
						ml.set(count);
						
						queue.flush(true);
						
						count++;
					}
					
					// wait for the consumer to acknowledge the end of this pass
					while(backQueue.availableToPoll() != 1) {
						waitStrategy.block();
					}
					
					// the acknowledgement carries no information, so its value is not read
					backQueue.poll();
					backQueue.donePolling(false);
					
					waitStrategy.reset();
				}
				
				// all passes done: send the shutdown sentinel
				while((ml = queue.nextToDispatch()) == null);
				
				ml.set(-1);
				
				queue.flush(true);
				
				Affinity.unbind();
				
				System.out.println("producer exiting...");				
			}
		}, "Producer");
		
		Thread consumer = new Thread(new Runnable() {

			@Override
			public void run() {
				
				Affinity.bind();
				
				boolean running = true;
				
				int pass = 0;
				
				// busy-spin on the queue until the shutdown sentinel has been seen
				while (running) {
					
					long avail = queue.availableToPoll();
					if (avail > 0) {
						for(int i = 0; i < avail; i++) {
							MutableLong ml = queue.poll();
							long x = ml.get();
							if (x == -1) running = false; // shutdown sentinel from the producer
							else if (x == messagesToTest - 1) {
								// last message of the pass: end of the timed region
								long t = bench.measure();
								System.out.println("Pass " + pass + "... " + (pass < warmupPasses ? "(warmup)" : "(" + Benchmarker.convertNanoTime(t) + ")"));
								pass++;
								
								// notify:
								while((ml = backQueue.nextToDispatch()) == null);
								ml.set(0);
								backQueue.flush(false);
							}
						}
						queue.donePolling(false); // no lazy, tell producer immediately in case queue was full...
					} 
				}
				
				Affinity.unbind();
				
				System.out.println("consumer exiting...");
			}
		}, "Consumer");
		
		if (Affinity.isAvailable()) {
			Affinity.assignToProcessor(prodProcToBind, producer);
			Affinity.assignToProcessor(consProcToBind, consumer);
		} else {
			System.err.println("Thread affinity not available!");
		}
		
		consumer.start();
		producer.start();
		
		consumer.join();
		producer.join();
		
		long time = Math.round(bench.getAverage());
		
		// Guard the division: if the average rounds to zero (presumably when no measured pass
		// was recorded) report 0 instead of dying with an ArithmeticException before the
		// results are printed.
		long mps = time > 0 ? messagesToTest * 1000000000L / time : 0;
		
		System.out.println("Results: " + bench.results());
		System.out.println("Average time to send " + messagesToTest + " messages: " + time + " nanos");
		System.out.println("Messages per second: " + mps);
	}
}