/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state.operator.restore;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.nio.file.CopyOption;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.apache.flink.test.util.MigrationTest;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

/**
 * Base class for tests that restore a job from a savepoint written by an older Flink version,
 * cancel that job with a fresh savepoint, and then restore a second job from the fresh savepoint.
 *
 * <p>Subclasses provide the two job topologies ({@link #createMigrationJob} and {@link
 * #createRestoredJob}) and the name of the savepoint resource for a given version ({@link
 * #getMigrationSavepointName}). Savepoint resources are looked up on the classpath below {@code
 * operatorstate/}.
 */
public abstract class AbstractOperatorRestoreTestBase extends TestLogger implements MigrationTest {
    /** Flink version whose savepoint is restored by {@link #testMigrationAndRestore()}. */
    private final FlinkVersion flinkVersion;

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    private static final int NUM_TMS = 1;
    private static final int NUM_SLOTS_PER_TM = 4;

    /** Time budget shared by all steps of a single test or snapshot-generation run. */
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);

    /** Delay between two consecutive job status polls. */
    private static final Duration STATUS_POLL_INTERVAL = Duration.ofMillis(50L);

    /**
     * Matches the stringified exceptions that are tolerated (and retried) while attempting to
     * cancel the migrated job with a savepoint. Each message becomes one literal alternative of
     * the form {@code (msg1)|(msg2)|...}.
     */
    private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS =
            Pattern.compile(
                    Stream.of(
                                    "was not running",
                                    CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message(),
                                    CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY.message(),
                                    CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.message())
                            .map(AbstractOperatorRestoreTestBase::escapeRegexCharacters)
                            .collect(Collectors.joining(")|(", "(", ")")));

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public final MiniClusterWithClientResource cluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(NUM_TMS)
                            .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
                            .build());

    /** Executor used to schedule the job status polling retries. */
    private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());

    /**
     * Supplies the versions the test is parameterized with: every version from {@code v1_8} up to
     * the most recently published one, as reported by {@link MigrationTest}.
     */
    @Parameterized.Parameters(name="Migrate Savepoint: {0}")
    public static Collection<FlinkVersion> parameters() {
        return FlinkVersion.rangeOf(FlinkVersion.v1_8, MigrationTest.getMostRecentlyPublishedVersion());
    }

    /**
     * @param flinkVersion version whose savepoint resource the test restores from
     */
    protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) {
        this.flinkVersion = flinkVersion;
    }

    /**
     * Produces the savepoint resource for {@code targetVersion}.
     *
     * <p>The migration job is restored from the {@code v1_8} savepoint resource and cancelled with
     * a savepoint; the direct children of the resulting savepoint directory are then copied to
     * {@code src/test/resources/operatorstate/<savepoint name>} below the {@code user.dir} system
     * property.
     *
     * @param targetVersion version that determines the name of the target savepoint directory
     * @throws Exception if running the job, taking the savepoint or copying the files fails;
     *     {@link Files#copy} is invoked without copy options, so an already existing target file
     *     makes the copy fail
     */
    protected void internalGenerateSnapshots(FlinkVersion targetVersion) throws Exception {
        ClusterClient<?> clusterClient = this.cluster.getClusterClient();
        Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);

        // The source savepoint is always the v1_8 one, independent of targetVersion.
        String savepointPath = this.migrateJob(FlinkVersion.v1_8, clusterClient, deadline);

        Path targetPath = this.getSavepointPath(targetVersion);
        Files.createDirectories(targetPath);

        // The savepoint path returned by the cluster is interpreted as a URI.
        try (DirectoryStream<Path> childrenFiles = Files.newDirectoryStream(Paths.get(new URI(savepointPath)))) {
            for (Path filePath : childrenFiles) {
                Files.copy(filePath, targetPath.resolve(filePath.getFileName()));
            }
        }
    }

    /**
     * Restores the migration job from the savepoint of the parameterized version, cancels it with
     * a new savepoint and verifies that the restored job started from that new savepoint reaches
     * {@link JobStatus#FINISHED}.
     */
    @Test
    public void testMigrationAndRestore() throws Throwable {
        ClusterClient<?> clusterClient = this.cluster.getClusterClient();
        Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        String savepointPath = this.migrateJob(this.flinkVersion, clusterClient, deadline);
        this.restoreJob(clusterClient, deadline, savepointPath);
    }

    /**
     * Submits the migration job restored from the savepoint resource of {@code flinkVersion},
     * waits until it is running, cancels it with a savepoint and waits until it is cancelled.
     *
     * @param flinkVersion version used to resolve the savepoint resource name
     * @param clusterClient client used to submit and control the job
     * @param deadline deadline bounding all waiting performed by this method
     * @return path of the savepoint taken while cancelling the job
     * @throws IllegalArgumentException if the savepoint resource is not on the classpath
     * @throws Exception if submitting, polling or cancelling fails with a non-tolerated error
     */
    private String migrateJob(FlinkVersion flinkVersion, ClusterClient<?> clusterClient, Deadline deadline) throws Exception {
        URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + this.getMigrationSavepointName(flinkVersion));
        if (savepointResource == null) {
            throw new IllegalArgumentException("Savepoint file does not exist.");
        }

        JobGraph jobToMigrate = this.createJobGraph(ExecutionMode.MIGRATE);
        jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
        Assert.assertNotNull(jobToMigrate.getJobID());

        clusterClient.submitJob(jobToMigrate).get();
        this.awaitJobStatus(clusterClient, jobToMigrate, JobStatus.RUNNING, deadline);

        File targetDirectory = this.tmpFolder.newFolder();
        String savepointPath = null;
        // Retry until a savepoint path is obtained or the deadline expires. Only failures whose
        // stringified form matches one of the tolerated messages are retried.
        while (deadline.hasTimeLeft() && savepointPath == null) {
            try {
                savepointPath = clusterClient.cancelWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL).get();
            }
            catch (Exception e) {
                String exceptionString = ExceptionUtils.stringifyException(e);
                if (!PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS.matcher(exceptionString).find()) {
                    throw e;
                }
            }
        }
        Assert.assertNotNull("Could not take savepoint.", savepointPath);

        this.awaitJobStatus(clusterClient, jobToMigrate, JobStatus.CANCELED, deadline);
        return savepointPath;
    }

    /**
     * Submits the restored job starting from {@code savepointPath} and waits until it reaches
     * {@link JobStatus#FINISHED}.
     *
     * @param clusterClient client used to submit and observe the job
     * @param deadline deadline bounding the wait for the job to finish
     * @param savepointPath savepoint to restore from; passed to {@link
     *     SavepointRestoreSettings#forPath(String, boolean)} with the boolean flag set to {@code
     *     true}
     * @throws Exception if submitting or polling the job fails
     */
    private void restoreJob(ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
        JobGraph jobToRestore = this.createJobGraph(ExecutionMode.RESTORE);
        jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
        Assert.assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());

        clusterClient.submitJob(jobToRestore).get();
        this.awaitJobStatus(clusterClient, jobToRestore, JobStatus.FINISHED, deadline);
    }

    /**
     * Polls the status of {@code job} every {@link #STATUS_POLL_INTERVAL} until it equals {@code
     * expectedStatus} and asserts that this status was observed before the deadline.
     */
    private void awaitJobStatus(ClusterClient<?> clusterClient, JobGraph job, JobStatus expectedStatus, Deadline deadline) throws Exception {
        CompletableFuture<JobStatus> statusFuture =
                FutureUtils.retrySuccessfulWithDelay(
                        () -> clusterClient.getJobStatus(job.getJobID()),
                        STATUS_POLL_INTERVAL,
                        deadline,
                        status -> status == expectedStatus,
                        this.scheduledExecutor);
        Assert.assertEquals(expectedStatus, statusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
    }

    /**
     * Builds the job graph for the given mode on a freshly configured environment: exactly-once
     * checkpointing every 500 ms, no restart strategy, hash map state backend and job manager
     * checkpoint storage.
     *
     * @param mode {@link ExecutionMode#MIGRATE} or {@link ExecutionMode#RESTORE}
     * @return job graph generated from the topology the subclass defined for {@code mode}
     * @throws IllegalArgumentException if {@code mode} is neither of the two supported modes
     */
    private JobGraph createJobGraph(ExecutionMode mode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        RestartStrategyUtils.configureNoRestartStrategy(env);
        StateBackendUtils.configureHashMapStateBackend(env);
        CheckpointStorageUtils.configureJobManagerCheckpointStorage(env);

        switch (mode) {
            case MIGRATE:
                this.createMigrationJob(env);
                break;
            case RESTORE:
                this.createRestoredJob(env);
                break;
            default:
                // No topology is defined for other modes; fail early with a clear message.
                throw new IllegalArgumentException("Unsupported execution mode: " + mode);
        }
        return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    }

    /**
     * Resolves the directory in the source tree that holds the savepoint for {@code version},
     * relative to the {@code user.dir} system property.
     */
    private Path getSavepointPath(FlinkVersion version) {
        return Paths.get(System.getProperty("user.dir"), "src/test/resources/operatorstate/" + this.getMigrationSavepointName(version));
    }

    /** Defines the topology of the job that is restored from the old savepoint. */
    protected abstract void createMigrationJob(StreamExecutionEnvironment env);

    /** Defines the topology of the job that is restored from the newly taken savepoint. */
    protected abstract void createRestoredJob(StreamExecutionEnvironment env);

    /**
     * Returns the name of the savepoint resource for the given version, relative to {@code
     * operatorstate/}.
     */
    protected abstract String getMigrationSavepointName(FlinkVersion flinkVersion);

    /**
     * Turns {@code string} into a regular expression that matches exactly that text, so that no
     * character of an exception message (parentheses, dots, ...) is interpreted as regex syntax.
     */
    private static String escapeRegexCharacters(String string) {
        return Pattern.quote(string);
    }
}

