package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.class */
public abstract class StreamFaultToleranceTestBase extends TestLogger {

    @Parameterized.Parameter
    public FailoverStrategy failoverStrategy;
    protected static final int NUM_TASK_MANAGERS = 3;
    protected static final int NUM_TASK_SLOTS = 4;
    protected static final int PARALLELISM = 12;
    private static MiniClusterWithClientResource cluster;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase$FailoverStrategy.class */
    public enum FailoverStrategy {
        RestartAllFailoverStrategy,
        RestartPipelinedRegionFailoverStrategy
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase$PrefixCount.class */
    public static class PrefixCount implements Serializable {
        public String prefix;
        public String value;
        public long count;

        public PrefixCount() {
        }

        public PrefixCount(String str, String str2, long j) {
            this.prefix = str;
            this.value = str2;
            this.count = j;
        }

        public String toString() {
            return this.prefix + " / " + this.value;
        }
    }

    @Parameterized.Parameters(name = "FailoverStrategy: {0}")
    public static Collection<FailoverStrategy> parameters() {
        return Arrays.asList(FailoverStrategy.RestartAllFailoverStrategy, FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
    }

    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        switch (this.failoverStrategy) {
            case RestartPipelinedRegionFailoverStrategy:
                configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
                break;
            case RestartAllFailoverStrategy:
                configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
                break;
        }
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(NUM_TASK_MANAGERS).setNumberSlotsPerTaskManager(NUM_TASK_SLOTS).build());
        cluster.before();
    }

    @After
    public void shutDownExistingCluster() {
        if (cluster != null) {
            cluster.after();
            cluster = null;
        }
    }

    public abstract void testProgram(StreamExecutionEnvironment streamExecutionEnvironment);

    public abstract void postSubmit() throws Exception;

    @Test
    public void runCheckpointedProgram() throws Exception {
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.enableCheckpointing(500L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
            testProgram(executionEnvironment);
            try {
                TestUtils.submitJobAndWaitForResult(cluster.getClusterClient(), executionEnvironment.getStreamGraph().getJobGraph(), getClass().getClassLoader());
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
            }
            postSubmit();
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }
}
