package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.class */
/**
 * Base class for streaming fault-tolerance tests.
 *
 * <p>Before any test in the subclass runs, a {@link ForkableFlinkMiniCluster} is started with
 * {@link #NUM_TASK_MANAGERS} task managers of {@link #NUM_TASK_SLOTS} slots each; it is stopped
 * again after the last test. The single test method, {@link #runCheckpointedProgram()}, lets the
 * subclass build its topology via {@link #testProgram(StreamExecutionEnvironment)}, executes it
 * against the cluster with checkpointing enabled, and finally calls {@link #postSubmit()} so the
 * subclass can verify the outcome.
 */
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    /** Number of task managers configured for the test cluster. */
    protected static final int NUM_TASK_MANAGERS = 2;

    /** Number of slots configured per task manager. */
    protected static final int NUM_TASK_SLOTS = 3;

    /** Parallelism set on the test job: one subtask per configured slot. */
    protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

    /** Cluster shared by all tests of the class; {@code null} while no cluster is running. */
    private static ForkableFlinkMiniCluster cluster;

    /**
     * Simple serializable value holder with a prefix, a value and a count, provided for use by
     * subclasses' test programs.
     */
    public static class PrefixCount implements Serializable {
        public String prefix;
        public String value;
        public long count;

        /** Creates an instance with all fields left at their defaults. */
        public PrefixCount() {
        }

        /**
         * Creates a fully initialized instance.
         *
         * @param prefix the prefix
         * @param value the value
         * @param count the count
         */
        public PrefixCount(String prefix, String value, long count) {
            this.prefix = prefix;
            this.value = value;
            this.count = count;
        }

        /**
         * Returns {@code prefix + " / " + value}; the count is not part of the string.
         */
        @Override
        public String toString() {
            return this.prefix + " / " + this.value;
        }
    }

    /**
     * Creates and starts the shared mini cluster. Any exception during start-up is printed and
     * turned into a test failure.
     */
    @BeforeClass
    public static void startCluster() {
        try {
            Configuration configuration = new Configuration();
            configuration.setInteger("local.number-taskmanager", NUM_TASK_MANAGERS);
            configuration.setInteger("taskmanager.numberOfTaskSlots", NUM_TASK_SLOTS);
            // No waiting time between execution retries.
            configuration.setString("execution-retries.delay", "0 ms");
            // NOTE(review): presumably megabytes of managed memory per task manager - confirm.
            configuration.setInteger("taskmanager.memory.size", 12);
            cluster = new ForkableFlinkMiniCluster(configuration, false);
            cluster.start();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to start test cluster: " + e.getMessage());
        }
    }

    /**
     * Stops the shared mini cluster, if one was created, and always clears the reference to it.
     * Any exception while stopping is printed and turned into a test failure.
     */
    @AfterClass
    public static void stopCluster() {
        // startCluster() may have failed before assigning the field; in that case there is
        // nothing to stop, and a NullPointerException here would only obscure the real failure.
        if (cluster == null) {
            return;
        }
        try {
            cluster.stop();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to stop test cluster: " + e.getMessage());
        } finally {
            // Drop the reference even when stop() fails.
            cluster = null;
        }
    }

    /**
     * Builds the streaming program under test on the given environment. The environment is
     * executed by {@link #runCheckpointedProgram()} after this method returns.
     *
     * @param streamExecutionEnvironment the environment to define the program on
     */
    public abstract void testProgram(StreamExecutionEnvironment streamExecutionEnvironment);

    /**
     * Hook invoked after the program has been executed.
     *
     * @throws Exception if post-execution verification fails
     */
    public abstract void postSubmit() throws Exception;

    /**
     * Runs the subclass's program against the shared cluster: creates a remote environment
     * pointing at the cluster's leader RPC port on localhost, sets the parallelism to
     * {@link #PARALLELISM}, enables checkpointing, disables sysout logging, executes the program
     * and then calls {@link #postSubmit()}. Any exception is printed and turned into a test
     * failure.
     */
    @Test
    public void runCheckpointedProgram() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort(), new String[0]);
            env.setParallelism(PARALLELISM);
            // NOTE(review): argument is presumably the checkpoint interval in milliseconds.
            env.enableCheckpointing(500L);
            env.getConfig().disableSysoutLogging();
            testProgram(env);
            env.execute();
            postSubmit();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
