/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;

public class SuccessAfterNetworkBuffersFailureITCase {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSuccessfulProgramAfterFailure() {
        ForkableFlinkMiniCluster cluster = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.memory.size", 80);
            config.setInteger("taskmanager.numberOfTaskSlots", 8);
            config.setInteger("taskmanager.network.numberOfBuffers", 840);
            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();
            try {
                SuccessAfterNetworkBuffersFailureITCase.runConnectedComponents(cluster.getLeaderRPCPort());
            }
            catch (Exception e) {
                e.printStackTrace();
                Assert.fail((String)"Program Execution should have succeeded.");
            }
            try {
                SuccessAfterNetworkBuffersFailureITCase.runKMeans(cluster.getLeaderRPCPort());
                Assert.fail((String)"This program execution should have failed.");
            }
            catch (ProgramInvocationException e) {
                Assert.assertTrue((boolean)e.getCause().getCause().getMessage().contains("Insufficient number of network buffers"));
            }
            try {
                SuccessAfterNetworkBuffersFailureITCase.runConnectedComponents(cluster.getLeaderRPCPort());
            }
            catch (Exception e) {
                e.printStackTrace();
                Assert.fail((String)"Program Execution should have succeeded.");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    private static void runConnectedComponents(int jmPort) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)jmPort, (String[])new String[0]);
        env.setParallelism(16);
        env.getConfig().disableSysoutLogging();
        PartitionOperator vertices = ConnectedComponentsData.getDefaultVertexDataSet((ExecutionEnvironment)env).rebalance();
        FlatMapOperator edges = ConnectedComponentsData.getDefaultEdgeDataSet((ExecutionEnvironment)env).rebalance().flatMap((FlatMapFunction)new ConnectedComponents.UndirectEdge());
        MapOperator verticesWithInitialId = vertices.map((MapFunction)new ConnectedComponents.DuplicateValue());
        DeltaIteration iteration = verticesWithInitialId.iterateDelta((DataSet)verticesWithInitialId, 100, new int[]{0});
        JoinOperator.EquiJoin changes = iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((FlatJoinFunction)new ConnectedComponents.ComponentIdFilter());
        DataSet result = iteration.closeWith((DataSet)changes, (DataSet)changes);
        result.output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
    }

    private static void runKMeans(int jmPort) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)jmPort, (String[])new String[0]);
        env.setParallelism(16);
        env.getConfig().disableSysoutLogging();
        PartitionOperator points = KMeansData.getDefaultPointDataSet((ExecutionEnvironment)env).rebalance();
        PartitionOperator centroids = KMeansData.getDefaultCentroidDataSet((ExecutionEnvironment)env).rebalance();
        IterativeDataSet loop = centroids.iterate(20);
        MapOperator newCentroids = ((MapOperator)points.map((MapFunction)new KMeans.SelectNearestCenter()).withBroadcastSet((DataSet)loop, "centroids")).map((MapFunction)new KMeans.CountAppender()).groupBy(new int[]{0}).reduce((ReduceFunction)new KMeans.CentroidAccumulator()).map((MapFunction)new KMeans.CentroidAverager());
        DataSet finalCentroids = loop.closeWith((DataSet)newCentroids);
        SingleInputUdfOperator clusteredPoints = points.map((MapFunction)new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        clusteredPoints.output((OutputFormat)new DiscardingOutputFormat());
        env.execute("KMeans Example");
    }
}

