/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.java;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class WorksetIterationsJavaApiCompilerTest
extends CompilerTestBase {
    private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
    private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
    private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
    private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";

    @Test
    public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
        try {
            Plan plan = this.getJavaTestPlan(false, true);
            OptimizedPlan oPlan = this.compileNoStats(plan);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = WorksetIterationsJavaApiCompilerTest.getOptimizerPlanNodeResolver(oPlan);
            DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = (SingleInputPlanNode)resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
            SingleInputPlanNode deltaMapper = (SingleInputPlanNode)resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithInvariantNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithInvariantNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput1());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput2());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithSolutionSetNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithSolutionSetNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 0}), (Object)joinWithSolutionSetNode.getKeysForInput1());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)worksetReducer.getKeys(0));
            ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
            ShipStrategyType ss2 = ((Channel)deltaMapper.getOutgoingChannels().get(0)).getShipStrategy();
            Assert.assertTrue((ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH || ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH ? 1 : 0) != 0);
            new JobGraphGenerator().compileJobGraph(oPlan);
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Test errored: " + e.getMessage()));
        }
    }

    @Test
    public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
        try {
            Plan plan = this.getJavaTestPlan(false, false);
            OptimizedPlan oPlan = this.compileNoStats(plan);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = WorksetIterationsJavaApiCompilerTest.getOptimizerPlanNodeResolver(oPlan);
            DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = (SingleInputPlanNode)resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithInvariantNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithInvariantNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput1());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput2());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithSolutionSetNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithSolutionSetNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 0}), (Object)joinWithSolutionSetNode.getKeysForInput1());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)worksetReducer.getKeys(0));
            Assert.assertEquals((long)2L, (long)joinWithSolutionSetNode.getOutgoingChannels().size());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)((Channel)joinWithSolutionSetNode.getOutgoingChannels().get(0)).getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)((Channel)joinWithSolutionSetNode.getOutgoingChannels().get(1)).getShipStrategy());
            new JobGraphGenerator().compileJobGraph(oPlan);
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Test errored: " + e.getMessage()));
        }
    }

    @Test
    public void testJavaApiWithDirectSoltionSetUpdate() {
        try {
            Plan plan = this.getJavaTestPlan(true, false);
            OptimizedPlan oPlan = this.compileNoStats(plan);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = WorksetIterationsJavaApiCompilerTest.getOptimizerPlanNodeResolver(oPlan);
            DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode)resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = (SingleInputPlanNode)resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithInvariantNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithInvariantNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput1());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)joinWithInvariantNode.getKeysForInput2());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)joinWithSolutionSetNode.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)joinWithSolutionSetNode.getInput2().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 0}), (Object)joinWithSolutionSetNode.getKeysForInput1());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)new FieldList(new int[]{1, 2}), (Object)worksetReducer.getKeys(0));
            Assert.assertEquals((long)1L, (long)joinWithSolutionSetNode.getOutgoingChannels().size());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)joinWithSolutionSetNode.getOutgoingChannels().get(0)).getShipStrategy());
            new JobGraphGenerator().compileJobGraph(oPlan);
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Test errored: " + e.getMessage()));
        }
    }

    @Test
    public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator solutionSetInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Solution Set");
            Operator worksetInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Workset");
            Operator invariantInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Invariant Input");
            DeltaIteration iter = solutionSetInput.iterateDelta((DataSet)worksetInput, 100, new int[]{1, 2});
            JoinOperator.EquiJoin result = iter.getWorkset().join((DataSet)invariantInput).where(new int[]{1, 2}).equalTo(new int[]{1, 2}).with((JoinFunction)new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

                public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                    return first;
                }
            });
            try {
                result.join((DataSet)iter.getSolutionSet()).where(new int[]{1, 0}).equalTo(new int[]{0, 2}).with((JoinFunction)new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

                    public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                        return second;
                    }
                });
                Assert.fail((String)"The join should be rejected with key type mismatches.");
            }
            catch (InvalidProgramException invalidProgramException) {}
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Test errored: " + e.getMessage()));
        }
    }

    private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
        String[] stringArray;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        Operator solutionSetInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Solution Set");
        Operator worksetInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Workset");
        Operator invariantInput = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)1L, (Object)2L, (Object)3L)}).name("Invariant Input");
        DeltaIteration iter = solutionSetInput.iterateDelta((DataSet)worksetInput, 100, new int[]{1, 2});
        JoinOperator joinOperator = (JoinOperator)((JoinOperator)iter.getWorkset().join((DataSet)invariantInput).where(new int[]{1, 2}).equalTo(new int[]{1, 2}).with((JoinFunction)new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

            public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                return first;
            }
        }).name(JOIN_WITH_INVARIANT_NAME)).join((DataSet)iter.getSolutionSet()).where(new int[]{1, 0}).equalTo(new int[]{1, 2}).with((JoinFunction)new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

            public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                return second;
            }
        }).name(JOIN_WITH_SOLUTION_SET);
        if (joinPreservesSolutionSet) {
            String[] stringArray2 = new String[3];
            stringArray2[0] = "0->0";
            stringArray2[1] = "1->1";
            stringArray = stringArray2;
            stringArray2[2] = "2->2";
        } else {
            stringArray = null;
        }
        TwoInputUdfOperator joinedWithSolutionSet = joinOperator.withForwardedFieldsSecond(stringArray);
        SingleInputUdfOperator nextWorkset = ((GroupReduceOperator)joinedWithSolutionSet.groupBy(new int[]{1, 2}).reduceGroup((GroupReduceFunction)new RichGroupReduceFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

            public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {
            }
        }).name(NEXT_WORKSET_REDUCER_NAME)).withForwardedFields(new String[]{"1->1", "2->2", "0->0"});
        TwoInputUdfOperator nextSolutionSet = mapBeforeSolutionDelta ? ((MapOperator)joinedWithSolutionSet.map((MapFunction)new RichMapFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>(){

            public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) {
                return value;
            }
        }).name(SOLUTION_DELTA_MAPPER_NAME)).withForwardedFields(new String[]{"0->0", "1->1", "2->2"}) : joinedWithSolutionSet;
        iter.closeWith((DataSet)nextSolutionSet, (DataSet)nextWorkset).output((OutputFormat)new DiscardingOutputFormat());
        return env.createProgramPlan();
    }
}

