/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.manual;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;

/**
 * Manual test program that starts a local Flink mini cluster with a large number of
 * task managers and runs the connected-components delta iteration example against it.
 *
 * <p>The program is started through {@link #main(String[])}. It refuses to run when the
 * JVM's maximum heap is below the required minimum.
 */
public class NotSoMiniClusterIterations {
    /** Number of task managers in the local cluster and the parallelism of the submitted job. */
    private static final int PARALLELISM = 100;

    /** Minimum value of {@code Runtime.maxMemory() >>> 20} required to run the program. */
    private static final long MIN_HEAP_MEBIBYTES = 5000L;

    /** Iteration bound handed to {@code iterateDelta}. */
    private static final int MAX_ITERATIONS = 100;

    /**
     * Starts the local cluster, runs the connected-components job against it and shuts the
     * cluster down again.
     *
     * @param args ignored
     * @throws RuntimeException if the JVM's maximum heap is too small
     */
    public static void main(String[] args) {
        // maxMemory() is in bytes; shifting right by 20 bits converts it to mebibytes.
        long maxHeapMebibytes = Runtime.getRuntime().maxMemory() >>> 20;
        if (maxHeapMebibytes < MIN_HEAP_MEBIBYTES) {
            throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
        }

        LocalFlinkMiniCluster cluster = null;
        try {
            Configuration config = new Configuration();
            // One single-slot task manager per parallel subtask, so the job at
            // PARALLELISM occupies exactly the slots this cluster offers.
            config.setInteger("localinstancemanager.numtaskmanager", PARALLELISM);
            config.setInteger("taskmanager.numberOfTaskSlots", 1);
            // Small per-task-manager memory and network settings
            // (units are defined by Flink's configuration, not visible here).
            config.setInteger("taskmanager.memory.size", 8);
            config.setInteger("taskmanager.network.numberOfBuffers", 1000);
            config.setInteger("taskmanager.memory.segment-size", 8 * 1024);
            config.setInteger("taskmanager.net.server.numThreads", 1);
            config.setInteger("taskmanager.net.client.numThreads", 1);

            cluster = new LocalFlinkMiniCluster(config, false);
            runConnectedComponents(cluster.getJobManagerRPCPort());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Builds the connected-components delta iteration on the default example data and
     * executes it on the job manager listening on {@code localhost:jmPort}. The result
     * is written to a {@link DiscardingOutputFormat}.
     *
     * @param jmPort RPC port of the job manager to submit the job to
     * @throws Exception if building or executing the job fails
     */
    // Raw types: the element types of the example data sets are not declared in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void runConnectedComponents(int jmPort) throws Exception {
        ExecutionEnvironment env =
                ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort, new String[0]);
        env.setParallelism(PARALLELISM);
        env.getConfig().disableSysoutLogging();

        // Read the default vertex and edge data, rebalanced across the parallel instances.
        DataSet vertices = ConnectedComponentsData.getDefaultVertexDataSet(env).rebalance();
        DataSet edges = ConnectedComponentsData.getDefaultEdgeDataSet(env)
                .rebalance()
                .flatMap(new ConnectedComponents.UndirectEdge());

        DataSet verticesWithInitialId = vertices.map(new ConnectedComponents.DuplicateValue());

        // The same data set serves as initial solution set and initial workset, keyed on field 0.
        DeltaIteration iteration =
                verticesWithInitialId.iterateDelta(verticesWithInitialId, MAX_ITERATIONS, 0);

        // The casts on with(...) are kept from the original so that the same overload
        // (JoinFunction vs. FlatJoinFunction) is selected.
        DataSet candidates = iteration.getWorkset()
                .join(edges)
                .where(0)
                .equalTo(0)
                .with((JoinFunction) new ConnectedComponents.NeighborWithComponentIDJoin());

        DataSet minCandidates = candidates
                .groupBy(0)
                .aggregate(Aggregations.MIN, 1);

        DataSet changes = minCandidates
                .join(iteration.getSolutionSet())
                .where(0)
                .equalTo(0)
                .with((FlatJoinFunction) new ConnectedComponents.ComponentIdFilter());

        // The changes are both the solution-set delta and the next workset.
        DataSet result = iteration.closeWith(changes, changes);
        result.output(new DiscardingOutputFormat());

        env.execute();
    }
}

