/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized integration test that starts a job with the changelog state backend switched on
 * or off, stores its state as a checkpoint or savepoint, and then submits a second job restoring
 * from that location with the changelog setting requested by the {@link TestCase}.
 *
 * <p>Each test case declares whether storing and restoring are expected to succeed; the test
 * fails when the observed outcome differs from the declared one.
 */
@RunWith(Parameterized.class)
public class ChangelogCompatibilityITCase {
    /** Lower-case fragment of the message that identifies a restore rejected by validation. */
    private static final String RECOVERY_NOT_SUPPORTED = "recovery not supported";

    private final TestCase testCase;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private File checkpointDir;
    private File savepointDir;
    private MiniClusterWithClientResource miniClusterResource;

    /**
     * Supplies the combinations of start/restore changelog settings and restore sources under
     * test.
     *
     * @return the test cases, one per parameterized run
     */
    @Parameterized.Parameters(name = "{0}")
    public static List<TestCase> parameters() {
        return Arrays.asList(
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(false)
                        .from(RestoreSource.CANONICAL_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(false)
                        .from(RestoreSource.CHECKPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(false)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.CANONICAL_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(false)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.CHECKPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.CANONICAL_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.NATIVE_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(false)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.NATIVE_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(false)
                        .from(RestoreSource.NATIVE_SAVEPOINT)
                        .allowRestore(true),
                TestCase.startWithChangelog(true)
                        .restoreWithChangelog(true)
                        .from(RestoreSource.CHECKPOINT)
                        .allowRestore(true));
    }

    /**
     * Runs the initial job, stores its state and, if a location was produced, restores a second
     * job from it and validates the outcome.
     */
    @Test
    public void testRestore() throws Exception {
        runAndStoreIfAllowed().ifPresent(this::restoreAndValidate);
    }

    /**
     * Runs the initial job and tries to store its state.
     *
     * @return the checkpoint/savepoint location, or empty if storing failed and the test case
     *     does not allow storing
     * @throws Exception if storing failed although the test case allows it
     */
    private Optional<String> runAndStoreIfAllowed() throws Exception {
        JobGraph initialGraph = addGraph(initEnvironment());
        try {
            String location = tryCheckpointAndStop(initialGraph);
            if (!testCase.allowStore) {
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail(testCase.describeStore());
            }
            return Optional.of(location);
        } catch (Exception e) {
            if (testCase.allowStore) {
                throw e;
            }
            // Storing was expected to fail, so there is nothing to restore from.
            return Optional.empty();
        }
    }

    /**
     * Creates the environment for the initial job using the test case's start-time changelog
     * setting; periodic checkpointing with retention on cancellation is enabled only when the
     * restore source is a checkpoint.
     */
    private StreamExecutionEnvironment initEnvironment() {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableChangelogStateBackend(testCase.startWithChangelog);
        if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
            env.enableCheckpointing(50L);
            env.getCheckpointConfig()
                    .setExternalizedCheckpointRetention(
                            ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        }
        return env;
    }

    /**
     * Adds the test pipeline (a long sequence, count windows of 37 elements reduced by sum, and a
     * discarding sink) to the environment and returns the resulting job graph.
     */
    private JobGraph addGraph(StreamExecutionEnvironment env) {
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
                .countWindowAll(37L)
                .reduce(Long::sum)
                .sinkTo(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Submits the job and stores its state according to the test case's restore source: for a
     * checkpoint, waits for one checkpoint, cancels the job and returns the latest completed
     * checkpoint path; otherwise stops the job with a savepoint of the matching format.
     *
     * @param jobGraph the job to run
     * @return the location of the stored checkpoint or savepoint
     * @throws NoSuchElementException if no completed checkpoint path is available after
     *     cancellation
     * @throws Exception if submitting, cancelling or stopping the job fails
     */
    private String tryCheckpointAndStop(JobGraph jobGraph) throws Exception {
        ClusterClient<?> client = miniClusterResource.getClusterClient();
        submit(jobGraph, client);
        if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
            CommonTestUtils.waitForCheckpoint(
                    jobGraph.getJobID(), miniClusterResource.getMiniCluster(), 1);
            client.cancel(jobGraph.getJobID()).get();
            return CommonTestUtils.getLatestCompletedCheckpointPath(
                            jobGraph.getJobID(), miniClusterResource.getMiniCluster())
                    .orElseThrow(
                            () -> new NoSuchElementException("No checkpoint was created yet"));
        }
        SavepointFormatType formatType =
                testCase.restoreSource == RestoreSource.CANONICAL_SAVEPOINT
                        ? SavepointFormatType.CANONICAL
                        : SavepointFormatType.NATIVE;
        return client.stopWithSavepoint(
                        jobGraph.getJobID(), false, pathToString(savepointDir), formatType)
                .get();
    }

    /**
     * Submits a job restoring from {@code location} with the test case's restore-time changelog
     * setting and fails the test if the outcome differs from the expected one.
     *
     * @param location path of the checkpoint or savepoint to restore from
     */
    private void restoreAndValidate(String location) {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableChangelogStateBackend(testCase.restoreWithChangelog);
        JobGraph jobGraph = addGraph(env);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(location));
        if (tryRun(jobGraph) != testCase.allowRestore) {
            Assert.fail(testCase.describeRestore());
        }
    }

    /**
     * Submits the job, waits until it is running and cancels it.
     *
     * @return {@code true} if the job ran and was cancelled, {@code false} if it failed with a
     *     validation error (see {@link #isValidationError(Throwable)})
     * @throws RuntimeException wrapping any other failure
     */
    private boolean tryRun(JobGraph jobGraph) {
        try {
            submit(jobGraph, miniClusterResource.getClusterClient());
            miniClusterResource.getClusterClient().cancel(jobGraph.getJobID()).get();
            return true;
        } catch (AssertionError | Exception e) {
            if (isValidationError(e)) {
                return false;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the failure chain contains an {@link IllegalStateException} whose message
     * mentions "recovery not supported" (case-insensitively).
     *
     * <p>A cause without a message is treated as "not a validation error" so that the caller
     * reports the original failure instead of a secondary {@link NullPointerException}.
     */
    private boolean isValidationError(Throwable e) {
        return ExceptionUtils.findThrowableSerializedAware(
                        e, IllegalStateException.class, getClass().getClassLoader())
                // Optional.map yields empty for a null message.
                .map(Throwable::getMessage)
                .filter(
                        message ->
                                message.toLowerCase(Locale.ROOT).contains(RECOVERY_NOT_SUPPORTED))
                .isPresent();
    }

    /** Submits the job and blocks until all of its tasks are running. */
    private void submit(JobGraph jobGraph, ClusterClient<?> client) throws Exception {
        client.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(
                miniClusterResource.getMiniCluster(), jobGraph.getJobID(), true);
    }

    /** Converts a local file to its URI string form. */
    private static String pathToString(File path) {
        return path.toURI().toString();
    }

    /**
     * Creates the test instance for one parameter set.
     *
     * @param testCase the scenario to run
     */
    public ChangelogCompatibilityITCase(TestCase testCase) {
        this.testCase = testCase;
    }

    /**
     * Creates fresh checkpoint, savepoint and changelog storage folders and starts a mini cluster
     * with 11 task managers of one slot each that is configured to use them.
     */
    @Before
    public void before() throws Exception {
        checkpointDir = TEMPORARY_FOLDER.newFolder();
        savepointDir = TEMPORARY_FOLDER.newFolder();
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, pathToString(checkpointDir));
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, pathToString(savepointDir));
        FsStateChangelogStorageFactory.configure(
                config, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1L), 10);
        miniClusterResource =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(11)
                                .setNumberSlotsPerTaskManager(1)
                                .build());
        miniClusterResource.before();
    }

    /** Shuts the mini cluster down if {@link #before()} managed to create it. */
    @After
    public void after() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    /** Kind of snapshot the second job is restored from. */
    private enum RestoreSource {
        CANONICAL_SAVEPOINT,
        NATIVE_SAVEPOINT,
        CHECKPOINT
    }

    /**
     * Fluent description of one scenario: changelog settings at start and at restore, the restore
     * source, and whether storing and restoring are expected to be allowed.
     *
     * <p>Storing is allowed and restoring is disallowed unless stated otherwise.
     */
    private static final class TestCase {
        boolean startWithChangelog;
        boolean restoreWithChangelog;
        RestoreSource restoreSource;
        boolean allowStore = true;
        boolean allowRestore = false;

        private TestCase() {
        }

        /** Starts a new test case with the given changelog setting for the initial job. */
        public static TestCase startWithChangelog(boolean changelogEnabled) {
            TestCase testCase = new TestCase();
            testCase.startWithChangelog = changelogEnabled;
            return testCase;
        }

        /** Sets the changelog setting used by the restoring job. */
        public TestCase restoreWithChangelog(boolean restoreWithChangelog) {
            this.restoreWithChangelog = restoreWithChangelog;
            return this;
        }

        /** Sets the kind of snapshot to restore from. */
        public TestCase from(RestoreSource restoreSource) {
            this.restoreSource = restoreSource;
            return this;
        }

        /** Sets whether restoring is expected to succeed. */
        public TestCase allowRestore(boolean allowRestore) {
            this.allowRestore = allowRestore;
            return this;
        }

        /** Sets whether storing the snapshot is expected to succeed. */
        public TestCase allowSave(boolean allowSave) {
            this.allowStore = allowSave;
            return this;
        }

        @Override
        public String toString() {
            return String.format(
                    "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowStore=%s, allowRestore=%s",
                    startWithChangelog,
                    restoreWithChangelog,
                    restoreSource,
                    allowStore,
                    allowRestore);
        }

        /** Builds the failure message for an unexpected store outcome. */
        private String describeStore() {
            return String.format(
                    "taking %s with changelog.enabled=%b should be %s",
                    restoreSource, startWithChangelog, allowStore ? "allowed" : "disallowed");
        }

        /** Builds the failure message for an unexpected restore outcome. */
        private String describeRestore() {
            // Argument order must follow the placeholders: source, start-time flag (%b),
            // expectation (%s), restore-time flag (%b).
            return String.format(
                    "restoring from %s taken with changelog.enabled=%b should be %s with changelog.enabled=%b",
                    restoreSource,
                    startWithChangelog,
                    allowRestore ? "allowed" : "disallowed",
                    restoreWithChangelog);
        }
    }
}

