/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.SinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.Utils;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SinkHandleTest {
    @Test
    public void testOneTimeNotBlockedSend() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        boolean numOfMockTsBlock = true;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.any(TNewDataBlockEvent.class));
        }
        catch (IOException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, 0x100000L);
        SinkHandle sinkHandle = new SinkHandle(remoteEndpoint, remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(0x100000L), mockSinkHandleListener, mockClientManager);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)0L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(mockTsBlocks.get(0));
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)(0x100000L + (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES), (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)1L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)2))).reserve("q0", 0x100000L);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1))).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && e.getStartSequenceId() == 0 && e.getBlockSizes().size() == 1));
        }
        catch (InterruptedException | TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        for (int i = 0; i < 1; ++i) {
            try {
                sinkHandle.getSerializedTsBlock(i);
            }
            catch (IOException e4) {
                e4.printStackTrace();
                Assert.fail();
            }
            Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        }
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onEndOfBlocks((ISinkHandle)sinkHandle);
        sinkHandle.acknowledgeTsBlock(0, 1);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertTrue((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)0x100000L, (long)sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)1))).free("q0", 0x100000L);
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onFinish((ISinkHandle)sinkHandle);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1))).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 0 == e.getLastSequenceId()));
        }
        catch (InterruptedException | TException e5) {
            e5.printStackTrace();
            Assert.fail();
        }
    }

    @Test
    public void testMultiTimesBlockedSend() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        boolean numOfMockTsBlock = true;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool("q0", 1, 0x100000L);
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, 0x100000L);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.any(TNewDataBlockEvent.class));
        }
        catch (IOException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(remoteEndpoint, remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(0x100000L), mockSinkHandleListener, mockClientManager);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)0L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(mockTsBlocks.get(0));
        Assert.assertFalse((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)(0x100000L + (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES), (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)1L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)2))).reserve("q0", 0x100000L);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1))).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && e.getStartSequenceId() == 0 && e.getBlockSizes().size() == 1));
        }
        catch (InterruptedException | TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        for (int i = 0; i < 1; ++i) {
            try {
                sinkHandle.getSerializedTsBlock(i);
            }
            catch (IOException e4) {
                e4.printStackTrace();
                Assert.fail();
            }
            Assert.assertFalse((boolean)sinkHandle.isFull().isDone());
        }
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        sinkHandle.acknowledgeTsBlock(0, 1);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)1))).free("q0", 0x100000L);
        sinkHandle.send(mockTsBlocks.get(0));
        Assert.assertFalse((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)(0x100000L + (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES), (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)1L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)3))).reserve("q0", 0x100000L);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1))).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && e.getStartSequenceId() == 1 && e.getBlockSizes().size() == 1));
        }
        catch (InterruptedException | TException e5) {
            e5.printStackTrace();
            Assert.fail();
        }
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onEndOfBlocks((ISinkHandle)sinkHandle);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1))).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 1 == e.getLastSequenceId()));
        }
        catch (InterruptedException | TException e6) {
            e6.printStackTrace();
            Assert.fail();
        }
        for (int i = 1; i < 2; ++i) {
            try {
                sinkHandle.getSerializedTsBlock(i);
                continue;
            }
            catch (IOException e7) {
                e7.printStackTrace();
                Assert.fail();
            }
        }
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        sinkHandle.acknowledgeTsBlock(1, 2);
        Assert.assertTrue((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)2))).free("q0", 0x100000L);
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onFinish((ISinkHandle)sinkHandle);
    }

    @Test
    public void testFailedSend() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        boolean numOfMockTsBlock = true;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool("q0", 1, 0x100000L);
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, 0x100000L);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        TException mockException = new TException("Mock exception");
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doThrow((Throwable[])new Throwable[]{mockException}).when((Object)mockClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doThrow((Throwable[])new Throwable[]{mockException}).when((Object)mockClient)).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.any(TNewDataBlockEvent.class));
        }
        catch (IOException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(remoteEndpoint, remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(0x100000L), mockSinkHandleListener, mockClientManager);
        sinkHandle.setRetryIntervalInMs(0L);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)0L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(mockTsBlocks.get(0));
        Assert.assertFalse((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)(0x100000L + (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES), (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)1L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool)Mockito.verify((Object)mockMemoryPool, (VerificationMode)Mockito.times((int)2))).reserve("q0", 0x100000L);
        try {
            Thread.sleep(100L);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)3))).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getTargetFragmentInstanceId()) && "exchange_0".equals(e.getTargetPlanNodeId()) && localFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && e.getStartSequenceId() == 0 && e.getBlockSizes().size() == 1));
        }
        catch (InterruptedException | TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onFailure((ISinkHandle)sinkHandle, (Throwable)mockException);
        try {
            sinkHandle.setNoMoreTsBlocks();
            Assert.fail((String)"Expect an RuntimeException.");
        }
        catch (RuntimeException e4) {
            Assert.assertEquals((Object)"Send EndOfDataBlockEvent failed", (Object)e4.getMessage());
        }
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)0))).onEndOfBlocks((ISinkHandle)sinkHandle);
        sinkHandle.abort();
        Assert.assertTrue((boolean)sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onAborted((ISinkHandle)sinkHandle);
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)0))).onFinish((ISinkHandle)sinkHandle);
    }

    @Test
    public void testAbort() {
        String queryId = "q0";
        long mockTsBlockSize = 0x100000L;
        boolean numOfMockTsBlock = true;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        String remotePlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = (MemoryPool)Mockito.spy((Object)new MemoryPool("test", 0x100000L, 0x100000L));
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)spyMemoryPool);
        MPPDataExchangeManager.SinkHandleListener mockSinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(1, 0x100000L);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent)Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doNothing().when((Object)mockClient)).onNewDataBlockEvent((TNewDataBlockEvent)Mockito.any(TNewDataBlockEvent.class));
        }
        catch (IOException | TException e) {
            e.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(remoteEndpoint, remoteFragmentInstanceId, "exchange_0", localFragmentInstanceId, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(0x100000L), mockSinkHandleListener, mockClientManager);
        Assert.assertTrue((boolean)sinkHandle.isFull().isDone());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)0L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(mockTsBlocks.get(0));
        ListenableFuture blocked = sinkHandle.isFull();
        Assert.assertFalse((boolean)blocked.isDone());
        Assert.assertFalse((boolean)blocked.isCancelled());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertFalse((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)(0x100000L + (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES), (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)1L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.abort();
        Assert.assertTrue((boolean)blocked.isDone());
        Assert.assertTrue((boolean)blocked.isCancelled());
        Assert.assertFalse((boolean)sinkHandle.isFinished());
        Assert.assertTrue((boolean)sinkHandle.isAborted());
        Assert.assertEquals((long)0L, (long)sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals((long)0L, (long)sinkHandle.getNumOfBufferedTsBlocks());
        ((MPPDataExchangeManager.SinkHandleListener)Mockito.verify((Object)mockSinkHandleListener, (VerificationMode)Mockito.times((int)1))).onAborted((ISinkHandle)sinkHandle);
        Assert.assertEquals((long)0L, (long)spyMemoryPool.getQueryMemoryReservedBytes("q0"));
    }
}

