/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.StubSinkHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class DataDriverTest {
    private static final String DATA_DRIVER_TEST_SG = "root.DataDriverTest";
    private final List<String> deviceIds = new ArrayList<String>();
    private final List<MeasurementSchema> measurementSchemas = new ArrayList<MeasurementSchema>();
    private final List<TsFileResource> seqResources = new ArrayList<TsFileResource>();
    private final List<TsFileResource> unSeqResources = new ArrayList<TsFileResource>();

    @Before
    public void setUp() throws MetadataException, IOException, WriteProcessException {
        SeriesReaderTestUtil.setUp(this.measurementSchemas, this.deviceIds, this.seqResources, this.unSeqResources, DATA_DRIVER_TEST_SG);
    }

    @After
    public void tearDown() throws IOException {
        SeriesReaderTestUtil.tearDown(this.seqResources, this.unSeqResources);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void batchTest() {
        ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool((int)1, (String)"test-instance-notification");
        try {
            MeasurementPath measurementPath1 = new MeasurementPath("root.DataDriverTest.device0.sensor0", TSDataType.INT32);
            HashSet<String> allSensors = new HashSet<String>();
            allSensors.add("sensor0");
            allSensors.add("sensor1");
            QueryId queryId = new QueryId("stub_query");
            FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
            FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, (Executor)instanceNotificationExecutor);
            FragmentInstanceContext fragmentInstanceContext = FragmentInstanceContext.createFragmentInstanceContext((FragmentInstanceId)instanceId, (FragmentInstanceStateMachine)stateMachine);
            PlanNodeId planNodeId1 = new PlanNodeId("1");
            fragmentInstanceContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
            PlanNodeId planNodeId2 = new PlanNodeId("2");
            fragmentInstanceContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
            fragmentInstanceContext.addOperatorContext(3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
            fragmentInstanceContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());
            SeriesScanOperator seriesScanOperator1 = new SeriesScanOperator(planNodeId1, (PartialPath)measurementPath1, allSensors, TSDataType.INT32, (OperatorContext)fragmentInstanceContext.getOperatorContexts().get(0), null, null, true);
            MeasurementPath measurementPath2 = new MeasurementPath("root.DataDriverTest.device0.sensor1", TSDataType.INT32);
            SeriesScanOperator seriesScanOperator2 = new SeriesScanOperator(planNodeId2, (PartialPath)measurementPath2, allSensors, TSDataType.INT32, (OperatorContext)fragmentInstanceContext.getOperatorContexts().get(1), null, null, true);
            TimeJoinOperator timeJoinOperator = new TimeJoinOperator((OperatorContext)fragmentInstanceContext.getOperatorContexts().get(2), Arrays.asList(seriesScanOperator1, seriesScanOperator2), Ordering.ASC, Arrays.asList(TSDataType.INT32, TSDataType.INT32), Arrays.asList(new SingleColumnMerger(new InputLocation(0, 0), (TimeComparator)new AscTimeComparator()), new SingleColumnMerger(new InputLocation(1, 0), (TimeComparator)new AscTimeComparator())), (TimeComparator)new AscTimeComparator());
            LimitOperator limitOperator = new LimitOperator((OperatorContext)fragmentInstanceContext.getOperatorContexts().get(3), 250L, (Operator)timeJoinOperator);
            DataRegion dataRegion = (DataRegion)Mockito.mock(DataRegion.class);
            ImmutableList pathList = ImmutableList.of((Object)measurementPath1, (Object)measurementPath2);
            String deviceId = "root.DataDriverTest.device0";
            Mockito.when((Object)dataRegion.query((List)pathList, deviceId, (QueryContext)fragmentInstanceContext, null)).thenReturn((Object)new QueryDataSource(this.seqResources, this.unSeqResources));
            DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, (List)pathList, null, dataRegion, (List)ImmutableList.of((Object)seriesScanOperator1, (Object)seriesScanOperator2));
            StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
            try (DataDriver dataDriver = null;){
                dataDriver = new DataDriver((Operator)limitOperator, (ISinkHandle)sinkHandle, driverContext);
                Assert.assertEquals((Object)fragmentInstanceContext.getId(), (Object)dataDriver.getInfo());
                Assert.assertFalse((boolean)dataDriver.isFinished());
                while (!dataDriver.isFinished()) {
                    Assert.assertEquals((Object)FragmentInstanceState.RUNNING, (Object)stateMachine.getState());
                    ListenableFuture blocked = dataDriver.processFor(DriverTaskThread.EXECUTION_TIME_SLICE);
                    Assert.assertTrue((boolean)blocked.isDone());
                }
                Assert.assertEquals((Object)FragmentInstanceState.FLUSHING, (Object)stateMachine.getState());
                List<TsBlock> result = sinkHandle.getTsBlocks();
                Assert.assertEquals((long)13L, (long)result.size());
                for (int i = 0; i < 13; ++i) {
                    TsBlock tsBlock = result.get(i);
                    Assert.assertEquals((long)2L, (long)tsBlock.getValueColumnCount());
                    Assert.assertTrue((boolean)(tsBlock.getColumn(0) instanceof IntColumn));
                    Assert.assertTrue((boolean)(tsBlock.getColumn(1) instanceof IntColumn));
                    if (i < 12) {
                        Assert.assertEquals((long)20L, (long)tsBlock.getPositionCount());
                    } else {
                        Assert.assertEquals((long)10L, (long)tsBlock.getPositionCount());
                    }
                    for (int j = 0; j < tsBlock.getPositionCount(); ++j) {
                        long expectedTime = (long)j + 20L * (long)i;
                        Assert.assertEquals((long)expectedTime, (long)tsBlock.getTimeByIndex(j));
                        if (expectedTime < 200L) {
                            Assert.assertEquals((long)(20000L + expectedTime), (long)tsBlock.getColumn(0).getInt(j));
                            Assert.assertEquals((long)(20000L + expectedTime), (long)tsBlock.getColumn(1).getInt(j));
                            continue;
                        }
                        if (expectedTime < 260L || expectedTime >= 300L && expectedTime < 380L || expectedTime >= 400L) {
                            Assert.assertEquals((long)(10000L + expectedTime), (long)tsBlock.getColumn(0).getInt(j));
                            Assert.assertEquals((long)(10000L + expectedTime), (long)tsBlock.getColumn(1).getInt(j));
                            continue;
                        }
                        Assert.assertEquals((long)expectedTime, (long)tsBlock.getColumn(0).getInt(j));
                        Assert.assertEquals((long)expectedTime, (long)tsBlock.getColumn(1).getInt(j));
                    }
                }
            }
        }
        catch (IllegalPathException | QueryProcessException e) {
            e.printStackTrace();
            Assert.fail();
        }
        finally {
            instanceNotificationExecutor.shutdown();
        }
    }
}

