/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.Utils;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.SinkChannel;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link SinkChannel} driven entirely through mocked collaborators: a mocked
 * {@link LocalMemoryManager} handing out a mocked (or spied) {@link MemoryPool}, a mocked
 * data-exchange client served by a mocked {@link IClientManager}, and a mocked
 * {@link MPPDataExchangeManager.SinkListener}.
 *
 * <p>Every test creates its own single-thread executor and shuts it down in a {@code finally}
 * block so that no worker thread outlives the test, whether it passes or fails. Checked
 * exceptions are declared on the test methods instead of being caught and converted into a bare
 * {@code Assert.fail()}, so the original cause shows up in the test report.
 */
public class SinkChannelTest {
    /**
     * Max TsBlock size taken from the TsFile configuration; the assertions below expect an opened
     * channel to retain exactly this many bytes before anything is sent.
     */
    private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();

    /** Query id shared by the local and remote fragment instance ids of every test. */
    private static final String QUERY_ID = "q0";

    /** Plan node id of the remote side, passed to the channel and expected in outgoing events. */
    private static final String REMOTE_PLAN_NODE_ID = "exchange_0";

    /** Plan node id of the local side, passed to the channel and expected in memory frees. */
    private static final String LOCAL_PLAN_NODE_ID = "fragmentSink_0";

    /**
     * Sends one block through a channel backed by the non-blocked mock memory pool and checks
     * that {@code isFull()} stays done at every step, that exactly one new-data-block event and
     * one end-of-data-block event reach the client, and that acknowledging the block finishes the
     * channel and frees the block's memory once.
     */
    @Test
    public void testOneTimeNotBlockedSend() throws ClientManagerException, TException, IOException {
        final long mockTsBlockSize = 131072L;
        TEndPoint remoteEndpoint = newRemoteEndpoint();
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        LocalMemoryManager mockLocalMemoryManager = newLocalMemoryManagerMock(mockMemoryPool);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = newNoOpClientMock();
        IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
                newClientManagerMock(remoteEndpoint, mockClient);
        MPPDataExchangeManager.SinkListener mockSinkListener =
                Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, mockTsBlockSize);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SinkChannel sinkChannel = new SinkChannel(
                    remoteEndpoint,
                    remoteFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    LOCAL_PLAN_NODE_ID,
                    localFragmentInstanceId,
                    mockLocalMemoryManager,
                    executor,
                    Utils.createMockTsBlockSerde(mockTsBlockSize),
                    mockSinkListener,
                    mockClientManager);

            // Freshly opened: nothing buffered, only the default reservation is retained.
            sinkChannel.open();
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(0, sinkChannel.getNumOfBufferedTsBlocks());

