/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.LocalSinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.execution.exchange.Utils;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class LocalSinkHandleTest {
    @Test
    public void testSend() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = (MemoryPool)Mockito.spy((Object)new MemoryPool("test", 0xA00000L, 0x500000L));
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)spyMemoryPool);
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        SharedTsBlockQueue queue = new SharedTsBlockQueue(remoteFragmentInstanceId, mockLocalMemoryManager);
        LocalSinkHandle localSinkHandle = new LocalSinkHandle(remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, queue, mockSinkHandleListener);
        Assert.assertTrue((boolean)localSinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertFalse((boolean)localSinkHandle.isAborted());
        Assert.assertEquals((long)0L, (long)localSinkHandle.getBufferRetainedSizeInBytes());
        int numOfSentTsblocks = 0;
        while (localSinkHandle.isFull().isDone()) {
            localSinkHandle.send(Utils.createMockTsBlock(0x100000L));
            ++numOfSentTsblocks;
        }
        Assert.assertEquals((long)6L, (long)numOfSentTsblocks);
        Assert.assertFalse((boolean)localSinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertEquals((long)0x600000L, (long)localSinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)spyMemoryPool, (VerificationMode)Mockito.times((int)6))).reserve("q0", 0x100000L);
        int numOfReceivedTsblocks = 0;
        while (!queue.isEmpty()) {
            queue.remove();
            ++numOfReceivedTsblocks;
        }
        Assert.assertEquals((long)6L, (long)numOfReceivedTsblocks);
        Assert.assertTrue((boolean)localSinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertEquals((long)0L, (long)localSinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)spyMemoryPool, (VerificationMode)Mockito.times((int)6))).free("q0", 0x100000L);
        localSinkHandle.setNoMoreTsBlocks();
        Assert.assertTrue((boolean)localSinkHandle.isFull().isDone());
        Assert.assertTrue((boolean)localSinkHandle.isFinished());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onEndOfBlocks((ISinkHandle)localSinkHandle);
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onFinish((ISinkHandle)localSinkHandle);
    }

    @Test
    public void testAbort() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = (MemoryPool)Mockito.spy((Object)new MemoryPool("test", 0xA00000L, 0x500000L));
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)spyMemoryPool);
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        SharedTsBlockQueue queue = new SharedTsBlockQueue(remoteFragmentInstanceId, mockLocalMemoryManager);
        LocalSinkHandle localSinkHandle = new LocalSinkHandle(remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, queue, mockSinkHandleListener);
        Assert.assertTrue((boolean)localSinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertFalse((boolean)localSinkHandle.isAborted());
        Assert.assertEquals((long)0L, (long)localSinkHandle.getBufferRetainedSizeInBytes());
        int numOfSentTsblocks = 0;
        while (localSinkHandle.isFull().isDone()) {
            localSinkHandle.send(Utils.createMockTsBlock(0x100000L));
            ++numOfSentTsblocks;
        }
        Assert.assertEquals((long)6L, (long)numOfSentTsblocks);
        ListenableFuture blocked = localSinkHandle.isFull();
        Assert.assertFalse((boolean)blocked.isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertEquals((long)0x600000L, (long)localSinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)spyMemoryPool, (VerificationMode)Mockito.times((int)6))).reserve("q0", 0x100000L);
        localSinkHandle.abort();
        Assert.assertTrue((boolean)blocked.isDone());
        Assert.assertFalse((boolean)localSinkHandle.isFinished());
        Assert.assertTrue((boolean)localSinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onAborted((ISinkHandle)localSinkHandle);
    }
}

