package org.apache.flink.test.state.operator.restore;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.class */
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {

    @Rule
    public final TemporaryFolder tmpFolder;
    private static ActorSystem actorSystem = null;
    private static HighAvailabilityServices highAvailabilityServices = null;
    private static ActorGateway jobManager = null;
    private static ActorGateway archiver = null;
    private static ActorGateway taskManager = null;
    private static final FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    private final boolean allowNonRestoredState;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractOperatorRestoreTestBase() {
        this(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractOperatorRestoreTestBase(boolean z) {
        this.tmpFolder = new TemporaryFolder();
        this.allowNonRestoredState = z;
    }

    @BeforeClass
    public static void beforeClass() {
        SavepointSerializers.setFailWhenLegacyStateDetected(false);
    }

    @BeforeClass
    public static void setupCluster() throws Exception {
        Configuration configuration = new Configuration();
        FiniteDuration finiteDuration = new FiniteDuration(30L, TimeUnit.SECONDS);
        actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
        highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, TestingUtils.defaultExecutor());
        Tuple2 startJobManagerActors = JobManager.startJobManagerActors(configuration, actorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), TestingJobManager.class, TestingMemoryArchivist.class);
        jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), actorSystem, finiteDuration);
        archiver = new AkkaActorGateway((ActorRef) startJobManagerActors._2(), jobManager.leaderSessionID());
        Configuration configuration2 = new Configuration();
        configuration2.setInteger("taskmanager.numberOfTaskSlots", 4);
        taskManager = new AkkaActorGateway(TaskManager.startTaskManagerComponentsAndActor(configuration2, ResourceID.generate(), actorSystem, highAvailabilityServices, new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, TestingTaskManager.class), jobManager.leaderSessionID());
        Await.ready(taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), finiteDuration), finiteDuration);
    }

    @AfterClass
    public static void tearDownCluster() throws Exception {
        if (highAvailabilityServices != null) {
            highAvailabilityServices.closeAndCleanupAllData();
        }
        if (actorSystem != null) {
            actorSystem.shutdown();
        }
        if (archiver != null) {
            archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
        if (jobManager != null) {
            jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
        if (taskManager != null) {
            taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    @Test
    public void testMigrationAndRestore() throws Throwable {
        restoreJob(migrateJob());
    }

    private String migrateJob() throws Throwable {
        URL resource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
        if (resource == null) {
            throw new IllegalArgumentException("Savepoint file does not exist.");
        }
        JobGraph createJobGraph = createJobGraph(ExecutionMode.MIGRATE);
        createJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(resource.getFile()));
        Object result = Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(createJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
        if (result instanceof JobManagerMessages.JobResultFailure) {
            throw new Exception((Throwable) ((JobManagerMessages.JobResultFailure) result).cause());
        }
        Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
        Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(createJobGraph.getJobID()), timeout), timeout);
        JobManagerMessages.CancelJobWithSavepoint cancelJobWithSavepoint = new JobManagerMessages.CancelJobWithSavepoint(createJobGraph.getJobID(), this.tmpFolder.newFolder().getAbsolutePath());
        boolean z = true;
        for (int i = 0; z && i < 10; i++) {
            result = Await.result(jobManager.ask(cancelJobWithSavepoint, timeout), timeout);
            if (result instanceof JobManagerMessages.CancellationFailure) {
                Thread.sleep(50L);
            } else {
                z = false;
            }
        }
        if (result instanceof JobManagerMessages.CancellationFailure) {
            throw new Exception(((JobManagerMessages.CancellationFailure) result).cause());
        }
        String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
        Await.ready(jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(createJobGraph.getJobID(), JobStatus.CANCELED), timeout), timeout);
        return savepointPath;
    }

    private void restoreJob(String str) throws Exception {
        JobGraph createJobGraph = createJobGraph(ExecutionMode.RESTORE);
        createJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, this.allowNonRestoredState));
        Object result = Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(createJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
        if (result instanceof JobManagerMessages.JobResultFailure) {
            throw new Exception((Throwable) ((JobManagerMessages.JobResultFailure) result).cause());
        }
        Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
        JobManagerMessages.RequestJobStatus requestJobStatus = new JobManagerMessages.RequestJobStatus(createJobGraph.getJobID());
        JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(requestJobStatus, timeout), timeout)).status();
        while (true) {
            JobStatus jobStatus = status;
            if (jobStatus.isTerminalState()) {
                Assert.assertEquals(JobStatus.FINISHED, jobStatus);
                return;
            }
            status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(requestJobStatus, timeout), timeout)).status();
        }
    }

    private JobGraph createJobGraph(ExecutionMode executionMode) {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        createLocalEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        createLocalEnvironment.setStateBackend(new MemoryStateBackend());
        switch (executionMode) {
            case MIGRATE:
                createMigrationJob(createLocalEnvironment);
                break;
            case RESTORE:
                createRestoredJob(createLocalEnvironment);
                break;
        }
        return StreamingJobGraphGenerator.createJobGraph(createLocalEnvironment.getStreamGraph());
    }

    protected abstract void createMigrationJob(StreamExecutionEnvironment streamExecutionEnvironment);

    protected abstract void createRestoredJob(StreamExecutionEnvironment streamExecutionEnvironment);

    protected abstract String getMigrationSavepointName();
}
