/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public abstract class SimpleRecoveryITCaseBase
extends TestLogger {
    private static final int PARALLELISM = 4;
    private static final int DATA_FROM = 1;
    private static final int DATA_TO = 16;
    private static final int EXPECTED_SUM = 136;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(4).setNumberSlotsPerTaskManager(1).build());

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailedRunThenSuccessfulRun() throws Exception {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            env.setParallelism(4);
            Configuration configuration = new Configuration();
            configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"none");
            env.configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
            try {
                CloseableIterator iterator = env.fromSequence(1L, 16L).rebalance().map(new FailingMapper1()).fullWindowPartition().reduce(Long::sum).executeAndCollect();
                CollectionUtil.iteratorToList((Iterator)iterator);
                Assert.fail((String)"The program should have failed, but run successfully");
            }
            catch (RuntimeException iterator) {
                // empty catch block
            }
            env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            env.setParallelism(4);
            configuration = new Configuration();
            configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"none");
            env.configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
            List resultCollection = CollectionUtil.iteratorToList((Iterator)env.fromSequence(1L, 16L).rebalance().map(new FailingMapper1()).fullWindowPartition().reduce(Long::sum).executeAndCollect());
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)136L, (long)sum);
        }
        finally {
            FailingMapper1.failuresBeforeSuccess = 1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestart() throws Exception {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            env.setParallelism(4);
            List resultCollection = CollectionUtil.iteratorToList((Iterator)env.fromSequence(1L, 16L).rebalance().map(new FailingMapper2()).fullWindowPartition().reduce(Long::sum).executeAndCollect());
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)136L, (long)sum);
        }
        finally {
            FailingMapper2.failuresBeforeSuccess = 1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestartMultipleTimes() throws Exception {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            env.setParallelism(4);
            List resultCollection = CollectionUtil.iteratorToList((Iterator)env.fromSequence(1L, 16L).rebalance().map(new FailingMapper3()).fullWindowPartition().reduce(Long::sum).executeAndCollect());
            long sum = 0L;
            Iterator iterator = resultCollection.iterator();
            while (iterator.hasNext()) {
                long l = (Long)iterator.next();
                sum += l;
            }
            Assert.assertEquals((long)136L, (long)sum);
        }
        finally {
            FailingMapper3.failuresBeforeSuccess = 3;
        }
    }

    private static class FailingMapper3<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 3;

        private FailingMapper3() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 1) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    private static class FailingMapper2<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper2() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 1) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }

    private static class FailingMapper1<T>
    extends RichMapFunction<T, T> {
        private static volatile int failuresBeforeSuccess = 1;

        private FailingMapper1() {
        }

        public T map(T value) throws Exception {
            if (failuresBeforeSuccess > 0 && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                --failuresBeforeSuccess;
                throw new Exception("Test Failure");
            }
            return value;
        }
    }
}

