package org.apache.flink.test.recovery;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.class */
public abstract class SimpleRecoveryITCaseBase {
    protected static LocalFlinkMiniCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/recovery/SimpleRecoveryITCaseBase$FailingMapper1.class */
    /**
     * Identity mapper that injects a bounded number of failures.
     *
     * <p>While the static budget {@code failuresBeforeSuccess} is positive, the
     * parallel instance whose subtask index is 1 consumes one unit of the budget
     * and throws instead of returning the record. Every other call passes the
     * record through unchanged. The budget is static, so it is shared by all
     * instances of this class and is never reset by this class.
     *
     * @param <T> record type, passed through untouched
     */
    private static class FailingMapper1<T> extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper1() {
        }

        public T map(T value) throws Exception {
            // The budget is checked first, so the runtime context is only
            // consulted while failures are still pending.
            final boolean mustFail = failuresBeforeSuccess > 0
                    && getRuntimeContext().getIndexOfThisSubtask() == 1;
            if (mustFail) {
                failuresBeforeSuccess--;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/SimpleRecoveryITCaseBase$FailingMapper2.class */
    /**
     * Identity mapper that injects a bounded number of failures.
     *
     * <p>While the static budget {@code failuresBeforeSuccess} is positive, the
     * parallel instance whose subtask index is 1 consumes one unit of the budget
     * and throws instead of returning the record. Every other call passes the
     * record through unchanged. The budget is static, so it is shared by all
     * instances of this class and is never reset by this class.
     *
     * @param <T> record type, passed through untouched
     */
    private static class FailingMapper2<T> extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper2() {
        }

        public T map(T value) throws Exception {
            // The budget is checked first, so the runtime context is only
            // consulted while failures are still pending.
            final boolean mustFail = failuresBeforeSuccess > 0
                    && getRuntimeContext().getIndexOfThisSubtask() == 1;
            if (mustFail) {
                failuresBeforeSuccess--;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/SimpleRecoveryITCaseBase$FailingMapper3.class */
    /**
     * Identity mapper that injects a bounded number of failures.
     *
     * <p>While the static budget {@code failuresBeforeSuccess} (initially 3) is
     * positive, the parallel instance whose subtask index is 1 consumes one unit
     * of the budget and throws instead of returning the record. Every other call
     * passes the record through unchanged. The budget is static, so it is shared
     * by all instances of this class and is never reset by this class.
     *
     * @param <T> record type, passed through untouched
     */
    private static class FailingMapper3<T> extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 3;

        private FailingMapper3() {
        }

        public T map(T value) throws Exception {
            // The budget is checked first, so the runtime context is only
            // consulted while failures are still pending.
            final boolean mustFail = failuresBeforeSuccess > 0
                    && getRuntimeContext().getIndexOfThisSubtask() == 1;
            if (mustFail) {
                failuresBeforeSuccess--;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    /**
     * Stops the shared mini cluster once all tests of the class have run.
     *
     * <p>If {@code cluster} was never assigned there is nothing to stop, so the
     * method returns quietly instead of reporting a spurious shutdown failure
     * caused by a {@code NullPointerException}.
     *
     * @throws AssertionError if stopping the cluster throws; the original
     *         throwable is attached as the cause
     */
    @AfterClass
    public static void teardownCluster() {
        if (cluster == null) {
            // No cluster was started, so there is nothing to shut down.
            return;
        }
        try {
            cluster.stop();
        } catch (Throwable th) {
            System.err.println("Error stopping cluster on shutdown");
            th.printStackTrace();
            // Same failure type and message as Assert.fail(...), but keeps the root cause.
            throw new AssertionError("ClusterClient shutdown caused an exception: " + th.getMessage(), th);
        }
    }

    /**
     * Runs the same summing job twice with restarts disabled.
     *
     * <p>The first submission is expected to fail with a
     * {@link ProgramInvocationException}, because {@code FailingMapper1} starts
     * with one failure pending. The second, identically built submission must
     * succeed, and the values written to the shared result list must add up to
     * 55 (the sum of 1..10).
     */
    @Test
    public void testFailedRunThenSuccessfulRun() {
        try {
            // Both submissions write into this one list.
            ArrayList<Long> results = new ArrayList<Long>();

            // First submission: must fail.
            ExecutionEnvironment firstAttempt = createNoRestartSumJob(results);
            try {
                JobExecutionResult result = firstAttempt.execute();
                Assert.fail("The program should have failed, but returned " + (result == null ? "null result" : "result in " + result.getNetRuntime() + " ms"));
            } catch (ProgramInvocationException expected) {
                // Expected: the injected failure aborts the job because restarts are disabled.
            }

            // Second submission: the failure budget is used up, so it must succeed.
            ExecutionEnvironment secondAttempt = createNoRestartSumJob(results);
            executeAndRunAssertions(secondAttempt);

            long sum = 0;
            for (Long value : results) {
                sum += value.longValue();
            }
            Assert.assertEquals(55L, sum);
        } catch (Exception e) {
            e.printStackTrace();
            // Keep the cause and always produce a message, even when getMessage() is null.
            throw new AssertionError("Test failed with unexpected exception: " + e, e);
        }
    }

    /**
     * Builds (without executing) the job used by
     * {@link #testFailedRunThenSuccessfulRun()}: the sequence 1..10 is rebalanced,
     * passed through {@code FailingMapper1}, summed, and written to {@code results}.
     *
     * @param results collection handed to the {@link LocalCollectionOutputFormat}
     * @return a remote environment pointing at the shared cluster, with
     *         parallelism 4 and the no-restart strategy
     */
    private ExecutionEnvironment createNoRestartSumJob(ArrayList<Long> results) {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
        env.setParallelism(4);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        env.generateSequence(1L, 10L)
                .rebalance()
                .map(new FailingMapper1<Long>())
                .reduce(new ReduceFunction<Long>() {
                    public Long reduce(Long left, Long right) {
                        return Long.valueOf(left.longValue() + right.longValue());
                    }
                })
                .output(new LocalCollectionOutputFormat<Long>(results));
        return env;
    }

    /**
     * Executes the job defined on the given environment and checks the result:
     * the net runtime must be non-negative and the accumulator result map must
     * be present and empty.
     *
     * @param executionEnvironment environment whose plan is executed
     * @throws AssertionError if execution throws a {@link JobExecutionException}
     *         (attached as the cause) or if one of the result checks fails
     * @throws Exception any other exception raised by {@code execute()}
     */
    private void executeAndRunAssertions(ExecutionEnvironment executionEnvironment) throws Exception {
        final JobExecutionResult result;
        try {
            result = executionEnvironment.execute();
        } catch (JobExecutionException e) {
            // Same failure type and message as before, now with the job failure attached as cause.
            throw new AssertionError("The program should have succeeded on the second run", e);
        }
        Assert.assertTrue(result.getNetRuntime() >= 0);
        Assert.assertNotNull(result.getAllAccumulatorResults());
        Assert.assertTrue(result.getAllAccumulatorResults().isEmpty());
    }

    /**
     * Submits a summing job whose mapper ({@code FailingMapper2}) starts with one
     * failure pending, and expects the single submission to finish successfully
     * with the written values adding up to 55 (the sum of 1..10).
     *
     * <p>No restart strategy is set on the environment here.
     * NOTE(review): the test presumably relies on a restart strategy configured
     * on the cluster by the concrete subclass; confirm there.
     */
    @Test
    public void testRestart() {
        try {
            ArrayList<Long> results = new ArrayList<Long>();

            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            env.setParallelism(4);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L)
                    .rebalance()
                    .map(new FailingMapper2<Long>())
                    .reduce(new ReduceFunction<Long>() {
                        public Long reduce(Long left, Long right) {
                            return Long.valueOf(left.longValue() + right.longValue());
                        }
                    })
                    .output(new LocalCollectionOutputFormat<Long>(results));

            executeAndRunAssertions(env);

            long sum = 0;
            for (Long value : results) {
                sum += value.longValue();
            }
            Assert.assertEquals(55L, sum);
        } catch (Exception e) {
            e.printStackTrace();
            // Keep the cause and always produce a message, even when getMessage() is null.
            throw new AssertionError("Test failed with unexpected exception: " + e, e);
        }
    }

    /**
     * Submits a summing job whose mapper ({@code FailingMapper3}) starts with
     * three failures pending, using {@code fixedDelayRestart(5, 100L)} as the
     * restart strategy, and expects the submission to finish successfully with
     * the written values adding up to 55 (the sum of 1..10).
     */
    @Test
    public void testRestartMultipleTimes() {
        try {
            ArrayList<Long> results = new ArrayList<Long>();

            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            env.setParallelism(4);
            // The restart budget (5) is larger than the mapper's three injected failures.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 100L));
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L)
                    .rebalance()
                    .map(new FailingMapper3<Long>())
                    .reduce(new ReduceFunction<Long>() {
                        public Long reduce(Long left, Long right) {
                            return Long.valueOf(left.longValue() + right.longValue());
                        }
                    })
                    .output(new LocalCollectionOutputFormat<Long>(results));

            executeAndRunAssertions(env);

            long sum = 0;
            for (Long value : results) {
                sum += value.longValue();
            }
            Assert.assertEquals(55L, sum);
        } catch (Exception e) {
            e.printStackTrace();
            // Keep the cause and always produce a message, even when getMessage() is null.
            throw new AssertionError("Test failed with unexpected exception: " + e, e);
        }
    }
}
