package org.apache.flink.optimizer.dataexchange;

import java.util.Collection;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.class */
public class DataExchangeModeClosedBranchingTest extends CompilerTestBase {
    @Test
    public void testPipelinedForced() {
        verifyBranchingJoiningPlan(ExecutionMode.PIPELINED_FORCED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testPipelined() {
        verifyBranchingJoiningPlan(ExecutionMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testBatch() {
        verifyBranchingJoiningPlan(ExecutionMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testBatchForced() {
        verifyBranchingJoiningPlan(ExecutionMode.BATCH_FORCED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH);
    }

    private void verifyBranchingJoiningPlan(ExecutionMode executionMode, DataExchangeMode dataExchangeMode, DataExchangeMode dataExchangeMode2, DataExchangeMode dataExchangeMode3, DataExchangeMode dataExchangeMode4, DataExchangeMode dataExchangeMode5, DataExchangeMode dataExchangeMode6, DataExchangeMode dataExchangeMode7, DataExchangeMode dataExchangeMode8, DataExchangeMode dataExchangeMode9, DataExchangeMode dataExchangeMode10, DataExchangeMode dataExchangeMode11, DataExchangeMode dataExchangeMode12, DataExchangeMode dataExchangeMode13, DataExchangeMode dataExchangeMode14) {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.getConfig().setExecutionMode(executionMode);
            MapOperator map = executionEnvironment.fromElements(new Long[]{33L, 44L}).map(new MapFunction<Long, Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.dataexchange.DataExchangeModeClosedBranchingTest.1
                public Tuple2<Long, Long> map(Long l) {
                    return new Tuple2<>(l, l);
                }
            });
            ReduceOperator reduce = map.groupBy(new int[]{0}).reduce(new SelectOneReducer());
            reduce.output(new DiscardingOutputFormat()).name("reduceSink");
            FilterOperator filter = map.filter(new FilterFunction<Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.dataexchange.DataExchangeModeClosedBranchingTest.2
                public boolean filter(Tuple2<Long, Long> tuple2) throws Exception {
                    return false;
                }
            });
            JoinOperator.EquiJoin with = reduce.join(filter).where(new int[]{1}).equalTo(new int[]{1}).with(new DummyFlatJoinFunction());
            with.flatMap(new IdentityFlatMapper()).output(new DiscardingOutputFormat()).name("flatMapSink");
            with.coGroup(filter.groupBy(new int[]{1}).reduceGroup(new Top1GroupReducer())).where(new int[]{0}).equalTo(new int[]{0}).with(new DummyCoGroupFunction()).output(new DiscardingOutputFormat()).name("cgSink");
            OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan());
            SinkPlanNode findSink = findSink(compileNoStats.getDataSinks(), "reduceSink");
            SinkPlanNode findSink2 = findSink(compileNoStats.getDataSinks(), "flatMapSink");
            SinkPlanNode findSink3 = findSink(compileNoStats.getDataSinks(), "cgSink");
            DualInputPlanNode predecessor = findSink3.getPredecessor();
            DualInputPlanNode source = predecessor.getInput1().getSource();
            SingleInputPlanNode source2 = predecessor.getInput2().getSource();
            SingleInputPlanNode predecessor2 = source2.getPredecessor();
            SingleInputPlanNode source3 = source.getInput1().getSource();
            SingleInputPlanNode predecessor3 = source3.getPredecessor();
            Assert.assertEquals(source3, findSink.getPredecessor());
            SingleInputPlanNode source4 = source.getInput2().getSource();
            Assert.assertEquals(source4, predecessor2.getPredecessor());
            SingleInputPlanNode predecessor4 = source4.getPredecessor();
            Assert.assertEquals(predecessor4, predecessor3.getPredecessor());
            SingleInputPlanNode predecessor5 = findSink2.getPredecessor();
            Assert.assertEquals(source, predecessor5.getPredecessor());
            Assert.assertEquals(dataExchangeMode5, findSink.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode11, findSink2.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode14, findSink3.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode12, predecessor.getInput1().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode13, predecessor.getInput2().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode6, source.getInput1().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode7, source.getInput2().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode9, source2.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode8, predecessor2.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode10, predecessor5.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode4, source4.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode3, source3.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode2, predecessor3.getInput().getDataExchangeMode());
            Assert.assertEquals(dataExchangeMode, predecessor4.getInput().getDataExchangeMode());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String str) {
        for (SinkPlanNode sinkPlanNode : collection) {
            String name = sinkPlanNode.getOptimizerNode().getOperator().getName();
            if (name != null && name.equals(str)) {
                return sinkPlanNode;
            }
        }
        throw new IllegalArgumentException("No node with that name was found.");
    }
}
