/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class SimpleRecoveryITCase {
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void setupCluster() {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 2);
        config.setInteger("taskmanager.numberOfTaskSlots", 2);
        config.setString("execution-retries.delay", "100 ms");
        cluster = new ForkableFlinkMiniCluster(config, false);
        cluster.start();
    }

    @AfterClass
    public static void teardownCluster() {
        try {
            cluster.stop();
        }
        catch (Throwable t) {
            System.err.println("Error stopping cluster on shutdown");
            t.printStackTrace();
            Assert.fail((String)("Cluster shutdown caused an exception: " + t.getMessage()));
        }
    }

    @Test
    public void testFailedRunThenSuccessfulRun() {
        try {
            ArrayList resultCollection = new ArrayList();
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setNumberOfExecutionRetries(0);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L).rebalance().map(new FailingMapper1()).reduce((ReduceFunction)new ReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            try {
                JobExecutionResult res = env.execute();
                String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
                Assert.fail((String)("The program should have failed, but returned " + msg));
            }
            catch (ProgramInvocationException res) {
                // empty catch block
            }
            env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setNumberOfExecutionRetries(0);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L).rebalance().map(new FailingMapper1()).reduce((ReduceFunction)new ReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            try {
                JobExecutionResult result = env.execute();
                Assert.assertTrue((result.getNetRuntime() >= 0L ? 1 : 0) != 0);
                Assert.assertNotNull((Object)result.getAllAccumulatorResults());
                Assert.assertTrue((boolean)result.getAllAccumulatorResults().isEmpty());
            }
            catch (JobExecutionException e) {
                Assert.fail((String)"The program should have succeeded on the second run");
            }
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)55L, (long)sum);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRestart() {
        try {
            ArrayList resultCollection = new ArrayList();
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setNumberOfExecutionRetries(1);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L).rebalance().map(new FailingMapper2()).reduce((ReduceFunction)new ReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            try {
                JobExecutionResult result = env.execute();
                Assert.assertTrue((result.getNetRuntime() >= 0L ? 1 : 0) != 0);
                Assert.assertNotNull((Object)result.getAllAccumulatorResults());
                Assert.assertTrue((boolean)result.getAllAccumulatorResults().isEmpty());
            }
            catch (JobExecutionException e) {
                Assert.fail((String)"The program should have succeeded on the second run");
            }
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)55L, (long)sum);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRestartMultipleTimes() {
        try {
            ArrayList resultCollection = new ArrayList();
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setNumberOfExecutionRetries(5);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L).rebalance().map(new FailingMapper3()).reduce((ReduceFunction)new ReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            try {
                JobExecutionResult result = env.execute();
                Assert.assertTrue((result.getNetRuntime() >= 0L ? 1 : 0) != 0);
                Assert.assertNotNull((Object)result.getAllAccumulatorResults());
                Assert.assertTrue((boolean)result.getAllAccumulatorResults().isEmpty());
            }
            catch (JobExecutionException e) {
                Assert.fail((String)"The program should have succeeded on the second run");
            }
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)55L, (long)sum);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class FailingMapper3<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 3;

        private FailingMapper3() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getIndexOfThisSubtask() == 1) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    private static class FailingMapper2<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper2() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getIndexOfThisSubtask() == 1) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    private static class FailingMapper1<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper1() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getIndexOfThisSubtask() == 1) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }
}

