package org.apache.flink.test.state.operator.restore;

import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.class */
/**
 * Base class for tests that start a job from a savepoint bundled on the classpath, take a fresh
 * savepoint of that job, and then start a second job from the fresh savepoint.
 *
 * <p>Subclasses supply the two job topologies ({@link #createMigrationJob} and
 * {@link #createRestoredJob}) and the name of the bundled savepoint
 * ({@link #getMigrationSavepointName}).
 */
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {

    private static final int NUM_TMS = 1;
    private static final int NUM_SLOTS_PER_TM = 4;

    /** Overall budget shared by every wait performed within a single test run. */
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000);

    /** Delay between two consecutive job status polls, in milliseconds. */
    private static final long STATUS_POLL_INTERVAL_MS = 50L;

    /**
     * Stringified cancel-with-savepoint failures for which the attempt is simply repeated.
     * Compiled once here because they are evaluated on every failed attempt of the retry loop;
     * each pattern must match the entire stringified exception.
     */
    private static final Pattern[] RETRYABLE_SAVEPOINT_FAILURES = {
        Pattern.compile("(.*\n)*.*savepoint for the job .* failed(.*\n)*"),
        Pattern.compile("(.*\n)*.*Not all required tasks are currently running(.*\n)*"),
        Pattern.compile("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*")
    };

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(new Configuration(), NUM_TMS, NUM_SLOTS_PER_TM),
            true);

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    private final boolean allowNonRestoredState;

    /** Creates a test base that allows non-restored state when restoring the second job. */
    public AbstractOperatorRestoreTestBase() {
        this(true);
    }

    /**
     * Creates a test base.
     *
     * @param allowNonRestoredState flag handed to {@link SavepointRestoreSettings#forPath(String, boolean)}
     *     when the second job is restored from the freshly taken savepoint
     */
    public AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
        this.allowNonRestoredState = allowNonRestoredState;
    }

    /** Switches off the fail-on-legacy-state flag of {@link SavepointSerializers} before any test runs. */
    @BeforeClass
    public static void beforeClass() {
        SavepointSerializers.setFailWhenLegacyStateDetected(false);
    }

    /**
     * Runs the migration job from the bundled savepoint, cancels it with a new savepoint, and
     * then runs the restore job from that new savepoint. Both phases share one deadline.
     */
    @Test
    public void testMigrationAndRestore() throws Throwable {
        ClassLoader classLoader = getClass().getClassLoader();
        ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        clusterClient.setDetached(true);
        Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);

        String migratedSavepointPath = migrateJob(classLoader, clusterClient, deadline);
        restoreJob(classLoader, clusterClient, deadline, migratedSavepointPath);
    }

    /**
     * Submits the migration job restored from the bundled savepoint, waits until it is running,
     * and cancels it with a savepoint written to a new temporary folder.
     *
     * @return the path of the newly taken savepoint
     * @throws IllegalArgumentException if the bundled savepoint cannot be found on the classpath
     */
    private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
        URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader()
                .getResource("operatorstate/" + getMigrationSavepointName());
        if (savepointResource == null) {
            throw new IllegalArgumentException("Savepoint file does not exist.");
        }

        JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
        jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
        Assert.assertNotNull(jobToMigrate.getJobID());

        clusterClient.submitJob(jobToMigrate, classLoader);
        awaitJobStatus(clusterClient, jobToMigrate, JobStatus.RUNNING, deadline);

        File targetDirectory = tmpFolder.newFolder();
        String savepointPath = null;
        // Keep retrying while the failure is one of the known transient ones; anything else is fatal.
        while (deadline.hasTimeLeft() && savepointPath == null) {
            try {
                savepointPath = clusterClient.cancelWithSavepoint(
                        jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
            } catch (Exception e) {
                if (!isRetryableSavepointFailure(ExceptionUtils.stringifyException(e))) {
                    throw e;
                }
            }
        }
        Assert.assertNotNull("Could not take savepoint.", savepointPath);

        awaitJobStatus(clusterClient, jobToMigrate, JobStatus.CANCELED, deadline);
        return savepointPath;
    }

    /**
     * Submits the restore job from the given savepoint and waits until it has finished.
     *
     * @param str path of the savepoint to restore from
     */
    private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String str) throws Exception {
        JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
        jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, this.allowNonRestoredState));
        Assert.assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());

        clusterClient.submitJob(jobToRestore, classLoader);
        awaitJobStatus(clusterClient, jobToRestore, JobStatus.FINISHED, deadline);
    }

    /**
     * Polls the status of the given job until it equals {@code expected}, bounded by the time
     * left on {@code deadline}, and asserts that the final polled status is the expected one.
     */
    private static void awaitJobStatus(
            ClusterClient<?> clusterClient, JobGraph jobGraph, JobStatus expected, Deadline deadline) throws Exception {
        Assert.assertEquals(
                expected,
                FutureUtils.retrySuccesfulWithDelay(
                                () -> clusterClient.getJobStatus(jobGraph.getJobID()),
                                Time.milliseconds(STATUS_POLL_INTERVAL_MS),
                                deadline,
                                status -> status == expected,
                                TestingUtils.defaultScheduledExecutor())
                        .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Returns whether the stringified exception fully matches one of the known retryable failures. */
    private static boolean isRetryableSavepointFailure(String stringifiedException) {
        for (Pattern retryable : RETRYABLE_SAVEPOINT_FAILURES) {
            if (retryable.matcher(stringifiedException).matches()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the job graph for the given mode with 500 ms exactly-once checkpointing, no restart
     * strategy, and a {@link MemoryStateBackend}.
     *
     * @throws IllegalArgumentException if the mode is neither {@code MIGRATE} nor {@code RESTORE}
     */
    private JobGraph createJobGraph(ExecutionMode executionMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend(new MemoryStateBackend());

        switch (executionMode) {
            case MIGRATE:
                createMigrationJob(env);
                break;
            case RESTORE:
                createRestoredJob(env);
                break;
            default:
                // Fail loudly instead of generating a graph for an environment with no job defined.
                throw new IllegalArgumentException("Unsupported execution mode: " + executionMode);
        }
        return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    }

    /**
     * Defines, on the given environment, the job that is started from the bundled savepoint.
     *
     * @param streamExecutionEnvironment environment preconfigured with checkpointing, restart strategy and state backend
     */
    protected abstract void createMigrationJob(StreamExecutionEnvironment streamExecutionEnvironment);

    /**
     * Defines, on the given environment, the job that is started from the savepoint taken of the migration job.
     *
     * @param streamExecutionEnvironment environment preconfigured with checkpointing, restart strategy and state backend
     */
    protected abstract void createRestoredJob(StreamExecutionEnvironment streamExecutionEnvironment);

    /**
     * Returns the name of the bundled savepoint; it is looked up on the classpath as
     * {@code "operatorstate/" + name}.
     */
    protected abstract String getMigrationSavepointName();
}
