package org.apache.flink.test.recovery;

import java.nio.file.Path;
import java.time.Duration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that updates the resource requirements of a running job, shuts the cluster down
 * without cleaning its high-availability data, and then checks that a second cluster started from
 * the same configuration brings the job up with the updated number of running tasks.
 */
@ExtendWith({TestLoggerExtension.class})
class UpdateJobResourceRequirementsRecoveryITCase {
    private static final Logger LOG = LoggerFactory.getLogger(UpdateJobResourceRequirementsRecoveryITCase.class);

    @RegisterExtension
    private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION = new AllCallbackWrapper<>(new ZooKeeperExtension());

    /**
     * Submits a single-vertex job with parallelism 1, raises the vertex's parallelism bounds to
     * [1, 2], stops the cluster while keeping its HA data, and waits until a freshly started
     * cluster reports two running tasks for the job.
     *
     * @param path temporary directory (provided by JUnit) used as the HA storage path
     * @throws Exception if cluster startup, job submission, the requirements update, or cluster
     *     shutdown fails
     */
    @Test
    void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path path) throws Exception {
        final JobVertex jobVertex = new JobVertex("operator");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        final JobID jobID = jobGraph.getJobID();

        final Configuration configuration = new Configuration();
        // NOTE(review): the effectively unlimited fixed-delay restarts presumably keep the job from
        // failing terminally while the first cluster is being shut down - confirm.
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100L));
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        // ZooKeeper-backed high availability, with the HA storage rooted in the JUnit temp directory.
        configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
        configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, path.toFile().getAbsolutePath());

        // Two slots per task manager, matching the upper parallelism bound requested below.
        final MiniClusterConfiguration miniClusterConfiguration =
                new MiniClusterConfiguration.Builder()
                        .setConfiguration(configuration)
                        .setNumSlotsPerTaskManager(2)
                        .build();

        // The same client is used against both clusters; try-with-resources guarantees it is
        // closed on every exit path.
        try (RestClusterClient<?> restClusterClient = new RestClusterClient<>(configuration, "foobar")) {
            final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
            try {
                miniCluster.start();
                FlinkAssertions.assertThatFuture(restClusterClient.submitJob(jobGraph)).eventuallySucceeds();
                ClientUtils.waitUntilJobInitializationFinished(
                        () -> restClusterClient.getJobStatus(jobID).get(),
                        () -> restClusterClient.requestJobResult(jobID).get(),
                        getClass().getClassLoader());

                final JobResourceRequirements rescaledRequirements =
                        JobResourceRequirements.newBuilder()
                                .setParallelismForJobVertex(jobVertex.getID(), 1, 2)
                                .build();
                FlinkAssertions.assertThatFuture(
                                restClusterClient.updateJobResourceRequirements(jobID, rescaledRequirements))
                        .eventuallySucceeds();

                // Regular shutdown path: stop the cluster but keep the HA data for the second cluster.
                FlinkAssertions.assertThatFuture(miniCluster.closeAsyncWithoutCleaningHighAvailabilityData())
                        .eventuallySucceeds();
            } catch (Throwable failure) {
                // Failure path only: do not leave the first cluster running. The original failure
                // stays the primary exception; problems during cleanup are attached as suppressed.
                try {
                    miniCluster.close();
                } catch (Throwable closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
                throw failure;
            }

            LOG.info("Start second mini cluster to recover the persisted job.");
            try (MiniCluster recoveredMiniCluster = new MiniCluster(miniClusterConfiguration)) {
                recoveredMiniCluster.start();
                UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobID, 2);
            }
        }
    }
}
