/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.operator;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest;
import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link SeriesAggregationScanOperator} reading the single INT32 series
 * {@code root.SeriesScanOperatorTest.device0.sensor0}.
 *
 * <p>The backing sequence/unsequence files are produced by {@link SeriesReaderTestUtil#setUp}
 * before every test and removed again afterwards. The expected values asserted below depend on
 * the data that utility writes.
 */
public class SeriesAggregationScanOperatorTest {
    private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.SeriesScanOperatorTest";
    private final List<String> deviceIds = new ArrayList<>();
    private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
    private final List<TsFileResource> seqResources = new ArrayList<>();
    private final List<TsFileResource> unSeqResources = new ArrayList<>();
    private ExecutorService instanceNotificationExecutor;

    /** Prepares the test data files and the executor handed to the fragment instance state machine. */
    @Before
    public void setUp() throws MetadataException, IOException, WriteProcessException {
        SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
        instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
    }

    /**
     * Removes the test data files and shuts the executor down.
     *
     * <p>The executor is shut down in a {@code finally} block so that a failing file clean-up
     * cannot leak its thread. It may still be {@code null} when {@link #setUp()} failed before
     * creating it, hence the guard.
     */
    @After
    public void tearDown() throws IOException {
        try {
            SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
        } finally {
            if (instanceNotificationExecutor != null) {
                instanceNotificationExecutor.shutdown();
            }
        }
    }

    /** COUNT over the whole series, ascending scan: expects exactly one row holding 500. */
    @Test
    public void testAggregationWithoutTimeFilter() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(500L, resultTsBlock.getColumn(0).getLong(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT over the whole series, descending scan: expects exactly one row holding 500. */
    @Test
    public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
        // NOTE(review): the accumulator is built with ascending=true although the scan is
        // descending; presumably irrelevant for COUNT -- confirm before changing.
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, false, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(500L, resultTsBlock.getColumn(0).getLong(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT and SUM together over the whole series, ascending scan. */
    @Test
    public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT, AggregationType.SUM);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(500L, resultTsBlock.getColumn(0).getLong(0));
            Assert.assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 1.0E-4);
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** First/last value, min/max time and max/min value over the whole series, ascending scan. */
    @Test
    public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(
            true,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MIN_TIME,
            AggregationType.MAX_TIME,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
            Assert.assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals(0L, resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals(499L, resultTsBlock.getColumn(3).getLong(0));
            Assert.assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
            Assert.assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** Same six aggregations as above with a descending scan; the results must be identical. */
    @Test
    public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(
            false,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MIN_TIME,
            AggregationType.MAX_TIME,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, false, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
            Assert.assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals(0L, resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals(499L, resultTsBlock.getColumn(3).getLong(0));
            Assert.assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
            Assert.assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT restricted to {@code time >= 120}: expects 380. */
    @Test
    public void testAggregationWithTimeFilter1() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        Filter timeFilter = TimeFilter.gtEq(120L);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            // Expected value first, so a failure message reports the right side as "expected".
            Assert.assertEquals(380L, resultTsBlock.getColumn(0).getLong(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT restricted to {@code time <= 379}: expects 380. */
    @Test
    public void testAggregationWithTimeFilter2() throws IllegalPathException {
        Filter timeFilter = TimeFilter.ltEq(379L);
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(380L, resultTsBlock.getColumn(0).getLong(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT restricted to {@code 100 <= time <= 399}: expects 300. */
    @Test
    public void testAggregationWithTimeFilter3() throws IllegalPathException {
        Filter timeFilter = new AndFilter(TimeFilter.gtEq(100L), TimeFilter.ltEq(399L));
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(300L, resultTsBlock.getColumn(0).getLong(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** Six aggregations restricted to {@code 100 <= time <= 399}. */
    @Test
    public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
        List<Aggregator> aggregators = createAggregators(
            true,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MIN_TIME,
            AggregationType.MAX_TIME,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        Filter timeFilter = new AndFilter(TimeFilter.gtEq(100L), TimeFilter.ltEq(399L));
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            Assert.assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
            Assert.assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
            Assert.assertEquals(100L, resultTsBlock.getColumn(2).getLong(0));
            Assert.assertEquals(399L, resultTsBlock.getColumn(3).getLong(0));
            Assert.assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
            Assert.assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
            ++count;
        }
        Assert.assertEquals(1, count);
    }

    /** COUNT grouped by {@code GroupByTimeParameter(0, 399, 100, 100, true)} without a time filter. */
    @Test
    public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
        int[] result = {100, 100, 100, 99};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            // A returned block may carry several windows, so walk every position.
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals(result[count], resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals(4, count);
    }

    /** Same grouping as above combined with the filter {@code 120 <= time <= 379}. */
    @Test
    public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
        int[] result = {0, 80, 100, 80};
        Filter timeFilter = new AndFilter(TimeFilter.gtEq(120L), TimeFilter.ltEq(379L));
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals(result[count], resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals(4, count);
    }

    /** FIRST_VALUE, LAST_VALUE, MAX_VALUE and MIN_VALUE per window, ascending scan. */
    @Test
    public void testGroupByWithMultiFunction() throws IllegalPathException {
        // result[column][window]
        int[][] result = {
            {20000, 20100, 10200, 10300},
            {20099, 20199, 299, 398},
            {20099, 20199, 10259, 10379},
            {20000, 20100, 260, 380}
        };
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        List<Aggregator> aggregators = createAggregators(
            true,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
                assertIntColumns(result, count, resultTsBlock, pos);
                ++count;
            }
        }
        Assert.assertEquals(4, count);
    }

    /** Same as above with a descending scan: windows are expected in reverse order. */
    @Test
    public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
        // result[column][window], windows listed in ascending time order
        int[][] result = {
            {20000, 20100, 10200, 10300},
            {20099, 20199, 299, 398},
            {20099, 20199, 10259, 10379},
            {20000, 20100, 260, 380}
        };
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 100L, true);
        List<Aggregator> aggregators = createAggregators(
            false,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                // The n-th emitted row corresponds to window (3 - n) of the expected table.
                int window = 3 - count;
                Assert.assertEquals(100 * window, resultTsBlock.getTimeColumn().getLong(pos));
                assertIntColumns(result, window, resultTsBlock, pos);
                ++count;
            }
        }
        Assert.assertEquals(4, count);
    }

    /** COUNT grouped by {@code GroupByTimeParameter(0, 399, 100, 50, true)}. */
    @Test
    public void testGroupBySlidingTimeWindow() throws IllegalPathException {
        int[] result = {50, 50, 50, 50, 50, 50, 50, 49};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 399L, 100L, 50L, true);
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals(result[count], resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals(result.length, count);
    }

    /** COUNT grouped by {@code GroupByTimeParameter(0, 149, 50, 30, true)}. */
    @Test
    public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
        int[] timeColumn = {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
        int[] result = {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 149L, 50L, 30L, true);
        List<Aggregator> aggregators = createAggregators(true, AggregationType.COUNT);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos));
                Assert.assertEquals(result[count], resultTsBlock.getColumn(0).getLong(pos));
                ++count;
            }
        }
        Assert.assertEquals(timeColumn.length, count);
    }

    /** Four value aggregations grouped by {@code GroupByTimeParameter(0, 149, 50, 30, true)}. */
    @Test
    public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
        int[] timeColumn = {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
        // result[column][window]
        int[][] result = {
            {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140},
            {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
            {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
            {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140}
        };
        GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0L, 149L, 50L, 30L, true);
        List<Aggregator> aggregators = createAggregators(
            true,
            AggregationType.FIRST_VALUE,
            AggregationType.LAST_VALUE,
            AggregationType.MAX_VALUE,
            AggregationType.MIN_VALUE);
        SeriesAggregationScanOperator operator = initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
        int count = 0;
        while (operator.hasNext()) {
            TsBlock resultTsBlock = operator.next();
            int positionCount = resultTsBlock.getPositionCount();
            for (int pos = 0; pos < positionCount; ++pos) {
                Assert.assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos));
                assertIntColumns(result, count, resultTsBlock, pos);
                ++count;
            }
        }
        Assert.assertEquals(timeColumn.length, count);
    }

    /**
     * Builds a {@link SeriesAggregationScanOperator} over {@code device0.sensor0} of the test
     * storage group and attaches the data files created in {@link #setUp()}.
     *
     * @param aggregators aggregators the operator evaluates; one result column each
     * @param timeFilter global time filter, or {@code null} for none
     * @param ascending scan order passed to the operator and to the time range iterator
     * @param groupByTimeParameter group-by-time windows, or {@code null} for a single aggregate
     * @return the operator with its query data source initialised
     * @throws IllegalPathException if the series path is rejected
     */
    public SeriesAggregationScanOperator initSeriesAggregationScanOperator(List<Aggregator> aggregators, Filter timeFilter, boolean ascending, GroupByTimeParameter groupByTimeParameter) throws IllegalPathException {
        MeasurementPath measurementPath = new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
        Set<String> allSensors = Sets.newHashSet("sensor0");

        QueryId queryId = new QueryId("stub_query");
        FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
        FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
        FragmentInstanceContext fragmentInstanceContext = FragmentInstanceContext.createFragmentInstanceContext(instanceId, stateMachine);

        PlanNodeId planNodeId = new PlanNodeId("1");
        fragmentInstanceContext.addOperatorContext(1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
        fragmentInstanceContext.getOperatorContexts().forEach(operatorContext -> operatorContext.setMaxRunTime(AggregationOperatorTest.TEST_TIME_SLICE));

        SeriesAggregationScanOperator seriesAggregationScanOperator = new SeriesAggregationScanOperator(
            planNodeId,
            measurementPath,
            allSensors,
            fragmentInstanceContext.getOperatorContexts().get(0),
            aggregators,
            AggregationUtil.initTimeRangeIterator(groupByTimeParameter, ascending, true),
            timeFilter,
            ascending,
            groupByTimeParameter,
            TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
        seriesAggregationScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
        return seriesAggregationScanOperator;
    }

    /**
     * Creates one {@link AggregationStep#SINGLE} aggregator per requested aggregation type for an
     * INT32 series, in the order given.
     *
     * @param ascending flag forwarded to {@link AccumulatorFactory#createAccumulators}
     * @param aggregationTypes aggregation functions to create
     * @return a new mutable list of aggregators
     */
    private static List<Aggregator> createAggregators(boolean ascending, AggregationType... aggregationTypes) {
        List<AggregationType> typeList = new ArrayList<>(aggregationTypes.length);
        Collections.addAll(typeList, aggregationTypes);
        List<Aggregator> aggregators = new ArrayList<>(aggregationTypes.length);
        AccumulatorFactory.createAccumulators(typeList, TSDataType.INT32, ascending)
            .forEach(accumulator -> aggregators.add(new Aggregator(accumulator, AggregationStep.SINGLE)));
        return aggregators;
    }

    /**
     * Asserts that every int value column of {@code block} at row {@code pos} matches
     * {@code expected[column][window]}.
     */
    private static void assertIntColumns(int[][] expected, int window, TsBlock block, int pos) {
        for (int column = 0; column < expected.length; ++column) {
            Assert.assertEquals("column " + column + ", window " + window, expected[column][window], block.getColumn(column).getInt(pos));
        }
    }
}

