package org.apache.flink.runtime.highavailability;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Abstract base for integration tests that run a single no-op job on an injected {@link
 * MiniCluster} and expect it to reach {@link JobStatus#FINISHED}.
 *
 * <p>The class registers a ZooKeeper extension and offers {@link #addHaConfiguration} so that
 * subclasses can build a ZooKeeper-based high-availability configuration on top of it. How the
 * injected cluster is actually configured is decided outside of this class (presumably by the
 * concrete subclass).
 */
@ExtendWith(TestLoggerExtension.class)
public abstract class AbstractHAJobRunITCase {

    /** Supplies the scheduled executor that drives the job-status retries in the test. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** ZooKeeper extension whose connect string is used as the HA quorum address. */
    @RegisterExtension
    @Order(1)
    private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION =
            new AllCallbackWrapper<>(new ZooKeeperExtension());

    /**
     * Adds ZooKeeper high-availability settings to the given configuration and initializes the
     * {@link FileSystem} with it.
     *
     * <p>The passed instance is modified in place: the HA mode is set to {@code "ZOOKEEPER"}, the
     * quorum to the connect string of the registered ZooKeeper extension, and the HA storage path
     * to {@code str}.
     *
     * @param configuration configuration to extend; it is mutated by this call
     * @param str value stored under {@link HighAvailabilityOptions#HA_STORAGE_PATH}
     * @return the same {@code configuration} instance that was passed in
     */
    protected static Configuration addHaConfiguration(Configuration configuration, String str) {
        final ZooKeeperExtension zooKeeper = ZOOKEEPER_EXTENSION.getCustomExtension();
        final String quorumAddress = zooKeeper.getConnectString();

        configuration.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorumAddress);
        configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, str);

        // No plugin manager is supplied; the cast only pins the two-argument overload.
        FileSystem.initialize(configuration, (PluginManager) null);

        return configuration;
    }

    /**
     * Hook invoked at the very end of {@link #testJobExecutionInHaMode} once the job has been
     * observed as finished. The default implementation does nothing.
     *
     * @throws Exception if the subclass-specific checks fail
     */
    protected void runAfterJobTermination() throws Exception {
        // intentionally empty
    }

    /**
     * Submits a no-op job to the injected cluster, waits until it is reported as {@link
     * JobStatus#FINISHED} while the cluster is still running, and then calls {@link
     * #runAfterJobTermination()}.
     *
     * @param miniCluster the cluster injected for this test
     * @throws Exception if submission, status retrieval or the follow-up hook fails or times out
     */
    @Test
    public void testJobExecutionInHaMode(@InjectMiniCluster MiniCluster miniCluster)
            throws Exception {
        final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

        // Bound the wait for the submission to complete to 30 seconds.
        miniCluster.submitJob(jobGraph).get(30L, TimeUnit.SECONDS);

        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30L));
        final Duration retryDelay = Duration.ofMillis(10L);
        final ScheduledExecutorServiceAdapter retryExecutor =
                new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());

        // Keep re-querying the job status until the cluster is running and the job is FINISHED;
        // the blocking wait is capped by whatever is left of the deadline.
        final JobStatus finalStatus =
                FutureUtils.retrySuccessfulWithDelay(
                                () -> miniCluster.getJobStatus(jobGraph.getJobID()),
                                retryDelay,
                                deadline,
                                status -> miniCluster.isRunning() && status == JobStatus.FINISHED,
                                retryExecutor)
                        .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        Assertions.assertThat(finalStatus).isEqualTo(JobStatus.FINISHED);

        runAfterJobTermination();
    }
}
