package org.apache.flink.test.misc;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.class */
/**
 * Integration test that runs three programs back to back against one mini cluster:
 * a connected-components job that must succeed, a KMeans job that must fail with an
 * "Insufficient number of network buffers" error, and the connected-components job
 * again, which must still succeed after the preceding failure.
 */
public class SuccessAfterNetworkBuffersFailureITCase {

    /** Message fragment that must appear somewhere in the failure of the KMeans job. */
    private static final String INSUFFICIENT_BUFFERS_MESSAGE = "Insufficient number of network buffers";

    /** Parallelism used by both test programs. */
    private static final int PARALLELISM = 16;

    /**
     * Starts a mini cluster with two task managers and a fixed number of network buffers,
     * then checks the succeed / fail / succeed sequence described on the class.
     * The cluster is shut down on every exit path.
     */
    @Test
    public void testSuccessfulProgramAfterFailure() {
        ForkableFlinkMiniCluster cluster = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.memory.size", 80);
            config.setInteger("taskmanager.numberOfTaskSlots", 8);
            // NOTE(review): presumably sized so that connected components fits but KMeans
            // does not -- the sizing rationale is not visible in this file.
            config.setInteger("taskmanager.network.numberOfBuffers", 840);

            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();

            runConnectedComponentsExpectingSuccess(cluster);

            try {
                runKMeans(cluster.getLeaderRPCPort());
                Assert.fail("This program execution should have failed.");
            } catch (ProgramInvocationException e) {
                // Search the whole cause chain rather than a fixed nesting depth, so a change
                // in exception wrapping does not turn this check into a NullPointerException.
                Assert.assertTrue(
                        "Expected a failure mentioning '" + INSUFFICIENT_BUFFERS_MESSAGE + "' but got: " + e,
                        causeChainContainsMessage(e, INSUFFICIENT_BUFFERS_MESSAGE));
            }

            runConnectedComponentsExpectingSuccess(cluster);
        } catch (Exception e) {
            // Assertion failures are Errors and pass through untouched; any other exception
            // fails the test while keeping the original exception as the cause.
            throw new AssertionError(e.getMessage(), e);
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Runs the connected-components program against the given cluster and fails the test
     * if the run throws any exception.
     *
     * @param cluster the running mini cluster whose leader RPC port the program connects to
     */
    private static void runConnectedComponentsExpectingSuccess(ForkableFlinkMiniCluster cluster) {
        try {
            runConnectedComponents(cluster.getLeaderRPCPort());
        } catch (Exception e) {
            throw new AssertionError("Program Execution should have succeeded.", e);
        }
    }

    /**
     * Tells whether {@code error} or any throwable in its cause chain carries a message
     * containing {@code fragment}. Throwables with a {@code null} message are skipped.
     *
     * @param error    the throwable to start from; may be {@code null}
     * @param fragment the text to look for
     * @return {@code true} if a matching message is found, {@code false} otherwise
     */
    private static boolean causeChainContainsMessage(Throwable error, String fragment) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            String message = current.getMessage();
            if (message != null && message.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds and executes the connected-components example as a delta iteration
     * (at most 100 iterations) on a remote environment, discarding the result.
     * Element types are left raw, as in the rest of this file.
     *
     * @param i the port of the cluster to connect to on {@code localhost}
     * @throws Exception if building or executing the program fails
     */
    private static void runConnectedComponents(int i) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", i);
        env.setParallelism(PARALLELISM);
        env.getConfig().disableSysoutLogging();

        PartitionOperator vertices = ConnectedComponentsData.getDefaultVertexDataSet(env).rebalance();
        FlatMapOperator edges = ConnectedComponentsData.getDefaultEdgeDataSet(env)
                .rebalance()
                .flatMap(new ConnectedComponents.UndirectEdge());
        MapOperator verticesWithInitialId = vertices.map(new ConnectedComponents.DuplicateValue());

        // The same data set serves as both the initial solution set and the initial workset.
        DeltaIteration iteration = verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

        JoinOperator.EquiJoin changes = iteration.getWorkset()
                .join(edges).where(0).equalTo(0)
                .with(new ConnectedComponents.NeighborWithComponentIDJoin())
                .groupBy(0).aggregate(Aggregations.MIN, 1)
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new ConnectedComponents.ComponentIdFilter());

        // The changes are both the solution-set delta and the next workset.
        iteration.closeWith(changes, changes).output(new DiscardingOutputFormat());
        env.execute();
    }

    /**
     * Builds and executes the KMeans example as a bulk iteration (20 iterations) on a
     * remote environment, discarding the result.
     *
     * @param i the port of the cluster to connect to on {@code localhost}
     * @throws Exception if building or executing the program fails
     */
    private static void runKMeans(int i) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", i);
        env.setParallelism(PARALLELISM);
        env.getConfig().disableSysoutLogging();

        PartitionOperator points = KMeansData.getDefaultPointDataSet(env).rebalance();
        IterativeDataSet loop = KMeansData.getDefaultCentroidDataSet(env).rebalance().iterate(20);

        points.map(new KMeans.SelectNearestCenter())
                .withBroadcastSet(
                        loop.closeWith(
                                points.map(new KMeans.SelectNearestCenter())
                                        .withBroadcastSet(loop, "centroids")
                                        .map(new KMeans.CountAppender())
                                        .groupBy(0)
                                        .reduce(new KMeans.CentroidAccumulator())
                                        .map(new KMeans.CentroidAverager())),
                        "centroids")
                .output(new DiscardingOutputFormat());

        env.execute("KMeans Example");
    }
}
