/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Base class for streaming fault-tolerance tests.
 *
 * <p>A {@link ForkableFlinkMiniCluster} is started once before the tests of a
 * class run and stopped once afterwards. Subclasses define the streaming
 * program in {@link #testProgram(StreamExecutionEnvironment)} and verify its
 * outcome in {@link #postSubmit()}; {@link #runCheckpointedProgram()} wires
 * both together with checkpointing enabled.
 */
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    /** Value written to the {@code local.number-taskmanager} config key. */
    protected static final int NUM_TASK_MANAGERS = 2;

    /** Value written to the {@code taskmanager.numberOfTaskSlots} config key. */
    protected static final int NUM_TASK_SLOTS = 3;

    /** Parallelism of the test job: one subtask per configured slot. */
    protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

    /** Cluster shared by the tests of a class; {@code null} while no cluster is running. */
    private static ForkableFlinkMiniCluster cluster;

    /**
     * Builds the cluster configuration and starts the shared mini cluster.
     * Any exception during start-up is printed and turned into a test failure.
     */
    @BeforeClass
    public static void startCluster() {
        try {
            Configuration config = new Configuration();
            // Use the named constants so cluster size and job parallelism cannot drift apart.
            config.setInteger("local.number-taskmanager", NUM_TASK_MANAGERS);
            config.setInteger("taskmanager.numberOfTaskSlots", NUM_TASK_SLOTS);
            config.setString("execution-retries.delay", "0 ms");
            // NOTE(review): unit of this value is not visible here (presumably megabytes) - confirm.
            config.setInteger("taskmanager.memory.size", 12);
            // NOTE(review): meaning of the boolean flag is defined by ForkableFlinkMiniCluster - confirm.
            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to start test cluster: " + e.getMessage());
        }
    }

    /**
     * Stops the shared mini cluster if one was created and clears the reference.
     * Any exception during shutdown is printed and turned into a test failure.
     */
    @AfterClass
    public static void stopCluster() {
        // If start-up failed there is nothing to stop; bailing out here avoids a
        // NullPointerException that would be reported as a second, misleading failure.
        if (cluster == null) {
            return;
        }
        try {
            cluster.stop();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Failed to stop test cluster: " + e.getMessage());
        }
        finally {
            // Drop the reference even when stop() throws.
            cluster = null;
        }
    }

    /**
     * Defines the streaming program under test on the given environment.
     *
     * @param env remote environment connected to the shared mini cluster, with
     *            parallelism and checkpointing already configured
     */
    public abstract void testProgram(StreamExecutionEnvironment env);

    /**
     * Hook invoked after the job has been executed; intended for verifying results.
     *
     * @throws Exception if verification fails; reported as a test failure
     */
    public abstract void postSubmit() throws Exception;

    /**
     * Creates a remote environment against the mini cluster, enables checkpointing
     * with an interval argument of 500, lets the subclass define the program, executes it and
     * finally calls {@link #postSubmit()}. Any exception fails the test.
     */
    @Test
    public void runCheckpointedProgram() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort(), new String[0]);
            env.setParallelism(PARALLELISM);
            env.enableCheckpointing(500L);
            env.getConfig().disableSysoutLogging();
            this.testProgram(env);
            env.execute();
            this.postSubmit();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Simple serializable holder of a prefix, a value and a count, with public
     * fields and a no-argument constructor.
     */
    public static class PrefixCount
    implements Serializable {
        public String prefix;
        public String value;
        public long count;

        /** Creates an instance with all fields left at their defaults. */
        public PrefixCount() {
        }

        /**
         * Creates a fully initialised instance.
         *
         * @param prefix the prefix
         * @param value the value
         * @param count the count
         */
        public PrefixCount(String prefix, String value, long count) {
            this.prefix = prefix;
            this.value = value;
            this.count = count;
        }

        /** Returns {@code prefix + " / " + value}; the count is not included. */
        @Override
        public String toString() {
            return this.prefix + " / " + this.value;
        }
    }
}

