/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.Utils;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SourceHandleTest {
    private static final long MOCK_TSBLOCK_SIZE = 0x100000L;
    private static long maxBytesPerFI;

    @BeforeClass
    public static void beforeClass() {
        maxBytesPerFI = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
        IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(0x500000L);
    }

    @AfterClass
    public static void afterClass() {
        IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(maxBytesPerFI);
    }

    @Test
    public void testNonBlockedOneTimeReceive() {
        String queryId = "q0";
        int numOfMockTsBlock = 10;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        String localPlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doAnswer(invocation -> {
                TGetDataBlockRequest req = (TGetDataBlockRequest)invocation.getArgument(0);
                ArrayList<ByteBuffer> byteBuffers = new ArrayList<ByteBuffer>(req.getEndSequenceId() - req.getStartSequenceId());
                for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); ++i) {
                    byteBuffers.add(ByteBuffer.allocate(0));
                }
                return new TGetDataBlockResponse(byteBuffers);
            }).when((Object)mockClient)).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
        }
        catch (ClientManagerException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener = (MPPDataExchangeManager.SourceHandleListener)Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(0x100000L);
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, "exchange_0", 0, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), mockTsBlockSerde, mockSourceHandleListener, mockClientManager);
        Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.updatePendingDataBlockInfo(0, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(1))).getDataBlock((TGetDataBlockRequest)Mockito.argThat(req -> remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId()) && 0 == req.getStartSequenceId() && 10 == req.getEndSequenceId()));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 0 == e.getStartSequenceId() && 10 == e.getEndSequenceId()));
        }
        catch (TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0xA00000L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        for (int i = 0; i < 10; ++i) {
            sourceHandle.receive();
            if (i < 9) {
                Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
            } else {
                Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
            }
            Assert.assertFalse((boolean)sourceHandle.isAborted());
            Assert.assertFalse((boolean)sourceHandle.isFinished());
            Assert.assertEquals((long)((long)(9 - i) * 0x100000L), (long)sourceHandle.getBufferRetainedSizeInBytes());
        }
        sourceHandle.setNoMoreTsBlocks(9);
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertTrue((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onFinished((ISourceHandle)sourceHandle);
    }

    @Test
    public void testBlockedOneTimeReceive() {
        String queryId = "q0";
        int numOfMockTsBlock = 10;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        String localPlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = (MemoryPool)Mockito.spy((Object)new MemoryPool("test", 0xA00000L, 0x500000L));
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)spyMemoryPool);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doAnswer(invocation -> {
                TGetDataBlockRequest req = (TGetDataBlockRequest)invocation.getArgument(0);
                ArrayList<ByteBuffer> byteBuffers = new ArrayList<ByteBuffer>(req.getEndSequenceId() - req.getStartSequenceId());
                for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); ++i) {
                    byteBuffers.add(ByteBuffer.allocate(0));
                }
                return new TGetDataBlockResponse(byteBuffers);
            }).when((Object)mockClient)).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
        }
        catch (ClientManagerException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener = (MPPDataExchangeManager.SourceHandleListener)Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(0x100000L);
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, "exchange_0", 0, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), mockTsBlockSerde, mockSourceHandleListener, mockClientManager);
        long maxBytesCanReserve = Math.min(0x500000L, IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
        sourceHandle.setMaxBytesCanReserve(maxBytesCanReserve);
        Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.updatePendingDataBlockInfo(0, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((MemoryPool)Mockito.verify((Object)spyMemoryPool, (VerificationMode)Mockito.timeout((long)10000L).times(6))).reserve("q0", FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId((TFragmentInstanceId)localFragmentInstanceId), "exchange_0", 0x100000L, maxBytesCanReserve);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(1))).getDataBlock((TGetDataBlockRequest)Mockito.argThat(req -> remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId()) && 0 == req.getStartSequenceId() && 5 == req.getEndSequenceId()));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 0 == e.getStartSequenceId() && 5 == e.getEndSequenceId()));
        }
        catch (TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0x600000L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        for (int i = 0; i < 10; ++i) {
            ((MemoryPool)Mockito.verify((Object)spyMemoryPool, (VerificationMode)Mockito.timeout((long)100000L).times(i))).free("q0", FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId((TFragmentInstanceId)localFragmentInstanceId), "exchange_0", 0x100000L);
            sourceHandle.receive();
            try {
                if (i < 5) {
                    Assert.assertEquals((long)(i == 4 ? 0x500000L : 0x600000L), (long)sourceHandle.getBufferRetainedSizeInBytes());
                    int startSequenceId = 5 + i;
                    ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(1))).getDataBlock((TGetDataBlockRequest)Mockito.argThat(req -> remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId()) && startSequenceId == req.getStartSequenceId() && startSequenceId + 1 == req.getEndSequenceId()));
                    ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(1))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && startSequenceId == e.getStartSequenceId() && startSequenceId + 1 == e.getEndSequenceId()));
                } else {
                    Assert.assertEquals((long)((long)(9 - i) * 0x100000L), (long)sourceHandle.getBufferRetainedSizeInBytes());
                }
            }
            catch (TException e4) {
                e4.printStackTrace();
                Assert.fail();
            }
            if (i < 9) {
                Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
            } else {
                Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
            }
            Assert.assertFalse((boolean)sourceHandle.isAborted());
            Assert.assertFalse((boolean)sourceHandle.isFinished());
        }
        sourceHandle.setNoMoreTsBlocks(9);
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertTrue((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onFinished((ISourceHandle)sourceHandle);
    }

    @Test
    public void testMultiTimesReceive() {
        String queryId = "q0";
        long MOCK_TSBLOCK_SIZE = 0x100000L;
        int numOfMockTsBlock = 10;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        String localPlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener = (MPPDataExchangeManager.SourceHandleListener)Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(0x100000L);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doAnswer(invocation -> {
                TGetDataBlockRequest req = (TGetDataBlockRequest)invocation.getArgument(0);
                ArrayList<ByteBuffer> byteBuffers = new ArrayList<ByteBuffer>(req.getEndSequenceId() - req.getStartSequenceId());
                for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); ++i) {
                    byteBuffers.add(ByteBuffer.allocate(0));
                }
                return new TGetDataBlockResponse(byteBuffers);
            }).when((Object)mockClient)).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
        }
        catch (ClientManagerException | TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, "exchange_0", 0, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), mockTsBlockSerde, mockSourceHandleListener, mockClientManager);
        Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.updatePendingDataBlockInfo(10, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(0))).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(0))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.any(TAcknowledgeDataBlockEvent.class));
        }
        catch (TException e3) {
            e3.printStackTrace();
            Assert.fail();
        }
        Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.updatePendingDataBlockInfo(0, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(1))).getDataBlock((TGetDataBlockRequest)Mockito.argThat(req -> remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId()) && 0 == req.getStartSequenceId() && 20 == req.getEndSequenceId()));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 0 == e.getStartSequenceId() && 20 == e.getEndSequenceId()));
        }
        catch (TException e4) {
            e4.printStackTrace();
            Assert.fail();
        }
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0x1400000L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        for (int i = 0; i < 20; ++i) {
            sourceHandle.receive();
            if (i < 19) {
                Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
            } else {
                Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
            }
            Assert.assertFalse((boolean)sourceHandle.isAborted());
            Assert.assertFalse((boolean)sourceHandle.isFinished());
            Assert.assertEquals((long)((long)(19 - i) * 0x100000L), (long)sourceHandle.getBufferRetainedSizeInBytes());
        }
        sourceHandle.updatePendingDataBlockInfo(20, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(1))).getDataBlock((TGetDataBlockRequest)Mockito.argThat(req -> remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId()) && 20 == req.getStartSequenceId() && 30 == req.getEndSequenceId()));
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAcknowledgeDataBlockEvent((TAcknowledgeDataBlockEvent)Mockito.argThat(e -> remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId()) && 20 == e.getStartSequenceId() && 30 == e.getEndSequenceId()));
        }
        catch (TException e5) {
            e5.printStackTrace();
            Assert.fail();
        }
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0xA00000L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        for (int i = 0; i < 10; ++i) {
            sourceHandle.receive();
            if (i < 9) {
                Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
            } else {
                Assert.assertFalse((boolean)sourceHandle.isBlocked().isDone());
            }
            Assert.assertFalse((boolean)sourceHandle.isAborted());
            Assert.assertFalse((boolean)sourceHandle.isFinished());
            Assert.assertEquals((long)((long)(9 - i) * 0x100000L), (long)sourceHandle.getBufferRetainedSizeInBytes());
        }
        sourceHandle.setNoMoreTsBlocks(29);
        Assert.assertTrue((boolean)sourceHandle.isBlocked().isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertTrue((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onFinished((ISourceHandle)sourceHandle);
    }

    @Test
    public void testFailedReceive() {
        String queryId = "q0";
        long MOCK_TSBLOCK_SIZE = 0x100000L;
        int numOfMockTsBlock = 10;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        String localPlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener = (MPPDataExchangeManager.SourceHandleListener)Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(0x100000L);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        TException mockException = new TException("Mock exception");
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doThrow((Throwable[])new Throwable[]{mockException}).when((Object)mockClient)).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
        }
        catch (ClientManagerException | TException e) {
            e.printStackTrace();
            Assert.fail();
        }
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, "exchange_0", 0, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), mockTsBlockSerde, mockSourceHandleListener, mockClientManager);
        sourceHandle.setRetryIntervalInMs(0L);
        ListenableFuture blocked = sourceHandle.isBlocked();
        Assert.assertFalse((boolean)blocked.isDone());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.updatePendingDataBlockInfo(0, Stream.generate(() -> 0x100000L).limit(10L).collect(Collectors.toList()));
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.timeout((long)10000L).times(3))).getDataBlock((TGetDataBlockRequest)Mockito.any());
        }
        catch (TException e) {
            e.printStackTrace();
            Assert.fail();
        }
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onFailure((ISourceHandle)sourceHandle, (Throwable)mockException);
        Assert.assertFalse((boolean)blocked.isDone());
        sourceHandle.abort();
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertTrue((boolean)sourceHandle.isAborted());
        Assert.assertTrue((boolean)blocked.isDone());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAborted((ISourceHandle)sourceHandle);
    }

    @Test
    public void testForceClose() {
        String queryId = "q0";
        long MOCK_TSBLOCK_SIZE = 0x100000L;
        TEndPoint remoteEndpoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        String localPlanNodeId = "exchange_0";
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        LocalMemoryManager mockLocalMemoryManager = (LocalMemoryManager)Mockito.mock(LocalMemoryManager.class);
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when((Object)mockLocalMemoryManager.getQueryPool()).thenReturn((Object)mockMemoryPool);
        IClientManager mockClientManager = (IClientManager)Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient mockClient = (SyncDataNodeMPPDataExchangeServiceClient)Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((Object)((SyncDataNodeMPPDataExchangeServiceClient)mockClientManager.borrowClient((Object)remoteEndpoint))).thenReturn((Object)mockClient);
            ((SyncDataNodeMPPDataExchangeServiceClient)Mockito.doAnswer(invocation -> {
                TGetDataBlockRequest req = (TGetDataBlockRequest)invocation.getArgument(0);
                ArrayList<ByteBuffer> byteBuffers = new ArrayList<ByteBuffer>(req.getEndSequenceId() - req.getStartSequenceId());
                for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); ++i) {
                    byteBuffers.add(ByteBuffer.allocate(0));
                }
                return new TGetDataBlockResponse(byteBuffers);
            }).when((Object)mockClient)).getDataBlock((TGetDataBlockRequest)Mockito.any(TGetDataBlockRequest.class));
        }
        catch (ClientManagerException | TException e) {
            e.printStackTrace();
            Assert.fail();
        }
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener = (MPPDataExchangeManager.SourceHandleListener)Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(0x100000L);
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, "exchange_0", 0, mockLocalMemoryManager, Executors.newSingleThreadExecutor(), mockTsBlockSerde, mockSourceHandleListener, mockClientManager);
        ListenableFuture blocked = sourceHandle.isBlocked();
        Assert.assertFalse((boolean)blocked.isDone());
        Assert.assertFalse((boolean)blocked.isCancelled());
        Assert.assertFalse((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        sourceHandle.abort();
        Assert.assertTrue((boolean)blocked.isDone());
        Assert.assertTrue((boolean)blocked.isCancelled());
        Assert.assertTrue((boolean)sourceHandle.isAborted());
        Assert.assertFalse((boolean)sourceHandle.isFinished());
        Assert.assertEquals((long)0L, (long)sourceHandle.getBufferRetainedSizeInBytes());
        ((MPPDataExchangeManager.SourceHandleListener)Mockito.verify((Object)mockSourceHandleListener, (VerificationMode)Mockito.timeout((long)100000L).times(1))).onAborted((ISourceHandle)sourceHandle);
    }
}

