/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Optimizer tests for the delta (workset) iteration plan built by
 * {@link #getTestPlan(boolean, boolean)}.
 *
 * <p>Each test compiles one variant of that plan, checks the ship strategies and key fields the
 * optimizer picked for the named operators inside the iteration, and finally feeds the optimized
 * plan to the {@link JobGraphGenerator}.
 */
public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
    private static final long serialVersionUID = 1L;
    private static final String ITERATION_NAME = "Test Workset Iteration";
    private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
    private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
    private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
    private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";

    /** Key field list containing only field 0, the key used by every join and grouping in the plan. */
    private final FieldList list0 = new FieldList(0);

    /**
     * Plan variant: the solution-set join does not declare forwarded fields and a mapper sits
     * between that join and the solution set delta.
     *
     * <p>Besides the shared join/reducer checks, exactly one of the mapper's input channel and its
     * first outgoing channel must be hash partitioned while the other one is a forward.
     */
    @Test
    public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
        OptimizedPlan oPlan = compileOrFail(getTestPlan(false, true));

        CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
        DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_INVARIANT_NAME);
        DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_SOLUTION_SET);
        SingleInputPlanNode worksetReducer = (SingleInputPlanNode) resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
        SingleInputPlanNode deltaMapper = (SingleInputPlanNode) resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);

        assertJoinAndReducerStrategies(
                joinWithInvariantNode, joinWithSolutionSetNode, worksetReducer, ShipStrategyType.PARTITION_HASH);

        // The re-partitioning may happen either before or after the mapper, but not on both sides.
        ShipStrategyType mapperIn = deltaMapper.getInput().getShipStrategy();
        ShipStrategyType mapperOut = deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
        boolean partitionedAfterMapper =
                mapperIn == ShipStrategyType.FORWARD && mapperOut == ShipStrategyType.PARTITION_HASH;
        boolean partitionedBeforeMapper =
                mapperOut == ShipStrategyType.FORWARD && mapperIn == ShipStrategyType.PARTITION_HASH;
        Assert.assertTrue(partitionedAfterMapper || partitionedBeforeMapper);

        new JobGraphGenerator().compileJobGraph(oPlan);
    }

    /**
     * Plan variant: the solution-set join does not declare forwarded fields and feeds both the
     * solution set delta and the workset reducer directly.
     *
     * <p>Besides the shared join/reducer checks, the solution-set join must have two outgoing
     * channels and both must be hash partitioned.
     */
    @Test
    public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
        OptimizedPlan oPlan = compileOrFail(getTestPlan(false, false));

        CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
        DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_INVARIANT_NAME);
        DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_SOLUTION_SET);
        SingleInputPlanNode worksetReducer = (SingleInputPlanNode) resolver.getNode(NEXT_WORKSET_REDUCER_NAME);

        assertJoinAndReducerStrategies(
                joinWithInvariantNode, joinWithSolutionSetNode, worksetReducer, ShipStrategyType.PARTITION_HASH);

        Assert.assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
        Assert.assertEquals(
                ShipStrategyType.PARTITION_HASH,
                joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
        Assert.assertEquals(
                ShipStrategyType.PARTITION_HASH,
                joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());

        new JobGraphGenerator().compileJobGraph(oPlan);
    }

    /**
     * Plan variant: the solution-set join declares all fields of its first input as forwarded and
     * feeds the solution set delta directly.
     *
     * <p>Here the workset reducer input must be a forward, and the solution-set join must have a
     * single outgoing channel that is a forward as well.
     */
    @Test
    public void testRecordApiWithDirectSoltionSetUpdate() {
        OptimizedPlan oPlan = compileOrFail(getTestPlan(true, false));

        CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
        DualInputPlanNode joinWithInvariantNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_INVARIANT_NAME);
        DualInputPlanNode joinWithSolutionSetNode = (DualInputPlanNode) resolver.getNode(JOIN_WITH_SOLUTION_SET);
        SingleInputPlanNode worksetReducer = (SingleInputPlanNode) resolver.getNode(NEXT_WORKSET_REDUCER_NAME);

        assertJoinAndReducerStrategies(
                joinWithInvariantNode, joinWithSolutionSetNode, worksetReducer, ShipStrategyType.FORWARD);

        Assert.assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
        Assert.assertEquals(
                ShipStrategyType.FORWARD,
                joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());

        new JobGraphGenerator().compileJobGraph(oPlan);
    }

    /**
     * Compiles the plan without statistics and turns a compiler failure into a test failure.
     *
     * @param plan the program plan to optimize
     * @return the optimized plan
     * @throws AssertionError if the optimizer throws a {@link CompilerException}; the exception is
     *     attached as the cause so that the test report shows why compilation failed
     */
    private OptimizedPlan compileOrFail(Plan plan) {
        try {
            return compileNoStats(plan);
        } catch (CompilerException ce) {
            throw new AssertionError("The pact compiler is unable to compile this plan correctly.", ce);
        }
    }

    /**
     * Runs the checks that all plan variants have in common.
     *
     * <ul>
     *   <li>invariant join: first input forwarded, second input hash partitioned, both keyed on
     *       field 0</li>
     *   <li>solution-set join: both inputs forwarded</li>
     *   <li>workset reducer: input ship strategy as given, keyed on field 0</li>
     * </ul>
     *
     * @param joinWithInvariantNode plan node of the join between the workset and the invariant input
     * @param joinWithSolutionSetNode plan node of the join with the solution set
     * @param worksetReducer plan node of the reducer that produces the next workset
     * @param expectedReducerInputStrategy ship strategy expected on the reducer's input channel
     */
    private void assertJoinAndReducerStrategies(
            DualInputPlanNode joinWithInvariantNode,
            DualInputPlanNode joinWithSolutionSetNode,
            SingleInputPlanNode worksetReducer,
            ShipStrategyType expectedReducerInputStrategy) {
        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
        Assert.assertEquals(this.list0, joinWithInvariantNode.getKeysForInput1());
        Assert.assertEquals(this.list0, joinWithInvariantNode.getKeysForInput2());

        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());

        Assert.assertEquals(expectedReducerInputStrategy, worksetReducer.getInput().getShipStrategy());
        Assert.assertEquals(this.list0, worksetReducer.getKeys(0));
    }

    /**
     * Builds the delta iteration program used by all tests.
     *
     * <p>The plan reads three two-column CSV inputs (solution set, workset, invariant input), runs a
     * delta iteration keyed on field 0 with at most 100 iterations, and inside the iteration joins
     * the workset with the invariant input, joins the result with the solution set, and group-reduces
     * the second join on field 0 to obtain the next workset. The iteration result is written to a
     * {@link DiscardingOutputFormat}.
     *
     * @param joinPreservesSolutionSet if {@code true}, the solution-set join declares all fields of
     *     its first input as forwarded
     * @param mapBeforeSolutionDelta if {@code true}, an identity mapper is placed between the
     *     solution-set join and the solution set delta; otherwise the join itself is the delta
     * @return the program plan created from the execution environment
     */
    // Raw types are used on purpose: the element type of the data sets is not imported in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        Operator solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
        Operator workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
        Operator invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");

        DeltaIteration deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);

        Operator join1 = ((JoinOperator) deltaIt.getWorkset()
                        .join(invariantInput)
                        .where(0)
                        .equalTo(0)
                        .with(new IdentityJoiner())
                        .withForwardedFieldsFirst("*"))
                .name(JOIN_WITH_INVARIANT_NAME);

        Operator join2 = deltaIt.getSolutionSet()
                .join(join1)
                .where(0)
                .equalTo(0)
                .with(new IdentityJoiner())
                .name(JOIN_WITH_SOLUTION_SET);
        if (joinPreservesSolutionSet) {
            ((JoinOperator) join2).withForwardedFieldsFirst("*");
        }

        Operator nextWorkset = ((GroupReduceOperator) join2
                        .groupBy(0)
                        .reduceGroup(new IdentityGroupReducer())
                        .withForwardedFields("*"))
                .name(NEXT_WORKSET_REDUCER_NAME);

        // The solution set delta is either the join itself or an identity mapper on top of it.
        DataSet solutionSetDelta = join2;
        if (mapBeforeSolutionDelta) {
            solutionSetDelta = ((MapOperator) join2
                            .map(new IdentityMapper())
                            .withForwardedFields("*"))
                    .name(SOLUTION_DELTA_MAPPER_NAME);
        }
        deltaIt.closeWith(solutionSetDelta, nextWorkset).output(new DiscardingOutputFormat());

        return env.createProgramPlan();
    }
}

