/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.programs;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Optimizer tests for a Connected Components program expressed as a delta (workset) iteration.
 *
 * <p>Each test builds the program with {@link #getConnectedComponentsPlan(int, int, boolean)},
 * optimizes it without statistics, asserts the driver, ship and local strategies as well as the
 * temp/data-exchange modes chosen for the individual plan nodes, and finally passes the optimized
 * plan to {@link JobGraphGenerator#compileJobGraph(OptimizedPlan)}.
 */
public class ConnectedComponentsTest
extends CompilerTestBase {
    private static final String VERTEX_SOURCE = "Vertices";
    private static final String ITERATION_NAME = "Connected Components Iteration";
    private static final String EDGES_SOURCE = "Edges";
    private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
    private static final String MIN_ID_REDUCER = "Find Minimum Candidate Id";
    private static final String UPDATE_ID_MATCH = "Update Component Id";
    private static final String SINK = "Result";

    /** Key field list containing only field 0; every keyed strategy in these plans uses it. */
    private final FieldList set0 = new FieldList(0);

    /**
     * Checks the plan in which the solution set is the <em>second</em> input of the update join;
     * that join is expected to use {@link DriverStrategy#HYBRIDHASH_BUILD_SECOND}.
     */
    @Test
    public void testWorksetConnectedComponents() {
        this.compileAndCheckPlan(false, DriverStrategy.HYBRIDHASH_BUILD_SECOND);
    }

    /**
     * Checks the plan in which the solution set is the <em>first</em> input of the update join;
     * that join is expected to use {@link DriverStrategy#HYBRIDHASH_BUILD_FIRST}.
     */
    @Test
    public void testWorksetConnectedComponentsWithSolutionSetAsFirstInput() {
        this.compileAndCheckPlan(true, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
    }

    /**
     * Builds the Connected Components plan (parallelism 8, at most 100 iterations), optimizes it
     * and asserts the strategies selected for every named node. The two test cases differ only in
     * the input order of the update join and in the driver strategy expected for it; all other
     * expectations are shared.
     *
     * @param solutionSetFirst whether the solution set is the first input of the update join
     * @param expectedUpdateStrategy driver strategy the update join must have been assigned
     */
    private void compileAndCheckPlan(boolean solutionSetFirst, DriverStrategy expectedUpdateStrategy) {
        final int parallelism = 8;
        final int maxIterations = 100;

        Plan plan = getConnectedComponentsPlan(parallelism, maxIterations, solutionSetFirst);
        OptimizedPlan optPlan = this.compileNoStats(plan);
        CompilerTestBase.OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);

        // Look up the plan nodes by the names assigned in getConnectedComponentsPlan.
        SourcePlanNode vertexSource = (SourcePlanNode) or.getNode(VERTEX_SOURCE);
        SourcePlanNode edgesSource = (SourcePlanNode) or.getNode(EDGES_SOURCE);
        SinkPlanNode sink = (SinkPlanNode) or.getNode(SINK);
        WorksetIterationPlanNode iter = (WorksetIterationPlanNode) or.getNode(ITERATION_NAME);
        DualInputPlanNode neighborsJoin = (DualInputPlanNode) or.getNode(JOIN_NEIGHBORS_MATCH);
        SingleInputPlanNode minIdReducer = (SingleInputPlanNode) or.getNode(MIN_ID_REDUCER);
        // The node feeding the reducer is treated as its combiner.
        SingleInputPlanNode minIdCombiner = (SingleInputPlanNode) minIdReducer.getPredecessor();
        DualInputPlanNode updatingMatch = (DualInputPlanNode) or.getNode(UPDATE_ID_MATCH);

        // --- driver strategies ---
        Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());

        Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, neighborsJoin.getDriverStrategy());
        // Neither input channel of the neighbors join may carry a cached temp mode.
        Assert.assertFalse(neighborsJoin.getInput1().getTempMode().isCached());
        Assert.assertFalse(neighborsJoin.getInput2().getTempMode().isCached());
        Assert.assertEquals(this.set0, neighborsJoin.getKeysForInput1());
        Assert.assertEquals(this.set0, neighborsJoin.getKeysForInput2());

        Assert.assertEquals(expectedUpdateStrategy, updatingMatch.getDriverStrategy());
        Assert.assertEquals(this.set0, updatingMatch.getKeysForInput1());
        Assert.assertEquals(this.set0, updatingMatch.getKeysForInput2());

        // --- ship strategies ---
        Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());

        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals(this.set0, iter.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals(this.set0, iter.getInitialWorksetInput().getShipStrategyKeys());

        Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy());
        Assert.assertEquals(this.set0, neighborsJoin.getInput2().getShipStrategyKeys());

        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, minIdReducer.getInput().getShipStrategy());
        Assert.assertEquals(this.set0, minIdReducer.getInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, minIdCombiner.getInput().getShipStrategy());

        Assert.assertEquals(ShipStrategyType.FORWARD, updatingMatch.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, updatingMatch.getInput2().getShipStrategy());

        // --- local strategies ---
        Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput2().getLocalStrategy());

        Assert.assertEquals(LocalStrategy.COMBININGSORT, minIdReducer.getInput().getLocalStrategy());
        Assert.assertEquals(this.set0, minIdReducer.getInput().getLocalStrategyKeys());
        Assert.assertEquals(LocalStrategy.NONE, minIdCombiner.getInput().getLocalStrategy());

        Assert.assertEquals(LocalStrategy.NONE, updatingMatch.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, updatingMatch.getInput2().getLocalStrategy());

        // --- temp modes and data exchange modes of the iteration's initial inputs ---
        Assert.assertEquals(TempMode.NONE, iter.getInitialWorksetInput().getTempMode());
        Assert.assertEquals(TempMode.NONE, iter.getInitialSolutionSetInput().getTempMode());
        Assert.assertEquals(DataExchangeMode.BATCH, iter.getInitialWorksetInput().getDataExchangeMode());
        Assert.assertEquals(DataExchangeMode.BATCH, iter.getInitialSolutionSetInput().getDataExchangeMode());

        // Job graph generation must succeed for the optimized plan; the result itself is not inspected.
        JobGraphGenerator jgg = new JobGraphGenerator();
        jgg.compileJobGraph(optPlan);
    }

    /**
     * Assembles the Connected Components program as a delta iteration and returns its plan.
     *
     * <p>Vertices are the sequence 0..1000 mapped to {@code (id, id)} pairs and serve as both the
     * initial solution set and the initial workset, keyed on field 0. Inside the iteration the
     * workset is joined with the edges, the minimum candidate id per vertex is computed, and the
     * result is joined with the solution set; a candidate is emitted only when its component id
     * is smaller than the current one. The emitted records close the iteration as both the
     * solution set delta and the next workset, and the iteration result goes to a discarding sink.
     *
     * @param parallelism parallelism set on the execution environment
     * @param iterations maximum number of iterations of the delta iteration
     * @param solutionSetFirst if {@code true} the solution set is the first input of the update
     *     join, otherwise it is the second input
     * @return the program plan created by the execution environment
     */
    private static Plan getConnectedComponentsPlan(int parallelism, int iterations, boolean solutionSetFirst) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);

        DataSet<Tuple2<Long, Long>> verticesWithId = env.generateSequence(0L, 1000L)
                .name(VERTEX_SOURCE)
                .map(new MapFunction<Long, Tuple2<Long, Long>>() {

                    @Override
                    public Tuple2<Long, Long> map(Long value) {
                        return new Tuple2<Long, Long>(value, value);
                    }
                })
                .name("Assign Vertex Ids");

        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = verticesWithId
                .iterateDelta(verticesWithId, iterations, 0)
                .name(ITERATION_NAME);

        @SuppressWarnings("unchecked")
        DataSet<Tuple2<Long, Long>> edges = env
                .fromElements(new Tuple2<Long, Long>(0L, 0L))
                .name(EDGES_SOURCE);

        // Projects to (second.f1, first.f1), groups on field 0 and keeps the minimum of field 1.
        DataSet<Tuple2<Long, Long>> minCandidateId = iteration.getWorkset()
                .join(edges)
                .where(0)
                .equalTo(0)
                .projectSecond(1)
                .<Tuple2<Long, Long>>projectFirst(1)
                .name(JOIN_NEIGHBORS_MATCH)
                .groupBy(0)
                .min(1)
                .name(MIN_ID_REDUCER);

        DataSet<Tuple2<Long, Long>> updateComponentId;
        if (solutionSetFirst) {
            updateComponentId = iteration.getSolutionSet()
                    .join(minCandidateId)
                    .where(0)
                    .equalTo(0)
                    .with(new FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {

                        @Override
                        public void join(Tuple2<Long, Long> current, Tuple2<Long, Long> candidate, Collector<Tuple2<Long, Long>> out) {
                            if (candidate.f1 < current.f1) {
                                out.collect(candidate);
                            }
                        }
                    })
                    .withForwardedFieldsFirst("0")
                    .withForwardedFieldsSecond("0")
                    .name(UPDATE_ID_MATCH);
        } else {
            updateComponentId = minCandidateId
                    .join(iteration.getSolutionSet())
                    .where(0)
                    .equalTo(0)
                    .with(new FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {

                        @Override
                        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current, Collector<Tuple2<Long, Long>> out) {
                            if (candidate.f1 < current.f1) {
                                out.collect(candidate);
                            }
                        }
                    })
                    .withForwardedFieldsFirst("0")
                    .withForwardedFieldsSecond("0")
                    .name(UPDATE_ID_MATCH);
        }

        // The same data set is used as solution set delta and as next workset.
        iteration.closeWith(updateComponentId, updateComponentId)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>())
                .name(SINK);

        return env.createProgramPlan();
    }
}

