package org.apache.flink.test.checkpointing.utils;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.class */
public class SavepointMigrationTestBase extends TestBaseUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
    private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
    protected static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    protected LocalFlinkMiniCluster cluster = null;

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getResourceFilename(String str) {
        URL resource = SavepointMigrationTestBase.class.getClassLoader().getResource(str);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setInteger("taskmanager.numberOfTaskSlots", DEFAULT_PARALLELISM);
        File absoluteFile = this.tempFolder.newFolder("checkpoints").getAbsoluteFile();
        File absoluteFile2 = this.tempFolder.newFolder("savepoints").getAbsoluteFile();
        if (!absoluteFile.exists() || !absoluteFile2.exists()) {
            throw new Exception("Test setup failed: failed to create (temporary) directories.");
        }
        LOG.info("Created temporary checkpoint directory: " + absoluteFile + ".");
        LOG.info("Created savepoint directory: " + absoluteFile2 + ".");
        configuration.setString(CoreOptions.STATE_BACKEND, "memory");
        configuration.setString("state.backend.fs.checkpointdir", absoluteFile.toURI().toString());
        configuration.setString("state.backend.fs.memory-threshold", "0");
        configuration.setString("state.savepoints.dir", absoluteFile2.toURI().toString());
        this.cluster = TestBaseUtils.startCluster(configuration, false);
    }

    @After
    public void teardown() throws Exception {
        stopCluster(this.cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @SafeVarargs
    public final void executeAndSavepoint(StreamExecutionEnvironment streamExecutionEnvironment, String str, Tuple2<String, Integer>... tuple2Arr) throws Exception {
        ActorGateway actorGateway = (ActorGateway) Await.result(this.cluster.leaderGateway().future(), DEADLINE.timeLeft());
        JobSubmissionResult submitJobDetached = this.cluster.submitJobDetached(streamExecutionEnvironment.getStreamGraph().getJobGraph());
        LOG.info("Submitted job {} and waiting...", submitJobDetached.getJobID());
        StandaloneClusterClient standaloneClusterClient = new StandaloneClusterClient(this.cluster.configuration());
        boolean z = false;
        while (true) {
            if (!DEADLINE.hasTimeLeft()) {
                break;
            }
            Thread.sleep(100L);
            Map accumulators = standaloneClusterClient.getAccumulators(submitJobDetached.getJobID());
            boolean z2 = true;
            int length = tuple2Arr.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Tuple2<String, Integer> tuple2 = tuple2Arr[i];
                Integer num = (Integer) accumulators.get(tuple2.f0);
                if (num == null) {
                    z2 = false;
                    break;
                } else {
                    if (!num.equals(tuple2.f1)) {
                        z2 = false;
                        break;
                    }
                    i++;
                }
            }
            if (z2) {
                z = true;
                break;
            }
        }
        if (!z) {
            Assert.fail("Did not see the expected accumulator results within time limit.");
        }
        LOG.info("Triggering savepoint.");
        Object result = Await.result(actorGateway.ask(new JobManagerMessages.TriggerSavepoint(submitJobDetached.getJobID(), Option.empty()), DEADLINE.timeLeft()), DEADLINE.timeLeft());
        if (result instanceof JobManagerMessages.TriggerSavepointFailure) {
            Assert.fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) result).cause());
        }
        String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) result).savepointPath();
        LOG.info("Saved savepoint: " + savepointPath);
        FileUtils.moveFile(new File(new URI(savepointPath).getPath()), new File(str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @SafeVarargs
    public final void restoreAndExecute(StreamExecutionEnvironment streamExecutionEnvironment, String str, Tuple2<String, Integer>... tuple2Arr) throws Exception {
        Await.result(this.cluster.leaderGateway().future(), DEADLINE.timeLeft());
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        JobSubmissionResult submitJobDetached = this.cluster.submitJobDetached(jobGraph);
        StandaloneClusterClient standaloneClusterClient = new StandaloneClusterClient(this.cluster.configuration());
        JobListeningContext connectToJob = standaloneClusterClient.connectToJob(submitJobDetached.getJobID());
        boolean z = false;
        while (true) {
            if (!DEADLINE.hasTimeLeft()) {
                break;
            }
            JobID jobID = submitJobDetached.getJobID();
            FiniteDuration apply = FiniteDuration.apply(5L, TimeUnit.SECONDS);
            try {
                Object result = Await.result(standaloneClusterClient.getJobManagerGateway().ask(JobManagerMessages.getRequestJobStatus(submitJobDetached.getJobID()), apply), apply);
                if ((result instanceof JobManagerMessages.CurrentJobStatus) && ((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
                    Assert.fail("Job failed: " + Await.result(connectToJob.getJobResultFuture(), Duration.apply(5L, TimeUnit.SECONDS)));
                }
            } catch (Exception e) {
                Assert.fail("Could not connect to job: " + e);
            }
            Thread.sleep(100L);
            Map accumulators = standaloneClusterClient.getAccumulators(jobID);
            boolean z2 = true;
            int length = tuple2Arr.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Tuple2<String, Integer> tuple2 = tuple2Arr[i];
                Integer num = (Integer) accumulators.get(tuple2.f0);
                if (num == null) {
                    z2 = false;
                    break;
                } else {
                    if (!num.equals(tuple2.f1)) {
                        z2 = false;
                        break;
                    }
                    i++;
                }
            }
            if (z2) {
                z = true;
                break;
            }
        }
        if (z) {
            return;
        }
        Assert.fail("Did not see the expected accumulator results within time limit.");
    }
}
