/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.plan.planner.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.distribution.Util;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Test;

public class AggregationDistributionTest {
    @Test
    public void testAggregation1Series2Regions() {
        QueryId queryId = new QueryId("test_1_series_2_regions");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1) from root.sg.d1";
        String d1s1Path = "root.sg.d1.s1";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode rootNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        AggregationNode aggregationNode = (AggregationNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0);
        Assert.assertEquals((Object)AggregationStep.FINAL, (Object)((AggregationDescriptor)aggregationNode.getAggregationDescriptorList().get(0)).getStep());
    }

    @Test
    public void testAggregation1Series2RegionsWithSlidingWindow() {
        QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)";
        String d1s1Path = "root.sg.d1.s1";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode rootNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        AggregationNode aggregationNode = (AggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0);
        Assert.assertEquals((Object)AggregationStep.INTERMEDIATE, (Object)((AggregationDescriptor)aggregationNode.getAggregationDescriptorList().get(0)).getStep());
    }

    @Test
    public void testTimeJoinAggregationSinglePerRegion() {
        QueryId queryId = new QueryId("test_query_time_join_aggregation");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1) from root.sg.d1, root.sg.d22";
        String d1s1Path = "root.sg.d1.s1";
        String d2s1Path = "root.sg.d22.s1";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode timeJoinNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        List fragmentInstances = plan.getInstances();
        List<AggregationStep> expected = Arrays.asList(AggregationStep.STATIC, AggregationStep.FINAL);
        this.verifyAggregationStep(expected, ((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d2s1Path, AggregationStep.SINGLE);
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
    }

    private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
        if (root == null) {
            return;
        }
        if (root instanceof SeriesAggregationSourceNode) {
            SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode)root;
            List descriptorList = handle.getAggregationDescriptorList();
            descriptorList.forEach(d -> Assert.assertEquals(expected.get(handle.getPartitionPath().getFullPath()), (Object)d.getStep()));
        }
        root.getChildren().forEach(child -> this.verifyAggregationStep(expected, (PlanNode)child));
    }

    private void verifyAggregationStep(List<AggregationStep> expected, PlanNode root) {
        if (root == null) {
            return;
        }
        if (root instanceof AggregationNode) {
            List actual = ((AggregationNode)root).getAggregationDescriptorList().stream().map(AggregationDescriptor::getStep).collect(Collectors.toList());
            Assert.assertEquals(expected, actual);
        }
        root.getChildren().forEach(child -> this.verifyAggregationStep(expected, (PlanNode)child));
    }

    @Test
    public void testTimeJoinAggregationWithSlidingWindow() {
        QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String d1s1Path = "root.sg.d1.s1";
        String d3s1Path = "root.sg.d333.s1";
        String sql = "select count(s1) from root.sg.d1,root.sg.d333 group by ([0, 50), 5ms, 3ms)";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode slidingWindowAggregationNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, slidingWindowAggregationNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        AggregationNode aggregationNode = (AggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0);
        aggregationNode.getAggregationDescriptorList().forEach(d -> Assert.assertEquals((Object)AggregationStep.INTERMEDIATE, (Object)d.getStep()));
    }

    @Test
    public void testTimeJoinAggregationMultiPerRegion() {
        QueryId queryId = new QueryId("test_query_time_join_aggregation");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String d1s1Path = "root.sg.d1.s1";
        String d3s1Path = "root.sg.d333.s1";
        String sql = "select count(s1) from root.sg.d1, root.sg.d333";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode timeJoinNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
    }

    @Test
    public void testTimeJoinAggregationMultiPerRegion2() {
        QueryId queryId = new QueryId("test_query_time_join_aggregation");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String d3s1Path = "root.sg.d333.s1";
        String d4s1Path = "root.sg.d4444.s1";
        String sql = "select count(s1) from root.sg.d333, root.sg.d4444";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode timeJoinNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
    }

    @Test
    public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_two_children");
        String d1s1Path = "root.sg.d1.s1";
        String d2s1Path = "root.sg.d22.s1";
        String groupedPath = "root.sg.*.s1";
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Arrays.asList(this.genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT), this.genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT)), Collections.singletonList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Arrays.asList(new TimeSeriesOperand(new PartialPath(d1s1Path)), new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
    }

    @Test
    public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_two_children");
        String d3s1Path = "root.sg.d333.s1";
        String d4s1Path = "root.sg.d4444.s1";
        String groupedPath = "root.sg.*.s1";
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Arrays.asList(this.genAggregationSourceNode(queryId, d3s1Path, TAggregationType.COUNT), this.genAggregationSourceNode(queryId, d4s1Path, TAggregationType.COUNT)), Collections.singletonList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Arrays.asList(new TimeSeriesOperand(new PartialPath(d3s1Path)), new TimeSeriesOperand(new PartialPath(d4s1Path))), 2, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        HashMap<String, List<String>> expectedDescriptorValue = new HashMap<String, List<String>>();
        expectedDescriptorValue.put(groupedPath, Collections.singletonList(groupedPath));
        Assert.assertTrue((boolean)(((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0) instanceof GroupByLevelNode));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue2 = new HashMap<String, List<String>>();
        expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue2, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0));
    }

    @Test
    public void testGroupByLevelNodeWithSlidingWindow() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_with_sliding_window");
        String d3s1Path = "root.sg.d333.s1";
        String d4s1Path = "root.sg.d4444.s1";
        String groupedPath = "root.sg.*.s1";
        SlidingWindowAggregationNode slidingWindowAggregationNode = this.genSlidingWindowAggregationNode(queryId, Arrays.asList(new PartialPath(d3s1Path), new PartialPath(d4s1Path)), TAggregationType.COUNT, AggregationStep.PARTIAL, null);
        FullOuterTimeJoinNode fullOuterTimeJoinNode = new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC);
        fullOuterTimeJoinNode.addChild((PlanNode)this.genAggregationSourceNode(queryId, d3s1Path, TAggregationType.COUNT));
        fullOuterTimeJoinNode.addChild((PlanNode)this.genAggregationSourceNode(queryId, d4s1Path, TAggregationType.COUNT));
        slidingWindowAggregationNode.addChild((PlanNode)fullOuterTimeJoinNode);
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Collections.singletonList(slidingWindowAggregationNode), Collections.singletonList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Arrays.asList(new TimeSeriesOperand(new PartialPath(d3s1Path)), new TimeSeriesOperand(new PartialPath(d4s1Path))), 2, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPath))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        List fragmentInstances = plan.getInstances();
        List<AggregationStep> expected = Arrays.asList(AggregationStep.FINAL, AggregationStep.FINAL);
        this.verifyAggregationStep(expected, ((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        HashMap<String, List<String>> expectedDescriptorValue = new HashMap<String, List<String>>();
        expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue2 = new HashMap<String, List<String>>();
        expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue2, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0));
        this.verifySlidingWindowDescriptor(Arrays.asList(d3s1Path, d4s1Path), (SlidingWindowAggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0));
        this.verifySlidingWindowDescriptor(Arrays.asList(d3s1Path, d4s1Path), (SlidingWindowAggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0));
    }

    @Test
    public void testGroupByLevelTwoSeries() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_two_series");
        String d1s1Path = "root.sg.d1.s1";
        String d1s2Path = "root.sg.d1.s2";
        String groupedPathS1 = "root.sg.*.s1";
        String groupedPathS2 = "root.sg.*.s2";
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Arrays.asList(this.genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT), this.genAggregationSourceNode(queryId, d1s2Path, TAggregationType.COUNT)), Arrays.asList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))), 1, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 1, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        HashMap<String, List<String>> expectedDescriptorValue = new HashMap<String, List<String>>();
        expectedDescriptorValue.put(groupedPathS1, Collections.singletonList(groupedPathS1));
        expectedDescriptorValue.put(groupedPathS2, Collections.singletonList(groupedPathS2));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue2 = new HashMap<String, List<String>>();
        expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
        expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue2, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0));
    }

    @Test
    public void testGroupByLevel2Series2Devices3Regions() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_two_series");
        String d1s1Path = "root.sg.d1.s1";
        String d1s2Path = "root.sg.d1.s2";
        String d2s1Path = "root.sg.d22.s1";
        String groupedPathS1 = "root.sg.*.s1";
        String groupedPathS2 = "root.sg.*.s2";
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Arrays.asList(this.genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT), this.genAggregationSourceNode(queryId, d1s2Path, TAggregationType.COUNT), this.genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT)), Arrays.asList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Arrays.asList(new TimeSeriesOperand(new PartialPath(d1s1Path)), new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 1, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
        expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        Assert.assertTrue((boolean)(((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0) instanceof GroupByLevelNode));
        Assert.assertTrue((boolean)(((FragmentInstance)fragmentInstances.get(2)).getFragment().getPlanNodeTree().getChildren().get(0) instanceof GroupByLevelNode));
        HashMap<String, List<String>> expectedDescriptorValue = new HashMap<String, List<String>>();
        expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d2s1Path));
        expectedDescriptorValue.put(groupedPathS2, Collections.singletonList(groupedPathS2));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue2 = new HashMap<String, List<String>>();
        expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
        expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue2, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(2)).getFragment().getPlanNodeTree().getChildren().get(0));
    }

    @Test
    public void testGroupByLevelWithSliding2Series2Devices3Regions() throws IllegalPathException {
        QueryId queryId = new QueryId("test_group_by_level_two_series");
        String d1s1Path = "root.sg.d1.s1";
        String d1s2Path = "root.sg.d1.s2";
        String d2s1Path = "root.sg.d22.s1";
        String groupedPathS1 = "root.sg.*.s1";
        String groupedPathS2 = "root.sg.*.s2";
        FullOuterTimeJoinNode fullOuterTimeJoinNode = new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC);
        fullOuterTimeJoinNode.addChild((PlanNode)this.genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT));
        fullOuterTimeJoinNode.addChild((PlanNode)this.genAggregationSourceNode(queryId, d1s2Path, TAggregationType.COUNT));
        fullOuterTimeJoinNode.addChild((PlanNode)this.genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT));
        SlidingWindowAggregationNode slidingWindowAggregationNode = this.genSlidingWindowAggregationNode(queryId, Arrays.asList(new PartialPath(d1s1Path), new PartialPath(d1s2Path), new PartialPath(d2s1Path)), TAggregationType.COUNT, AggregationStep.PARTIAL, null);
        slidingWindowAggregationNode.addChild((PlanNode)fullOuterTimeJoinNode);
        GroupByLevelNode groupByLevelNode = new GroupByLevelNode(new PlanNodeId("TestGroupByLevelNode"), Collections.singletonList(slidingWindowAggregationNode), Arrays.asList(new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Arrays.asList(new TimeSeriesOperand(new PartialPath(d1s1Path)), new TimeSeriesOperand(new PartialPath(d2s1Path))), 2, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS1)))), new CrossSeriesAggregationDescriptor(TAggregationType.COUNT.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))), 1, Collections.emptyMap(), Collections.singletonList(new TimeSeriesOperand(new PartialPath(groupedPathS2))))), null, Ordering.ASC);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)groupByLevelNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
        expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
        expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
        HashMap<String, List<String>> expectedDescriptorValue = new HashMap<String, List<String>>();
        expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
        expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue2 = new HashMap<String, List<String>>();
        expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d2s1Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue2, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0));
        HashMap<String, List<String>> expectedDescriptorValue3 = new HashMap<String, List<String>>();
        expectedDescriptorValue3.put(groupedPathS1, Collections.singletonList(d1s1Path));
        expectedDescriptorValue3.put(groupedPathS2, Collections.singletonList(d1s2Path));
        this.verifyGroupByLevelDescriptor(expectedDescriptorValue3, (GroupByLevelNode)((FragmentInstance)fragmentInstances.get(2)).getFragment().getPlanNodeTree().getChildren().get(0));
        this.verifySlidingWindowDescriptor(Arrays.asList(d1s1Path, d1s2Path), (SlidingWindowAggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(0)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0));
        this.verifySlidingWindowDescriptor(Collections.singletonList(d2s1Path), (SlidingWindowAggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(1)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0));
        this.verifySlidingWindowDescriptor(Arrays.asList(d1s1Path, d1s2Path), (SlidingWindowAggregationNode)((PlanNode)((FragmentInstance)fragmentInstances.get(2)).getFragment().getPlanNodeTree().getChildren().get(0)).getChildren().get(0));
    }

    @Test
    public void testAggregation1Series1Region() throws IllegalPathException {
        QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
        String d2s1Path = "root.sg.d22.s1";
        SeriesAggregationSourceNode root = this.genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT);
        Analysis analysis = Util.constructAnalysis();
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, (PlanNode)root));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)1L, (long)plan.getInstances().size());
        Assert.assertEquals((Object)root, ((FragmentInstance)plan.getInstances().get(0)).getFragment().getPlanNodeTree().getChildren().get(0));
    }

    @Test
    public void testAlignByDevice1Device2Region() {
        QueryId queryId = new QueryId("test_align_by_device_1_device_2_region");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1), count(s2) from root.sg.d1 align by device";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        PlanNode f1Root = (PlanNode)((FragmentInstance)plan.getInstances().get(0)).getFragment().getPlanNodeTree().getChildren().get(0);
        PlanNode f2Root = (PlanNode)((FragmentInstance)plan.getInstances().get(1)).getFragment().getPlanNodeTree().getChildren().get(0);
        Assert.assertTrue((boolean)(f1Root instanceof AggregationMergeSortNode));
        Assert.assertTrue((boolean)(f2Root instanceof DeviceViewNode));
        Assert.assertTrue((boolean)(f1Root.getChildren().get(0) instanceof DeviceViewNode));
        Assert.assertEquals((long)1L, (long)((PlanNode)f1Root.getChildren().get(0)).getChildren().size());
    }

    @Test
    public void testAlignByDevice2Device3Region() {
        QueryId queryId = new QueryId("test_align_by_device_2_device_3_region");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1), count(s2) from root.sg.d1,root.sg.d22 align by device";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
        PlanNode f1Root = (PlanNode)((FragmentInstance)plan.getInstances().get(0)).getFragment().getPlanNodeTree().getChildren().get(0);
        PlanNode f2Root = (PlanNode)((FragmentInstance)plan.getInstances().get(1)).getFragment().getPlanNodeTree().getChildren().get(0);
        PlanNode f3Root = (PlanNode)((FragmentInstance)plan.getInstances().get(2)).getFragment().getPlanNodeTree().getChildren().get(0);
        Assert.assertTrue((boolean)(f1Root instanceof AggregationMergeSortNode));
        Assert.assertTrue((boolean)(f2Root instanceof DeviceViewNode));
        Assert.assertTrue((boolean)(f3Root instanceof DeviceViewNode));
        Assert.assertTrue((boolean)(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode));
        Assert.assertTrue((boolean)(f1Root.getChildren().get(0) instanceof DeviceViewNode));
        Assert.assertTrue((boolean)(f1Root.getChildren().get(1) instanceof ExchangeNode));
        Assert.assertEquals((long)1L, (long)((PlanNode)f1Root.getChildren().get(0)).getChildren().size());
    }

    @Test
    public void testAlignByDevice2Device2Region() {
        QueryId queryId = new QueryId("test_align_by_device_2_device_2_region");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1), count(s2) from root.sg.d333,root.sg.d4444 align by device";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        PlanNode f1Root = (PlanNode)((FragmentInstance)plan.getInstances().get(0)).getFragment().getPlanNodeTree().getChildren().get(0);
        PlanNode f2Root = (PlanNode)((FragmentInstance)plan.getInstances().get(1)).getFragment().getPlanNodeTree().getChildren().get(0);
        Assert.assertTrue((boolean)(f1Root instanceof AggregationMergeSortNode));
        Assert.assertTrue((boolean)(f2Root instanceof DeviceViewNode));
        Assert.assertEquals((long)2L, (long)f1Root.getChildren().size());
    }

    private void verifyGroupByLevelDescriptor(Map<String, List<String>> expected, GroupByLevelNode node) {
        List descriptors = node.getGroupByLevelDescriptors();
        Assert.assertEquals((long)expected.size(), (long)descriptors.size());
        for (CrossSeriesAggregationDescriptor descriptor : descriptors) {
            String outputExpression = ((Expression)descriptor.getOutputExpressions().get(0)).getExpressionString();
            Assert.assertEquals((long)expected.get(outputExpression).size(), (long)descriptor.getInputExpressions().size());
            for (Expression inputExpression : descriptor.getInputExpressions()) {
                Assert.assertTrue((boolean)expected.get(outputExpression).contains(inputExpression.getExpressionString()));
            }
        }
    }

    private void verifySlidingWindowDescriptor(List<String> expected, SlidingWindowAggregationNode node) {
        List descriptorList = node.getAggregationDescriptorList();
        Assert.assertEquals((long)expected.size(), (long)descriptorList.size());
        HashMap verification = new HashMap();
        descriptorList.forEach(d -> verification.put(((Expression)d.getInputExpressions().get(0)).getExpressionString(), 1));
        Assert.assertEquals((long)expected.size(), (long)verification.size());
        expected.forEach(v -> Assert.assertEquals((long)1L, (long)((Integer)verification.get(v)).intValue()));
    }

    private SlidingWindowAggregationNode genSlidingWindowAggregationNode(QueryId queryId, List<PartialPath> paths, TAggregationType type, AggregationStep step, GroupByTimeParameter groupByTimeParameter) {
        return new SlidingWindowAggregationNode(queryId.genPlanNodeId(), paths.stream().map(path -> new AggregationDescriptor(type.name().toLowerCase(), step, Collections.singletonList(new TimeSeriesOperand(path)))).collect(Collectors.toList()), groupByTimeParameter, Ordering.ASC, false);
    }

    private SeriesAggregationSourceNode genAggregationSourceNode(QueryId queryId, String path, TAggregationType type) throws IllegalPathException {
        ArrayList<AggregationDescriptor> descriptors = new ArrayList<AggregationDescriptor>();
        descriptors.add(new AggregationDescriptor(type.name().toLowerCase(), AggregationStep.FINAL, Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
        return new SeriesAggregationScanNode(queryId.genPlanNodeId(), new MeasurementPath(path, TSDataType.INT32), descriptors);
    }

    @Test
    public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
        QueryId queryId = new QueryId("test_query_aligned");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select d666666.s1, d666666.s2, d333.s1 from root.sg limit 10";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode root = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)3L, (long)plan.getInstances().size());
    }

    @Test
    public void testEachSeriesOneRegion() {
        QueryId queryId = new QueryId("test_each_series_1_region");
        MPPQueryContext context = new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
        String sql = "select count(s1), count(s2) from root.sg.d22, root.sg.d55555";
        Analysis analysis = Util.analyze(sql, context);
        PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
        DistributedQueryPlan plan = planner.planFragments();
        Assert.assertEquals((long)2L, (long)plan.getInstances().size());
        List fragmentInstances = plan.getInstances();
        fragmentInstances.forEach(fragmentInstance -> Assert.assertTrue((boolean)(fragmentInstance.getFragment().getPlanNodeTree().getChildren().get(0) instanceof HorizontallyConcatNode)));
        HashMap<String, AggregationStep> expectedStep = new HashMap<String, AggregationStep>();
        expectedStep.put("root.sg.d22.s1", AggregationStep.SINGLE);
        expectedStep.put("root.sg.d22.s2", AggregationStep.SINGLE);
        expectedStep.put("root.sg.d55555.s1", AggregationStep.SINGLE);
        expectedStep.put("root.sg.d55555.s2", AggregationStep.SINGLE);
        fragmentInstances.forEach(f -> this.verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
    }
}

