package org.apache.flink.streaming.util;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/util/StreamingProgramTestBase.class */
/**
 * Base class for JUnit tests that run a streaming program between a cluster start and a
 * cluster stop.
 *
 * <p>Subclasses implement {@link #testProgram()} and may override {@link #preSubmit()} and
 * {@link #postSubmit()} to prepare inputs or verify outputs. The single JUnit test method,
 * {@link #testJobWithoutObjectReuse()}, drives those hooks in order.
 *
 * <p>NOTE(review): {@code startCluster()}, {@code stopCluster()},
 * {@code setTaskManagerNumSlots(int)} and the {@code executor} field are not declared in this
 * class; presumably they are inherited from {@link AbstractTestBase} - confirm there.
 */
public abstract class StreamingProgramTestBase extends AbstractTestBase {

    /** Parallelism used until {@link #setParallelism(int)} is called. */
    private static final int DEFAULT_PARALLELISM = 4;

    /** Result copied from the test stream environment after {@link #testProgram()} returns. */
    private JobExecutionResult latestExecutionResult;

    /** Parallelism handed to the {@link TestStreamEnvironment} created for the test run. */
    private int parallelism;

    /** Creates a test base backed by an empty {@link Configuration}. */
    public StreamingProgramTestBase() {
        this(new Configuration());
    }

    /**
     * Creates a test base with the given configuration and the default parallelism.
     *
     * @param configuration configuration passed on to the superclass constructor
     */
    public StreamingProgramTestBase(Configuration configuration) {
        super(configuration);
        this.parallelism = DEFAULT_PARALLELISM;
        // The number of task manager slots is kept equal to the parallelism.
        setTaskManagerNumSlots(this.parallelism);
    }

    /**
     * Sets the parallelism for the test run and sets the task manager slot count to the same
     * value.
     *
     * @param i the new parallelism
     */
    public void setParallelism(int i) {
        this.parallelism = i;
        setTaskManagerNumSlots(i);
    }

    /**
     * Returns the parallelism currently configured for the test run.
     *
     * @return the configured parallelism
     */
    public int getParallelism() {
        return this.parallelism;
    }

    /**
     * Returns the execution result recorded by the most recent successful call to
     * {@link #testProgram()}.
     *
     * @return the recorded result, or {@code null} if none has been recorded yet
     */
    public JobExecutionResult getLatestExecutionResult() {
        return this.latestExecutionResult;
    }

    /**
     * The streaming program under test. The test fails if no execution result is available
     * after this method returns.
     *
     * @throws Exception any exception; it is reported as a test failure
     */
    protected abstract void testProgram() throws Exception;

    /**
     * Hook invoked before the test program runs. The default implementation does nothing.
     *
     * @throws Exception any exception; it is reported as a test failure
     */
    protected void preSubmit() throws Exception {
    }

    /**
     * Hook invoked after the test program has produced an execution result. The default
     * implementation does nothing.
     *
     * @throws Exception any exception; it is reported as a test failure
     */
    protected void postSubmit() throws Exception {
    }

    /**
     * Runs {@link #preSubmit()}, {@link #testProgram()} and {@link #postSubmit()} in that
     * order between {@code startCluster()} and {@code stopCluster()}.
     *
     * <p>An {@link Exception} thrown by any of the three steps is printed to standard error
     * and rethrown as an {@link AssertionError} that carries the original exception as its
     * cause. The cluster is stopped whether or not the run succeeds.
     *
     * @throws Exception if starting or stopping the cluster fails
     */
    @Test
    public void testJobWithoutObjectReuse() throws Exception {
        startCluster();
        try {
            try {
                preSubmit();
            } catch (Exception e) {
                failWithCause("Pre-submit work caused an error: ", e);
            }

            TestStreamEnvironment testStreamEnvironment =
                    new TestStreamEnvironment(this.executor, this.parallelism);
            testStreamEnvironment.setAsContext();

            try {
                testProgram();
                this.latestExecutionResult = testStreamEnvironment.latestResult;
            } catch (Exception e) {
                failWithCause("Error while calling the test program: ", e);
            }

            Assert.assertNotNull(
                    "The test program never triggered an execution.", this.latestExecutionResult);

            try {
                postSubmit();
            } catch (Exception e) {
                failWithCause("Post-submit work caused an error: ", e);
            }
        } finally {
            stopCluster();
        }
    }

    /**
     * Prints the exception to standard error and fails the test with an {@link AssertionError}
     * whose message is {@code messagePrefix} followed by the exception's message.
     *
     * <p>Unlike {@code Assert.fail(String)}, the thrown error keeps the original exception as
     * its cause, so the underlying stack trace is visible in the test report and not only on
     * standard error.
     *
     * @param messagePrefix text placed in front of the exception message
     * @param cause the exception that made the step fail
     */
    private static void failWithCause(String messagePrefix, Exception cause) {
        System.err.println(cause.getMessage());
        cause.printStackTrace();
        AssertionError failure = new AssertionError(messagePrefix + cause.getMessage());
        // initCause is used instead of the (String, Throwable) constructor so that the code
        // also compiles against Java versions that predate that constructor.
        failure.initCause(cause);
        throw failure;
    }
}
