/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.queryengine.execution.exchange.Utils;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link LocalSourceHandle} backed by a {@link SharedTsBlockQueue}.
 *
 * <p>Each test builds its own queue, handle and mocked listener, so the tests do not share
 * mutable state. The queue is created with Guava's direct executor service, which runs
 * submitted tasks in the calling thread, so the assertions can be made right after each call.
 */
public class LocalSourceHandleTest {
    /** Query id of the local fragment instance used by the tests. */
    private static final String QUERY_ID = "q0";

    /** Plan node id passed to both the shared queue and the source handle. */
    private static final String LOCAL_PLAN_NODE_ID = "exchange_0";

    /** Size in bytes (1 MiB) requested for the mock TsBlock. */
    private static final long MOCK_TS_BLOCK_SIZE = 1024L * 1024L;

    /**
     * Adds one block to the queue, marks the queue as having no more blocks, receives the block
     * through the handle, and checks the blocked/aborted/finished flags, the retained size and
     * the {@code onFinished} notification at each step.
     */
    @Test
    public void testReceive() {
        TFragmentInstanceId localFragmentInstanceId = newLocalFragmentInstanceId();
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener =
                Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        SharedTsBlockQueue queue = newQueue(localFragmentInstanceId);
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(
                localFragmentInstanceId, LOCAL_PLAN_NODE_ID, queue, mockSourceHandleListener);

        // Nothing has been queued yet: the handle is blocked and retains no bytes.
        Assert.assertFalse(localSourceHandle.isBlocked().isDone());
        Assert.assertFalse(localSourceHandle.isAborted());
        Assert.assertFalse(localSourceHandle.isFinished());
        Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());

        // Queue a single block and signal that no further blocks will follow.
        // NOTE(review): createMockTsBlock presumably yields a block reporting the given
        // retained size - confirm against Utils.
        queue.add(Utils.createMockTsBlock(MOCK_TS_BLOCK_SIZE));
        queue.setNoMoreTsBlocks(true);
        Assert.assertTrue(localSourceHandle.isBlocked().isDone());
        Assert.assertFalse(localSourceHandle.isAborted());
        Assert.assertFalse(localSourceHandle.isFinished());
        Assert.assertEquals(MOCK_TS_BLOCK_SIZE, localSourceHandle.getBufferRetainedSizeInBytes());

        // Receive the block; afterwards the handle must be finished and the listener notified.
        Assert.assertTrue(localSourceHandle.isBlocked().isDone());
        localSourceHandle.receive();
        ListenableFuture<?> blocked = localSourceHandle.isBlocked();
        Assert.assertTrue(blocked.isDone());
        Assert.assertFalse(localSourceHandle.isAborted());
        Assert.assertTrue(localSourceHandle.isFinished());
        Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(localSourceHandle);
    }

    /**
     * Aborts a handle whose queue is still empty and checks that the previously obtained
     * blocked-future completes, the handle reports aborted but not finished, and the listener
     * receives exactly one {@code onAborted} notification.
     */
    @Test
    public void testAbort() {
        TFragmentInstanceId localFragmentInstanceId = newLocalFragmentInstanceId();
        MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener =
                Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
        SharedTsBlockQueue queue = newQueue(localFragmentInstanceId);
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(
                localFragmentInstanceId, LOCAL_PLAN_NODE_ID, queue, mockSourceHandleListener);

        // Capture the future before aborting so the test observes that abort() completes it.
        ListenableFuture<?> future = localSourceHandle.isBlocked();
        Assert.assertFalse(future.isDone());
        Assert.assertFalse(localSourceHandle.isAborted());
        Assert.assertFalse(localSourceHandle.isFinished());
        Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());

        localSourceHandle.abort();
        Assert.assertTrue(future.isDone());
        Assert.assertTrue(localSourceHandle.isAborted());
        Assert.assertFalse(localSourceHandle.isFinished());
        Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(localSourceHandle);
    }

    /** Creates the id of the local fragment instance (fragment 0, instance "0") of the query. */
    private static TFragmentInstanceId newLocalFragmentInstanceId() {
        return new TFragmentInstanceId(QUERY_ID, 0, "0");
    }

    /**
     * Creates a {@link SharedTsBlockQueue} for the given fragment instance, wired to a mocked
     * {@link LocalMemoryManager} whose query pool is the mock returned by
     * {@code Utils.createMockNonBlockedMemoryPool()}.
     */
    private static SharedTsBlockQueue newQueue(TFragmentInstanceId localFragmentInstanceId) {
        LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
        // NOTE(review): presumably a pool whose reservations never block - confirm against Utils.
        MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
        return new SharedTsBlockQueue(
                localFragmentInstanceId,
                LOCAL_PLAN_NODE_ID,
                mockLocalMemoryManager,
                MoreExecutors.newDirectExecutorService());
    }
}

