package org.apache.flink.test.checkpointing.utils;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.OptionalFailure;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.class */
/**
 * Base class for savepoint migration tests.
 *
 * <p>It starts a mini cluster (one task manager with {@link #DEFAULT_PARALLELISM} slots) per
 * test and offers two helpers: {@link #executeAndSavepoint} runs a job until a set of
 * accumulators reach their expected values and then stores a savepoint, and
 * {@link #restoreAndExecute} starts a job from an existing savepoint and waits for the expected
 * accumulator values.
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

    /** Number of slots configured on the single task manager of the mini cluster. */
    protected static final int DEFAULT_PARALLELISM = 4;

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);

    /**
     * Time budget granted to a single {@link #executeAndSavepoint} or {@link #restoreAndExecute}
     * call. Only the duration is shared; each call derives its own deadline from it so that the
     * clock does not start ticking at class-load time.
     */
    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);

    /** Pause between two consecutive accumulator polls. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    @Rule
    public final MiniClusterResource miniClusterResource;

    /**
     * Creates the mini cluster resource used by the test.
     *
     * <p>The resource is built here rather than in a field initializer because
     * {@link #getConfiguration()} throws a checked exception, which an instance initializer may
     * only do when every constructor declares it.
     *
     * @throws Exception if the temporary checkpoint/savepoint directories cannot be created
     */
    protected SavepointMigrationTestBase() throws Exception {
        this.miniClusterResource = new MiniClusterResource(
            new MiniClusterResourceConfiguration.Builder()
                .setConfiguration(getConfiguration())
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                .build());
    }

    /** Switches off the fail-when-legacy-state-detected flag of the savepoint serializers. */
    @BeforeClass
    public static void before() {
        SavepointSerializers.setFailWhenLegacyStateDetected(false);
    }

    /**
     * Resolves a classpath resource to the file part of its URL.
     *
     * @param str name of the resource, looked up through this class' class loader
     * @return the value of {@link URL#getFile()} for the resolved resource
     * @throws NullPointerException if the resource cannot be found
     */
    public static String getResourceFilename(String str) {
        URL resource = SavepointMigrationTestBase.class.getClassLoader().getResource(str);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    /**
     * Builds the cluster configuration: one task manager, {@link #DEFAULT_PARALLELISM} slots,
     * the "memory" state backend, and freshly created checkpoint and savepoint directories
     * below {@link #TEMP_FOLDER}.
     *
     * @return the configuration handed to the mini cluster
     * @throws Exception if one of the temporary directories does not exist after creation
     */
    private Configuration getConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

        // A random suffix keeps the directories of different test instances apart.
        UUID id = UUID.randomUUID();
        File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
        File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();
        if (!checkpointDir.exists() || !savepointDir.exists()) {
            throw new Exception("Test setup failed: failed to create (temporary) directories.");
        }
        LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
        LOG.info("Created savepoint directory: {}.", savepointDir);

        configuration.setString(CheckpointingOptions.STATE_BACKEND, "memory");
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        configuration.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        return configuration;
    }

    /**
     * Submits the job in detached mode, waits until every expected accumulator has reached its
     * expected value, triggers a savepoint and moves it to {@code str}.
     *
     * @param streamExecutionEnvironment environment whose stream graph is submitted
     * @param str target path the savepoint file or directory is moved to
     * @param tuple2Arr pairs of accumulator name and the value it must reach
     * @throws Exception if submission, accumulator retrieval, savepointing or the move fails
     */
    @SafeVarargs
    public final void executeAndSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, String str, Tuple2<String, Integer>... tuple2Arr) throws Exception {
        // Per-call deadline: a class-wide one would be shared (and used up) across tests.
        final Deadline deadline = TEST_TIMEOUT.fromNow();

        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        client.setDetached(true);

        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        JobSubmissionResult submission = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
        JobID jobId = submission.getJobID();
        LOG.info("Submitted job {} and waiting...", jobId);

        boolean done = false;
        while (deadline.hasTimeLeft()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            if (accumulatorsMatch(client.getAccumulators(jobId), tuple2Arr)) {
                done = true;
                break;
            }
        }
        if (!done) {
            org.junit.Assert.fail("Did not see the expected accumulator results within time limit.");
        }

        LOG.info("Triggering savepoint.");
        String savepointUri = client.triggerSavepoint(jobId, null)
            .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        File savepoint = new File(new URI(savepointUri).getPath());
        File target = new File(str);

        // The savepoint may be a single file or a directory; move it accordingly.
        if (savepoint.isDirectory()) {
            FileUtils.moveDirectory(savepoint, target);
        } else {
            FileUtils.moveFile(savepoint, target);
        }
    }

    /**
     * Submits the job in detached mode with restore settings pointing at {@code str} and waits
     * until every expected accumulator has reached its expected value. The test fails if the
     * job is reported as {@link JobStatus#FAILED} or its status cannot be retrieved.
     *
     * @param streamExecutionEnvironment environment whose stream graph is submitted
     * @param str path of the savepoint to restore from
     * @param tuple2Arr pairs of accumulator name and the value it must reach
     * @throws Exception if submission or accumulator retrieval fails
     */
    @SafeVarargs
    public final void restoreAndExecute(StreamExecutionEnvironment streamExecutionEnvironment, String str, Tuple2<String, Integer>... tuple2Arr) throws Exception {
        // Per-call deadline: a class-wide one would be shared (and used up) across tests.
        final Deadline deadline = TEST_TIMEOUT.fromNow();

        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        client.setDetached(true);

        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        JobSubmissionResult submission = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
        JobID jobId = submission.getJobID();

        while (deadline.hasTimeLeft()) {
            try {
                JobStatus status = client.getJobStatus(jobId).get(5L, TimeUnit.SECONDS);
                // An AssertionError raised here is not an Exception and therefore propagates.
                org.junit.Assert.assertNotEquals(JobStatus.FAILED, status);
            } catch (Exception e) {
                // Keep the original exception as cause so the stack trace is not lost.
                throw new AssertionError("Could not connect to job: " + e, e);
            }

            Thread.sleep(POLL_INTERVAL_MILLIS);
            if (accumulatorsMatch(client.getAccumulators(jobId), tuple2Arr)) {
                return;
            }
        }
        org.junit.Assert.fail("Did not see the expected accumulator results within time limit.");
    }

    /**
     * Checks whether every expected accumulator is present and equal to its expected value.
     * A missing accumulator or a {@code null} accumulator value counts as "not yet matching".
     *
     * @param accumulators current accumulator snapshot, keyed by accumulator name
     * @param expected pairs of accumulator name and expected value
     * @return {@code true} if all expected accumulators match
     * @throws Exception if an accumulator holds a failure instead of a value
     */
    private static boolean accumulatorsMatch(Map<String, OptionalFailure<Object>> accumulators, Tuple2<String, Integer>[] expected) throws Exception {
        for (Tuple2<String, Integer> entry : expected) {
            OptionalFailure<Object> holder = accumulators.get(entry.f0);
            if (holder == null) {
                return false;
            }
            Object value = holder.get();
            if (value == null || !value.equals(entry.f1)) {
                return false;
            }
        }
        return true;
    }
}
