/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.db.queryengine.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.queryengine.execution.exchange.Utils;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class SharedTsBlockQueueTest {
    @Test(timeout=15000L)
    public void concurrencyTest() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = (MemoryPool)Mockito.spy((Object)new MemoryPool("test", 0xA00000L, 0x500000L));
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)spyMemoryPool);
        SharedTsBlockQueue queue = new SharedTsBlockQueue(new TFragmentInstanceId("q0", 0, "0"), "test", mockLocalMemoryManager, (ExecutorService)MoreExecutors.newDirectExecutorService());
        queue.getCanAddTsBlock().set(null);
        queue.setMaxBytesCanReserve(Long.MAX_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicReference<Integer> numOfTimesSenderBlocked = new AtomicReference<Integer>(0);
        AtomicReference<Integer> numOfTimesReceiverBlocked = new AtomicReference<Integer>(0);
        AtomicReference<Integer> numOfTsBlocksToSend = new AtomicReference<Integer>(1000);
        AtomicReference<Integer> numOfTsBlocksToReceive = new AtomicReference<Integer>(1000);
        executor.submit(new SendTask(queue, 0x100000L, numOfTsBlocksToSend, numOfTimesSenderBlocked, executor));
        executor.submit(new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesReceiverBlocked, executor));
        while (numOfTsBlocksToSend.get() != 0 && numOfTsBlocksToReceive.get() != 0) {
            String message = String.format("Sender %d: %d, Receiver %d: %d", numOfTimesSenderBlocked.get(), numOfTsBlocksToSend.get(), numOfTimesReceiverBlocked.get(), numOfTsBlocksToReceive.get());
            System.out.println(message);
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                Assert.fail((String)e.getMessage());
            }
        }
    }

    private static class ReceiveTask
    implements Runnable {
        private final SharedTsBlockQueue queue;
        private final AtomicReference<Integer> numOfTsBlocksToReceive;
        private final AtomicReference<Integer> numOfTimesBlocked;
        private final ExecutorService executor;

        public ReceiveTask(SharedTsBlockQueue queue, AtomicReference<Integer> numOfTsBlocksToReceive, AtomicReference<Integer> numOfTimesBlocked, ExecutorService executor) {
            this.queue = (SharedTsBlockQueue)Validate.notNull((Object)queue);
            this.numOfTsBlocksToReceive = (AtomicReference)Validate.notNull(numOfTsBlocksToReceive);
            this.numOfTimesBlocked = (AtomicReference)Validate.notNull(numOfTimesBlocked);
            this.executor = (ExecutorService)Validate.notNull((Object)executor);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ListenableFuture blocked = null;
            while (this.numOfTsBlocksToReceive.get() > 0) {
                SharedTsBlockQueue sharedTsBlockQueue = this.queue;
                synchronized (sharedTsBlockQueue) {
                    blocked = this.queue.isBlocked();
                    if (!blocked.isDone()) {
                        break;
                    }
                    this.queue.remove();
                    this.numOfTsBlocksToReceive.updateAndGet(v -> v - 1);
                }
            }
            if (blocked != null) {
                this.numOfTimesBlocked.updateAndGet(v -> v + 1);
                blocked.addListener((Runnable)new ReceiveTask(this.queue, this.numOfTsBlocksToReceive, this.numOfTimesBlocked, this.executor), (Executor)this.executor);
            }
        }
    }

    private static class SendTask
    implements Runnable {
        private final SharedTsBlockQueue queue;
        private final long mockTsBlockSize;
        private final AtomicReference<Integer> numOfTsBlocksToSend;
        private final AtomicReference<Integer> numOfTimesBlocked;
        private final ExecutorService executor;

        public SendTask(SharedTsBlockQueue queue, long mockTsBlockSize, AtomicReference<Integer> numOfTsBlocksToSend, AtomicReference<Integer> numOfTimesBlocked, ExecutorService executor) {
            this.queue = (SharedTsBlockQueue)Validate.notNull((Object)queue);
            Validate.isTrue((mockTsBlockSize > 0L ? 1 : 0) != 0);
            this.mockTsBlockSize = mockTsBlockSize;
            this.numOfTsBlocksToSend = (AtomicReference)Validate.notNull(numOfTsBlocksToSend);
            this.numOfTimesBlocked = (AtomicReference)Validate.notNull(numOfTimesBlocked);
            this.executor = (ExecutorService)Validate.notNull((Object)executor);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            SharedTsBlockQueue sharedTsBlockQueue;
            ListenableFuture blockedOnMemory = null;
            while (this.numOfTsBlocksToSend.get() > 0) {
                sharedTsBlockQueue = this.queue;
                synchronized (sharedTsBlockQueue) {
                    blockedOnMemory = this.queue.add(Utils.createMockTsBlock(this.mockTsBlockSize));
                }
                this.numOfTsBlocksToSend.updateAndGet(v -> v - 1);
                if (blockedOnMemory.isDone()) continue;
            }
            if (blockedOnMemory != null) {
                this.numOfTimesBlocked.updateAndGet(v -> v + 1);
                blockedOnMemory.addListener((Runnable)new SendTask(this.queue, this.mockTsBlockSize, this.numOfTsBlocksToSend, this.numOfTimesBlocked, this.executor), (Executor)this.executor);
            } else {
                sharedTsBlockQueue = this.queue;
                synchronized (sharedTsBlockQueue) {
                    this.queue.setNoMoreTsBlocks(true);
                }
            }
        }
    }
}

