/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.operator;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest;
import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.AlignedSeriesTestUtil;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AlignedSeriesAggregationScanOperatorTest {
    private static final String SERIES_AGGREGATION_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesAggregationScanOperatorTest";
    private static final List<MeasurementSchema> measurementSchemas = new ArrayList<MeasurementSchema>();
    private static final List<TsFileResource> seqResources = new ArrayList<TsFileResource>();
    private static final List<TsFileResource> unSeqResources = new ArrayList<TsFileResource>();
    private ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool((int)1, (String)"test-instance-notification");
    private static final double DELTA = 1.0E-6;

    @BeforeClass
    public static void setUp() throws MetadataException, IOException, WriteProcessException {
        AlignedSeriesTestUtil.setUp(measurementSchemas, seqResources, unSeqResources, SERIES_AGGREGATION_SCAN_OPERATOR_TEST_SG);
    }

    @AfterClass
    public static void tearDown() throws IOException {
        AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
    }

    @Test
    public void testAggregationWithoutTimeFilter() throws IllegalPathException {
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            for (int i = 0; i < measurementSchemas.size(); ++i) {
                Assert.assertEquals((long)500L, (long)resultTsBlock.getColumn(i).getLong(0));
            }
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)false), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, false, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            for (int i = 0; i < measurementSchemas.size(); ++i) {
                Assert.assertEquals((long)500L, (long)resultTsBlock.getColumn(i).getLong(0));
            }
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.COUNT);
        aggregationTypes.add(AggregationType.SUM);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < 2; ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)((AggregationType)aggregationTypes.get(i)), (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            Assert.assertEquals((long)500L, (long)resultTsBlock.getColumn(0).getLong(0));
            Assert.assertEquals((double)6524750.0, (double)resultTsBlock.getColumn(1).getDouble(0), (double)1.0E-4);
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        aggregationTypes.add(AggregationType.MIN_TIME);
        aggregationTypes.add(AggregationType.MAX_TIME);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < 6; ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)((AggregationType)aggregationTypes.get(i)), (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            Assert.assertTrue((boolean)resultTsBlock.getColumn(0).getBoolean(0));
            Assert.assertEquals((long)10499L, (long)resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals((long)20199L, (long)resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals((double)260.0, (double)resultTsBlock.getColumn(3).getFloat(0), (double)1.0E-6);
            Assert.assertEquals((long)0L, (long)resultTsBlock.getColumn(4).getLong(0));
            Assert.assertEquals((long)499L, (long)resultTsBlock.getColumn(5).getLong(0));
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        aggregationTypes.add(AggregationType.MIN_TIME);
        aggregationTypes.add(AggregationType.MAX_TIME);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < 6; ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)((AggregationType)aggregationTypes.get(i)), (TSDataType)dataType, (boolean)false), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, false, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            Assert.assertTrue((boolean)resultTsBlock.getColumn(0).getBoolean(0));
            Assert.assertEquals((long)10499L, (long)resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals((long)20199L, (long)resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals((double)260.0, (double)resultTsBlock.getColumn(3).getFloat(0), (double)1.0E-6);
            Assert.assertEquals((long)0L, (long)resultTsBlock.getColumn(4).getLong(0));
            Assert.assertEquals((long)499L, (long)resultTsBlock.getColumn(5).getLong(0));
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testAggregationWithTimeFilter1() throws IllegalPathException {
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        TimeFilter.TimeGtEq timeFilter = TimeFilter.gtEq((long)120L);
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, (Filter)timeFilter, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            for (int i = 0; i < measurementSchemas.size(); ++i) {
                Assert.assertEquals((long)resultTsBlock.getColumn(i).getLong(0), (long)380L);
            }
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testAggregationWithTimeFilter2() throws IllegalPathException {
        TimeFilter.TimeLtEq timeFilter = TimeFilter.ltEq((long)379L);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, (Filter)timeFilter, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            for (int i = 0; i < measurementSchemas.size(); ++i) {
                Assert.assertEquals((long)resultTsBlock.getColumn(i).getLong(0), (long)380L);
            }
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testAggregationWithTimeFilter3() throws IllegalPathException {
        AndFilter timeFilter = new AndFilter((Filter)TimeFilter.gtEq((long)100L), (Filter)TimeFilter.ltEq((long)399L));
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, (Filter)timeFilter, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            for (int i = 0; i < measurementSchemas.size(); ++i) {
                Assert.assertEquals((long)resultTsBlock.getColumn(i).getLong(0), (long)300L);
            }
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        aggregationTypes.add(AggregationType.MIN_TIME);
        aggregationTypes.add(AggregationType.MAX_TIME);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < 6; ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)((AggregationType)aggregationTypes.get(i)), (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AndFilter timeFilter = new AndFilter((Filter)TimeFilter.gtEq((long)100L), (Filter)TimeFilter.ltEq((long)399L));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, (Filter)timeFilter, true, null);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            Assert.assertTrue((boolean)resultTsBlock.getColumn(0).getBoolean(0));
            Assert.assertEquals((long)399L, (long)resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals((long)20199L, (long)resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals((double)260.0, (double)resultTsBlock.getColumn(3).getFloat(0), (double)1.0E-6);
            Assert.assertEquals((long)100L, (long)resultTsBlock.getColumn(4).getLong(0));
            Assert.assertEquals((long)399L, (long)resultTsBlock.getColumn(5).getLong(0));
            ++count;
        }
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
        int[] result = new int[]{100, 100, 100, 99};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)(100 * count), (long)resultTsBlock.getTimeColumn().getLong(pos));
                for (int i = 0; i < measurementSchemas.size(); ++i) {
                    Assert.assertEquals((long)result[count], (long)resultTsBlock.getColumn(i).getLong(pos));
                }
                ++count;
            }
        }
        Assert.assertEquals((long)4L, (long)count);
    }

    @Test
    public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
        int[] result = new int[]{0, 80, 100, 80};
        AndFilter timeFilter = new AndFilter((Filter)TimeFilter.gtEq((long)120L), (Filter)TimeFilter.ltEq((long)379L));
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        for (int i = 0; i < measurementSchemas.size(); ++i) {
            TSDataType dataType = measurementSchemas.get(i).getType();
            ArrayList<InputLocation[]> inputLocations = new ArrayList<InputLocation[]>();
            inputLocations.add(new InputLocation[]{new InputLocation(0, i)});
            aggregators.add(new Aggregator(AccumulatorFactory.createAccumulator((AggregationType)AggregationType.COUNT, (TSDataType)dataType, (boolean)true), AggregationStep.SINGLE, inputLocations));
        }
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, (Filter)timeFilter, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)(100 * count), (long)resultTsBlock.getTimeColumn().getLong(pos));
                for (int i = 0; i < measurementSchemas.size(); ++i) {
                    Assert.assertEquals((long)result[count], (long)resultTsBlock.getColumn(i).getLong(pos));
                }
                ++count;
            }
        }
        Assert.assertEquals((long)4L, (long)count);
    }

    @Test
    public void testGroupByWithMultiFunction() throws IllegalPathException {
        int[][] result = new int[][]{{20000, 20100, 10200, 10300}, {20099, 20199, 299, 398}, {20099, 20199, 10259, 10379}, {20000, 20100, 260, 380}};
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        List<InputLocation[]> inputLocations = Collections.singletonList(new InputLocation[]{new InputLocation(0, 1)});
        AccumulatorFactory.createAccumulators(aggregationTypes, (TSDataType)TSDataType.INT32, (boolean)true).forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE, inputLocations)));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)(100 * count), (long)resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals((long)result[0][count], (long)resultTsBlock.getColumn(0).getInt(pos));
                Assert.assertEquals((long)result[1][count], (long)resultTsBlock.getColumn(1).getInt(pos));
                Assert.assertEquals((long)result[2][count], (long)resultTsBlock.getColumn(2).getInt(pos));
                Assert.assertEquals((long)result[3][count], (long)resultTsBlock.getColumn(3).getInt(pos));
                ++count;
            }
        }
        Assert.assertEquals((long)4L, (long)count);
    }

    @Test
    public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
        int[][] result = new int[][]{{20000, 20100, 10200, 10300}, {20099, 20199, 299, 398}, {20099, 20199, 10259, 10379}, {20000, 20100, 260, 380}};
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        List<InputLocation[]> inputLocations = Collections.singletonList(new InputLocation[]{new InputLocation(0, 1)});
        AccumulatorFactory.createAccumulators(aggregationTypes, (TSDataType)TSDataType.INT32, (boolean)false).forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE, inputLocations)));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)(100 * (3 - count)), (long)resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals((long)result[0][3 - count], (long)resultTsBlock.getColumn(0).getInt(pos));
                Assert.assertEquals((long)result[1][3 - count], (long)resultTsBlock.getColumn(1).getInt(pos));
                Assert.assertEquals((long)result[2][3 - count], (long)resultTsBlock.getColumn(2).getInt(pos));
                Assert.assertEquals((long)result[3][3 - count], (long)resultTsBlock.getColumn(3).getInt(pos));
                ++count;
            }
        }
        Assert.assertEquals((long)4L, (long)count);
    }

    @Test
    public void testGroupBySlidingTimeWindow() throws IllegalPathException {
        int[] result = new int[]{50, 50, 50, 50, 50, 50, 50, 49};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 50L, true);
        List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        List<InputLocation[]> inputLocations = Collections.singletonList(new InputLocation[]{new InputLocation(0, 1)});
        AccumulatorFactory.createAccumulators(aggregationTypes, (TSDataType)TSDataType.INT32, (boolean)true).forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE, inputLocations)));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)(50 * count), (long)resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals((long)result[count], (long)resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals((long)result.length, (long)count);
    }

    @Test
    public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
        int[] timeColumn = new int[]{0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
        int[] result = new int[]{20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 149L, 50L, 30L, true);
        List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        List<InputLocation[]> inputLocations = Collections.singletonList(new InputLocation[]{new InputLocation(0, 1)});
        AccumulatorFactory.createAccumulators(aggregationTypes, (TSDataType)TSDataType.INT32, (boolean)true).forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE, inputLocations)));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)timeColumn[count], (long)resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals((long)result[count], (long)resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals((long)timeColumn.length, (long)count);
    }

    @Test
    public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
        int[] timeColumn = new int[]{0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
        int[][] result = new int[][]{{20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140}, {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148}, {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148}, {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140}};
        ArrayList<AggregationType> aggregationTypes = new ArrayList<AggregationType>();
        aggregationTypes.add(AggregationType.FIRST_VALUE);
        aggregationTypes.add(AggregationType.LAST_VALUE);
        aggregationTypes.add(AggregationType.MAX_VALUE);
        aggregationTypes.add(AggregationType.MIN_VALUE);
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 149L, 50L, 30L, true);
        ArrayList<Aggregator> aggregators = new ArrayList<Aggregator>();
        List<InputLocation[]> inputLocations = Collections.singletonList(new InputLocation[]{new InputLocation(0, 1)});
        AccumulatorFactory.createAccumulators(aggregationTypes, (TSDataType)TSDataType.INT32, (boolean)true).forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE, inputLocations)));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = this.initAlignedSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (seriesAggregationScanOperator.hasNext()) {
            TsBlock resultTsBlock = seriesAggregationScanOperator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals((long)timeColumn[count], (long)resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals((long)result[0][count], (long)resultTsBlock.getColumn(0).getInt(pos));
                Assert.assertEquals((long)result[1][count], (long)resultTsBlock.getColumn(1).getInt(pos));
                Assert.assertEquals((long)result[2][count], (long)resultTsBlock.getColumn(2).getInt(pos));
                Assert.assertEquals((long)result[3][count], (long)resultTsBlock.getColumn(3).getInt(pos));
                ++count;
            }
        }
        Assert.assertEquals((long)timeColumn.length, (long)count);
    }

    public AlignedSeriesAggregationScanOperator initAlignedSeriesAggregationScanOperator(List<Aggregator> aggregators, Filter timeFilter, boolean ascending, GroupByTimeParameter groupByTimeParameter) throws IllegalPathException {
        AlignedPath alignedPath = new AlignedPath("root.AlignedSeriesAggregationScanOperatorTest.device0", measurementSchemas.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList()), measurementSchemas.stream().map(m -> m).collect(Collectors.toList()));
        HashSet allSensors = Sets.newHashSet((Object[])new String[]{"sensor0"});
        QueryId queryId = new QueryId("stub_query");
        FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
        FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, (Executor)this.instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext = FragmentInstanceContext.createFragmentInstanceContext((FragmentInstanceId)instanceId, (FragmentInstanceStateMachine)stateMachine);
        PlanNodeId planNodeId = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
        fragmentInstanceContext.getOperatorContexts().forEach(operatorContext -> operatorContext.setMaxRunTime(AggregationOperatorTest.TEST_TIME_SLICE));
        AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = new AlignedSeriesAggregationScanOperator(planNodeId, alignedPath, (OperatorContext)fragmentInstanceContext.getOperatorContexts().get(0), aggregators, AggregationUtil.initTimeRangeIterator((GroupByTimeParameter)groupByTimeParameter, (boolean)ascending, (boolean)true), timeFilter, ascending, groupByTimeParameter, (long)TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
        seriesAggregationScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
        return seriesAggregationScanOperator;
    }
}

