/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.exchange;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.TsBlockSerdeFactory;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests that a shuffle sink handle and a local source handle created through the same
 * {@link MPPDataExchangeManager} end up backed by the same shared TsBlock queue, regardless of
 * which of the two handles is created first.
 */
public class MPPDataExchangeManagerTest {
    /** Plan node id passed for the exchange (downstream) side in both tests. */
    private static final String REMOTE_PLAN_NODE_ID = "exchange_0";

    /** Plan node id passed for the shuffle sink side in both tests. */
    private static final String LOCAL_PLAN_NODE_ID = "shuffleSink_0";

    /**
     * Creates the shuffle sink handle first and the local source handle second, then checks that
     * channel 0 of the sink and the source expose the same shared TsBlock queue.
     */
    @Test
    public void testCreateLocalSinkHandle() {
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        FragmentInstanceContext mockFragmentInstanceContext = Mockito.mock(FragmentInstanceContext.class);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            MPPDataExchangeManager mppDataExchangeManager = newManager(executorService);

            ISinkHandle shuffleSinkHandle = createShuffleSinkHandle(mppDataExchangeManager, localFragmentInstanceId, remoteFragmentInstanceId, mockFragmentInstanceContext);
            Assert.assertTrue(shuffleSinkHandle instanceof ShuffleSinkHandle);

            ISourceHandle localSourceHandle = createLocalSourceHandle(mppDataExchangeManager, localFragmentInstanceId, remoteFragmentInstanceId);
            Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);

            assertSameSharedTsBlockQueue(shuffleSinkHandle, localSourceHandle);
        } finally {
            // The executor is owned by this test; release it so it is not leaked across tests.
            executorService.shutdown();
        }
    }

    /**
     * Creates the local source handle first and the shuffle sink handle second, then checks that
     * channel 0 of the sink and the source expose the same shared TsBlock queue.
     */
    @Test
    public void testCreateLocalSourceHandle() {
        TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
        TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        FragmentInstanceContext mockFragmentInstanceContext = Mockito.mock(FragmentInstanceContext.class);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            MPPDataExchangeManager mppDataExchangeManager = newManager(executorService);

            ISourceHandle localSourceHandle = createLocalSourceHandle(mppDataExchangeManager, localFragmentInstanceId, remoteFragmentInstanceId);
            Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);

            ISinkHandle shuffleSinkHandle = createShuffleSinkHandle(mppDataExchangeManager, localFragmentInstanceId, remoteFragmentInstanceId, mockFragmentInstanceContext);
            Assert.assertTrue(shuffleSinkHandle instanceof ShuffleSinkHandle);

            assertSameSharedTsBlockQueue(shuffleSinkHandle, localSourceHandle);
        } finally {
            // The executor is owned by this test; release it so it is not leaked across tests.
            executorService.shutdown();
        }
    }

    /**
     * Builds a manager whose mocked {@link LocalMemoryManager} returns a spied "test" memory pool
     * from {@code getQueryPool()}.
     *
     * @param executorService executor handed to the manager; the caller shuts it down
     * @return a freshly constructed manager
     */
    private static MPPDataExchangeManager newManager(ExecutorService executorService) {
        LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
        MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10240L, 5120L));
        Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
        // The client manager expression stays raw-typed: the concrete client type is not imported here.
        return new MPPDataExchangeManager(mockLocalMemoryManager, new TsBlockSerdeFactory(), executorService, new IClientManager.Factory().createClientManager((IClientPoolFactory)new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
    }

    /**
     * Creates a PLAIN-strategy shuffle sink handle with a single downstream channel that targets
     * the configured internal address and MPP data exchange port of this node.
     *
     * @param manager manager under test
     * @param localFragmentInstanceId fragment instance id passed as the sink's own instance
     * @param remoteFragmentInstanceId fragment instance id of the downstream channel location
     * @param fragmentInstanceContext context passed through to the manager
     * @return the handle returned by the manager
     */
    private static ISinkHandle createShuffleSinkHandle(MPPDataExchangeManager manager, TFragmentInstanceId localFragmentInstanceId, TFragmentInstanceId remoteFragmentInstanceId, FragmentInstanceContext fragmentInstanceContext) {
        TEndPoint endPoint = new TEndPoint(IoTDBDescriptor.getInstance().getConfig().getInternalAddress(), IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        DownStreamChannelLocation location = new DownStreamChannelLocation(endPoint, remoteFragmentInstanceId, REMOTE_PLAN_NODE_ID);
        return manager.createShuffleSinkHandle(Collections.singletonList(location), new DownStreamChannelIndex(0), ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN, localFragmentInstanceId, LOCAL_PLAN_NODE_ID, fragmentInstanceContext);
    }

    /**
     * Creates a local source handle for index 0 with a no-op failure callback, passing the ids in
     * the same order both tests use.
     *
     * @param manager manager under test
     * @param localFragmentInstanceId fragment instance id passed as the fourth argument
     * @param remoteFragmentInstanceId fragment instance id passed as the first argument
     * @return the handle returned by the manager
     */
    private static ISourceHandle createLocalSourceHandle(MPPDataExchangeManager manager, TFragmentInstanceId localFragmentInstanceId, TFragmentInstanceId remoteFragmentInstanceId) {
        return manager.createLocalSourceHandleForFragment(remoteFragmentInstanceId, REMOTE_PLAN_NODE_ID, LOCAL_PLAN_NODE_ID, localFragmentInstanceId, 0, t -> {});
    }

    /**
     * Asserts that channel 0 of the sink handle and the source handle report equal shared TsBlock
     * queues.
     *
     * @param shuffleSinkHandle sink handle whose channel 0 is expected to be a {@link LocalSinkChannel}
     * @param localSourceHandle source handle expected to be a {@link LocalSourceHandle}
     */
    private static void assertSameSharedTsBlockQueue(ISinkHandle shuffleSinkHandle, ISourceHandle localSourceHandle) {
        Assert.assertEquals(((LocalSinkChannel)shuffleSinkHandle.getChannel(0)).getSharedTsBlockQueue(), ((LocalSourceHandle)localSourceHandle).getSharedTsBlockQueue());
    }
}

