/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.queryengine.execution.exchange.Utils;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link LocalSinkChannel} wired to a {@link SharedTsBlockQueue} that is backed by
 * a spied {@link MemoryPool}.
 *
 * <p>Each test builds its own {@link Fixture}, so no state is shared between test methods.
 */
public class LocalSinkChannelTest {
    private static final String QUERY_ID = "q0";
    private static final String REMOTE_PLAN_NODE_ID = "exchange_0";
    /** Size in bytes requested for every mock TsBlock (1 MiB). */
    private static final long MOCK_TS_BLOCK_SIZE = 1024L * 1024L;
    /** Number of blocks both tests expect the channel to accept before {@code isFull()} stays pending. */
    private static final int EXPECTED_BLOCKS_UNTIL_FULL = 11;

    /**
     * Sends blocks until the channel reports full, drains the queue, then signals end of data and
     * checks the retained size, the memory pool interactions and the listener callbacks on the way.
     */
    @Test
    public void testSend() {
        Fixture fixture = new Fixture();
        LocalSinkChannel localSinkChannel = fixture.localSinkChannel;

        assertOpensAfterSourceIsPolled(fixture);

        // Fill the channel until its "full" future is no longer immediately done.
        Assert.assertEquals(EXPECTED_BLOCKS_UNTIL_FULL, sendUntilFull(localSinkChannel));
        Assert.assertFalse(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(
            EXPECTED_BLOCKS_UNTIL_FULL * MOCK_TS_BLOCK_SIZE,
            localSinkChannel.getBufferRetainedSizeInBytes());
        fixture.verifyReservedPerBlock(EXPECTED_BLOCKS_UNTIL_FULL);

        // Drain the shared queue directly; every removed block should release its reservation.
        int numOfReceivedTsblocks = 0;
        while (!fixture.queue.isEmpty()) {
            fixture.queue.remove();
            ++numOfReceivedTsblocks;
        }
        Assert.assertEquals(EXPECTED_BLOCKS_UNTIL_FULL, numOfReceivedTsblocks);
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(0L, localSinkChannel.getBufferRetainedSizeInBytes());
        Mockito.verify(fixture.spyMemoryPool, Mockito.times(EXPECTED_BLOCKS_UNTIL_FULL))
            .free(
                QUERY_ID,
                fixture.remoteFragmentInstanceIdString(),
                REMOTE_PLAN_NODE_ID,
                MOCK_TS_BLOCK_SIZE);

        // With the queue already empty, declaring end of data is expected to finish the channel.
        localSinkChannel.setNoMoreTsBlocks();
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertTrue(localSinkChannel.isFinished());
        Mockito.verify(fixture.mockSinkListener, Mockito.times(1)).onEndOfBlocks(localSinkChannel);
        Mockito.verify(fixture.mockSinkListener, Mockito.times(1)).onFinish(localSinkChannel);
    }

    /**
     * Sends blocks until the channel reports full, then aborts it and checks that the pending
     * "full" future completes, the channel is aborted but not finished, and the listener is told.
     */
    @Test
    public void testAbort() {
        Fixture fixture = new Fixture();
        LocalSinkChannel localSinkChannel = fixture.localSinkChannel;

        assertOpensAfterSourceIsPolled(fixture);

        Assert.assertEquals(EXPECTED_BLOCKS_UNTIL_FULL, sendUntilFull(localSinkChannel));
        // Keep the pending future so its completion can be observed after abort().
        ListenableFuture<?> blocked = localSinkChannel.isFull();
        Assert.assertFalse(blocked.isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(
            EXPECTED_BLOCKS_UNTIL_FULL * MOCK_TS_BLOCK_SIZE,
            localSinkChannel.getBufferRetainedSizeInBytes());
        fixture.verifyReservedPerBlock(EXPECTED_BLOCKS_UNTIL_FULL);

        localSinkChannel.abort();
        Assert.assertTrue(blocked.isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertTrue(localSinkChannel.isAborted());
        Mockito.verify(fixture.mockSinkListener, Mockito.times(1)).onAborted(localSinkChannel);
    }

    /**
     * Asserts the initial handshake shared by both tests: the channel reports full until
     * {@code isBlocked()} has been called on the source handle, after which it is open, empty,
     * neither finished nor aborted.
     */
    private static void assertOpensAfterSourceIsPolled(Fixture fixture) {
        LocalSinkChannel localSinkChannel = fixture.localSinkChannel;
        Assert.assertFalse(localSinkChannel.isFull().isDone());
        fixture.localSourceHandle.isBlocked();
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertFalse(localSinkChannel.isAborted());
        Assert.assertEquals(0L, localSinkChannel.getBufferRetainedSizeInBytes());
    }

    /**
     * Sends mock blocks of {@link #MOCK_TS_BLOCK_SIZE} while {@code isFull()} is already done.
     *
     * @return the number of blocks that were sent
     */
    private static int sendUntilFull(LocalSinkChannel localSinkChannel) {
        int numOfSentTsblocks = 0;
        while (localSinkChannel.isFull().isDone()) {
            localSinkChannel.send(Utils.createMockTsBlock(MOCK_TS_BLOCK_SIZE));
            ++numOfSentTsblocks;
        }
        return numOfSentTsblocks;
    }

    /** Collaborators for one test run, created in the order the channel under test needs them. */
    private static final class Fixture {
        final TFragmentInstanceId remoteFragmentInstanceId;
        final TFragmentInstanceId localFragmentInstanceId;
        final MemoryPool spyMemoryPool;
        final MPPDataExchangeManager.SinkListener mockSinkListener;
        final SharedTsBlockQueue queue;
        final LocalSinkChannel localSinkChannel;
        final LocalSourceHandle localSourceHandle;

        Fixture() {
            remoteFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
            localFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");

            // Pool limits are 10x and 5x the block size.
            // NOTE(review): presumably total capacity and per-query limit - confirm against MemoryPool.
            LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
            spyMemoryPool =
                Mockito.spy(
                    new MemoryPool("test", 10 * MOCK_TS_BLOCK_SIZE, 5 * MOCK_TS_BLOCK_SIZE));
            Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);

            mockSinkListener = Mockito.mock(MPPDataExchangeManager.SinkListener.class);

            // A direct executor runs submitted tasks on the calling thread, keeping the test
            // single-threaded.
            queue =
                new SharedTsBlockQueue(
                    remoteFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    mockLocalMemoryManager,
                    MoreExecutors.newDirectExecutorService());

            localSinkChannel =
                new LocalSinkChannel(localFragmentInstanceId, queue, mockSinkListener);
            queue.setMaxBytesCanReserve(Long.MAX_VALUE);
            localSourceHandle =
                new LocalSourceHandle(
                    localFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    queue,
                    Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
        }

        /** String form of the remote fragment instance id, as passed to the memory pool. */
        String remoteFragmentInstanceIdString() {
            return FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
                remoteFragmentInstanceId);
        }

        /** Verifies that exactly {@code times} block-sized reservations hit the spied pool. */
        void verifyReservedPerBlock(int times) {
            Mockito.verify(spyMemoryPool, Mockito.times(times))
                .reserve(
                    QUERY_ID,
                    remoteFragmentInstanceIdString(),
                    REMOTE_PLAN_NODE_ID,
                    MOCK_TS_BLOCK_SIZE,
                    Long.MAX_VALUE);
        }
    }
}