            // Send a single block; the channel must not report itself as full.
            sinkChannel.send(mockTsBlocks.get(0));
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(
                    mockTsBlockSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                    sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(1, sinkChannel.getNumOfBufferedTsBlocks());
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(1))
                    .onNewDataBlockEvent(
                            newDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 0));

            // Fetch the serialized block the way the remote side would.
            sinkChannel.getSerializedTsBlock(0);
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());

            // Declaring the end of input must not finish the channel before the acknowledgement.
            sinkChannel.setNoMoreTsBlocks();
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Mockito.verify(mockSinkListener, Mockito.timeout(10000L).times(1)).onEndOfBlocks(sinkChannel);

            // Acknowledging the only block finishes the channel.
            sinkChannel.acknowledgeTsBlock(0, 1);
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertTrue(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(mockTsBlockSize, sinkChannel.getBufferRetainedSizeInBytes());
            Mockito.verify(mockMemoryPool, Mockito.timeout(100000L).times(1))
                    .free(
                            QUERY_ID,
                            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
                            LOCAL_PLAN_NODE_ID,
                            mockTsBlockSize);
            Mockito.verify(mockSinkListener, Mockito.timeout(100000L).times(1)).onFinish(sinkChannel);
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(1))
                    .onEndOfDataBlockEvent(
                            endOfDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 0));
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Sends two blocks, one after the other, through a channel backed by the blocked mock memory
     * pool. After each send {@code isFull()} must be pending; acknowledging the first block
     * unblocks the channel, and acknowledging the second one (after {@code setNoMoreTsBlocks()})
     * finishes it. Memory is expected to be freed once per acknowledged block.
     */
    @Test
    public void testMultiTimesBlockedSend() throws ClientManagerException, TException, IOException {
        final long mockTsBlockSize = 131072L;
        TEndPoint remoteEndpoint = newRemoteEndpoint();
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
        MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool(
                QUERY_ID,
                FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
                LOCAL_PLAN_NODE_ID,
                1,
                mockTsBlockSize);
        LocalMemoryManager mockLocalMemoryManager = newLocalMemoryManagerMock(mockMemoryPool);
        MPPDataExchangeManager.SinkListener mockSinkListener =
                Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, mockTsBlockSize);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = newNoOpClientMock();
        IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
                newClientManagerMock(remoteEndpoint, mockClient);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SinkChannel sinkChannel = new SinkChannel(
                    remoteEndpoint,
                    remoteFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    LOCAL_PLAN_NODE_ID,
                    localFragmentInstanceId,
                    mockLocalMemoryManager,
                    executor,
                    Utils.createMockTsBlockSerde(mockTsBlockSize),
                    mockSinkListener,
                    mockClientManager);

            sinkChannel.open();
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(0, sinkChannel.getNumOfBufferedTsBlocks());

            // First send: the channel is expected to report itself as full (future pending).
            sinkChannel.send(mockTsBlocks.get(0));
            Assert.assertFalse(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(
                    mockTsBlockSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                    sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(1, sinkChannel.getNumOfBufferedTsBlocks());
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(1))
                    .onNewDataBlockEvent(
                            newDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 0));

            // Reading the block alone does not unblock the channel.
            sinkChannel.getSerializedTsBlock(0);
            Assert.assertFalse(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());

            // Acknowledging the first block unblocks it and frees the block's memory.
            sinkChannel.acknowledgeTsBlock(0, 1);
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Mockito.verify(mockMemoryPool, Mockito.timeout(100000L).times(1))
                    .free(
                            QUERY_ID,
                            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
                            LOCAL_PLAN_NODE_ID,
                            mockTsBlockSize);

            // Second send: blocked again, announced with start sequence id 1.
            sinkChannel.send(mockTsBlocks.get(0));
            Assert.assertFalse(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(
                    mockTsBlockSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                    sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(1, sinkChannel.getNumOfBufferedTsBlocks());
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(1))
                    .onNewDataBlockEvent(
                            newDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 1));

            sinkChannel.setNoMoreTsBlocks();
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Mockito.verify(mockSinkListener, Mockito.timeout(10000L).times(1)).onEndOfBlocks(sinkChannel);
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(1))
                    .onEndOfDataBlockEvent(
                            endOfDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 1));

            sinkChannel.getSerializedTsBlock(1);
            Assert.assertFalse(sinkChannel.isFinished());

            // Acknowledging the second block finishes the channel; memory was freed twice in total.
            sinkChannel.acknowledgeTsBlock(1, 2);
            Assert.assertTrue(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Mockito.verify(mockMemoryPool, Mockito.timeout(100000L).times(2))
                    .free(
                            QUERY_ID,
                            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
                            LOCAL_PLAN_NODE_ID,
                            mockTsBlockSize);
            Mockito.verify(mockSinkListener, Mockito.timeout(100000L).times(1)).onFinish(sinkChannel);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Makes every client call throw and checks that the new-data-block event is attempted three
     * times, that the listener is told about the failure with the thrown exception, that
     * {@code setNoMoreTsBlocks()} does not trigger {@code onEndOfBlocks}, and that an explicit
     * {@code abort()} reports {@code onAborted} without {@code onFinish}.
     */
    @Test
    public void testFailedSend() throws ClientManagerException, TException {
        final long mockTsBlockSize = 0x100000L;
        TEndPoint remoteEndpoint = newRemoteEndpoint();
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
        MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool(
                QUERY_ID,
                FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
                LOCAL_PLAN_NODE_ID,
                1,
                mockTsBlockSize);
        LocalMemoryManager mockLocalMemoryManager = newLocalMemoryManagerMock(mockMemoryPool);
        MPPDataExchangeManager.SinkListener mockSinkListener =
                Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, mockTsBlockSize);

        // Both RPCs fail with the same exception instance so it can be matched in onFailure.
        SyncDataNodeMPPDataExchangeServiceClient mockClient =
                Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        TException mockException = new TException("Mock exception");
        Mockito.doThrow(mockException).when(mockClient).onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
        Mockito.doThrow(mockException).when(mockClient).onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
        IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
                newClientManagerMock(remoteEndpoint, mockClient);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SinkChannel sinkChannel = new SinkChannel(
                    remoteEndpoint,
                    remoteFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    LOCAL_PLAN_NODE_ID,
                    localFragmentInstanceId,
                    mockLocalMemoryManager,
                    executor,
                    Utils.createMockTsBlockSerde(mockTsBlockSize),
                    mockSinkListener,
                    mockClientManager);
            // No pause between attempts so the retries complete quickly.
            sinkChannel.setRetryIntervalInMs(0L);

            sinkChannel.open();
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(0, sinkChannel.getNumOfBufferedTsBlocks());

            sinkChannel.send(mockTsBlocks.get(0));
            Assert.assertFalse(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(
                    mockTsBlockSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                    sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(1, sinkChannel.getNumOfBufferedTsBlocks());

            // The same event is expected to be attempted three times before the failure is reported.
            Mockito.verify(mockClient, Mockito.timeout(10000L).times(3))
                    .onNewDataBlockEvent(
                            newDataBlockEvent(remoteFragmentInstanceId, localFragmentInstanceId, 0));
            Mockito.verify(mockSinkListener, Mockito.timeout(10000L).times(1))
                    .onFailure(sinkChannel, mockException);

            sinkChannel.setNoMoreTsBlocks();
            Assert.assertFalse(sinkChannel.isAborted());
            // timeout(..).times(0) succeeds on its first check without waiting, so it never added
            // any observation window; never() states that immediate check explicitly.
            Mockito.verify(mockSinkListener, Mockito.never()).onEndOfBlocks(sinkChannel);

            sinkChannel.abort();
            Assert.assertTrue(sinkChannel.isAborted());
            Mockito.verify(mockSinkListener, Mockito.timeout(100000L).times(1)).onAborted(sinkChannel);
            Mockito.verify(mockSinkListener, Mockito.never()).onFinish(sinkChannel);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Uses a spied real {@link MemoryPool} so that the send leaves {@code isFull()} pending, then
     * aborts the channel and checks that the pending future is cancelled, that all buffers are
     * released, that {@code onAborted} is reported and that the pool holds no reserved bytes for
     * the query any more.
     */
    @Test
    public void testAbort() throws ClientManagerException, TException {
        final long mockTsBlockSize = 0x100000L;
        TEndPoint remoteEndpoint = newRemoteEndpoint();
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
        MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", mockTsBlockSize, mockTsBlockSize));
        LocalMemoryManager mockLocalMemoryManager = newLocalMemoryManagerMock(spyMemoryPool);
        MPPDataExchangeManager.SinkListener mockSinkListener =
                Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, mockTsBlockSize);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = newNoOpClientMock();
        IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
                newClientManagerMock(remoteEndpoint, mockClient);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SinkChannel sinkChannel = new SinkChannel(
                    remoteEndpoint,
                    remoteFragmentInstanceId,
                    REMOTE_PLAN_NODE_ID,
                    LOCAL_PLAN_NODE_ID,
                    localFragmentInstanceId,
                    mockLocalMemoryManager,
                    executor,
                    Utils.createMockTsBlockSerde(mockTsBlockSize),
                    mockSinkListener,
                    mockClientManager);
            sinkChannel.setMaxBytesCanReserve(Long.MAX_VALUE);

            sinkChannel.open();
            Assert.assertTrue(sinkChannel.isFull().isDone());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(0, sinkChannel.getNumOfBufferedTsBlocks());

            sinkChannel.send(mockTsBlocks.get(0));
            // Hold on to this future: abort() is expected to cancel exactly this instance.
            ListenableFuture<?> blocked = sinkChannel.isFull();
            Assert.assertFalse(blocked.isDone());
            Assert.assertFalse(blocked.isCancelled());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertFalse(sinkChannel.isAborted());
            Assert.assertEquals(
                    mockTsBlockSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                    sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(1, sinkChannel.getNumOfBufferedTsBlocks());

            sinkChannel.abort();
            Assert.assertTrue(blocked.isDone());
            Assert.assertTrue(blocked.isCancelled());
            Assert.assertFalse(sinkChannel.isFinished());
            Assert.assertTrue(sinkChannel.isAborted());
            Assert.assertEquals(0L, sinkChannel.getBufferRetainedSizeInBytes());
            Assert.assertEquals(0, sinkChannel.getNumOfBufferedTsBlocks());
            Mockito.verify(mockSinkListener, Mockito.timeout(100000L).times(1)).onAborted(sinkChannel);
            Assert.assertEquals(0L, spyMemoryPool.getQueryMemoryReservedBytes(QUERY_ID));
        } finally {
            executor.shutdownNow();
        }
    }

    /** Builds the remote endpoint ("remote" host, configured MPP data exchange port). */
    private static TEndPoint newRemoteEndpoint() {
        return new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
    }

    /** Creates a mocked {@link LocalMemoryManager} whose query pool is {@code queryPool}. */
    private static LocalMemoryManager newLocalMemoryManagerMock(MemoryPool queryPool) {
        LocalMemoryManager manager = Mockito.mock(LocalMemoryManager.class);
        Mockito.when(manager.getQueryPool()).thenReturn(queryPool);
        return manager;
    }

    /** Creates a mocked client whose two event RPCs are stubbed to do nothing. */
    private static SyncDataNodeMPPDataExchangeServiceClient newNoOpClientMock() throws TException {
        SyncDataNodeMPPDataExchangeServiceClient client =
                Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        Mockito.doNothing().when(client).onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
        Mockito.doNothing().when(client).onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
        return client;
    }

    /** Creates a mocked client manager that lends {@code client} for {@code endpoint}. */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the generic parameters.
    private static IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> newClientManagerMock(
            TEndPoint endpoint, SyncDataNodeMPPDataExchangeServiceClient client)
            throws ClientManagerException {
        IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager =
                Mockito.mock(IClientManager.class);
        Mockito.when(manager.borrowClient(endpoint)).thenReturn(client);
        return manager;
    }

    /**
     * Registers a Mockito matcher for a new-data-block event that targets {@code target} at
     * {@link #REMOTE_PLAN_NODE_ID}, originates from {@code source}, starts at
     * {@code startSequenceId} and announces exactly one block size. Must only be called in
     * argument position of a stubbing or verification.
     */
    private static TNewDataBlockEvent newDataBlockEvent(
            TFragmentInstanceId target, TFragmentInstanceId source, int startSequenceId) {
        return Mockito.argThat(
                e -> target.equals(e.getTargetFragmentInstanceId())
                        && REMOTE_PLAN_NODE_ID.equals(e.getTargetPlanNodeId())
                        && source.equals(e.getSourceFragmentInstanceId())
                        && e.getStartSequenceId() == startSequenceId
                        && e.getBlockSizes().size() == 1);
    }

    /**
     * Registers a Mockito matcher for an end-of-data-block event that targets {@code target} at
     * {@link #REMOTE_PLAN_NODE_ID}, originates from {@code source} and carries
     * {@code lastSequenceId}. Must only be called in argument position of a stubbing or
     * verification.
     */
    private static TEndOfDataBlockEvent endOfDataBlockEvent(
            TFragmentInstanceId target, TFragmentInstanceId source, int lastSequenceId) {
        return Mockito.argThat(
                e -> target.equals(e.getTargetFragmentInstanceId())
                        && REMOTE_PLAN_NODE_ID.equals(e.getTargetPlanNodeId())
                        && source.equals(e.getSourceFragmentInstanceId())
                        && lastSequenceId == e.getLastSequenceId());
    }
}

