package org.apache.flink.test.state;

import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/state/ChangelogCompatibilityITCase.class */
public class ChangelogCompatibilityITCase {
    private final TestCase testCase;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private File checkpointDir;
    private File savepointDir;
    private MiniClusterWithClientResource miniClusterResource;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/ChangelogCompatibilityITCase$RestoreSource.class */
    public enum RestoreSource {
        CANONICAL_SAVEPOINT,
        NATIVE_SAVEPOINT,
        CHECKPOINT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/ChangelogCompatibilityITCase$TestCase.class */
    public static final class TestCase {
        boolean startWithChangelog;
        boolean restoreWithChangelog;
        RestoreSource restoreSource;
        boolean allowStore = true;
        boolean allowRestore = false;

        private TestCase() {
        }

        public static TestCase startWithChangelog(boolean z) {
            TestCase testCase = new TestCase();
            testCase.startWithChangelog = z;
            return testCase;
        }

        public TestCase restoreWithChangelog(boolean z) {
            this.restoreWithChangelog = z;
            return this;
        }

        public TestCase from(RestoreSource restoreSource) {
            this.restoreSource = restoreSource;
            return this;
        }

        public TestCase allowRestore(boolean z) {
            this.allowRestore = z;
            return this;
        }

        public TestCase allowSave(boolean z) {
            this.allowStore = z;
            return this;
        }

        public String toString() {
            return String.format("startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowStore=%s, allowRestore=%s", Boolean.valueOf(this.startWithChangelog), Boolean.valueOf(this.restoreWithChangelog), this.restoreSource, Boolean.valueOf(this.allowStore), Boolean.valueOf(this.allowRestore));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String describeStore() {
            Object[] objArr = new Object[3];
            objArr[0] = this.restoreSource;
            objArr[1] = Boolean.valueOf(this.startWithChangelog);
            objArr[2] = this.allowStore ? "allowed" : "disallowed";
            return String.format("taking %s with changelog.enabled=%b should be %s", objArr);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String describeRestore() {
            Object[] objArr = new Object[4];
            objArr[0] = this.restoreSource;
            objArr[1] = this.allowRestore ? "allowed" : "disallowed";
            objArr[2] = Boolean.valueOf(this.startWithChangelog);
            objArr[3] = Boolean.valueOf(this.restoreWithChangelog);
            return String.format("restoring from %s taken with changelog.enabled=%b should be %s with changelog.enabled=%b", objArr);
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static List<TestCase> parameters() {
        return Arrays.asList(TestCase.startWithChangelog(true).restoreWithChangelog(false).from(RestoreSource.CANONICAL_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(true).restoreWithChangelog(false).from(RestoreSource.CHECKPOINT).allowRestore(true), TestCase.startWithChangelog(false).restoreWithChangelog(true).from(RestoreSource.CANONICAL_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(false).restoreWithChangelog(true).from(RestoreSource.CHECKPOINT).allowRestore(true), TestCase.startWithChangelog(true).restoreWithChangelog(true).from(RestoreSource.CANONICAL_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(true).restoreWithChangelog(true).from(RestoreSource.NATIVE_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(false).restoreWithChangelog(true).from(RestoreSource.NATIVE_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(true).restoreWithChangelog(false).from(RestoreSource.NATIVE_SAVEPOINT).allowRestore(true), TestCase.startWithChangelog(true).restoreWithChangelog(true).from(RestoreSource.CHECKPOINT).allowRestore(true));
    }

    @Test
    public void testRestore() throws Exception {
        runAndStoreIfAllowed().ifPresent(this::restoreAndValidate);
    }

    private Optional<String> runAndStoreIfAllowed() throws Exception {
        try {
            String tryCheckpointAndStop = tryCheckpointAndStop(addGraph(initEnvironment()));
            if (!this.testCase.allowStore) {
                Assert.fail(this.testCase.describeStore());
            }
            return Optional.of(tryCheckpointAndStop);
        } catch (Exception e) {
            if (this.testCase.allowStore) {
                throw e;
            }
            return Optional.empty();
        }
    }

    private StreamExecutionEnvironment initEnvironment() {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.enableChangelogStateBackend(this.testCase.startWithChangelog);
        if (this.testCase.restoreSource == RestoreSource.CHECKPOINT) {
            executionEnvironment.enableCheckpointing(50L);
            executionEnvironment.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        }
        return executionEnvironment;
    }

    private JobGraph addGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).countWindowAll(37L).reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).sinkTo(new DiscardingSink());
        return streamExecutionEnvironment.getStreamGraph().getJobGraph();
    }

    private String tryCheckpointAndStop(JobGraph jobGraph) throws Exception {
        ClusterClient<?> clusterClient = this.miniClusterResource.getClusterClient();
        submit(jobGraph, clusterClient);
        if (this.testCase.restoreSource != RestoreSource.CHECKPOINT) {
            return (String) clusterClient.stopWithSavepoint(jobGraph.getJobID(), false, pathToString(this.savepointDir), this.testCase.restoreSource == RestoreSource.CANONICAL_SAVEPOINT ? SavepointFormatType.CANONICAL : SavepointFormatType.NATIVE).get();
        }
        CommonTestUtils.waitForCheckpoint(jobGraph.getJobID(), this.miniClusterResource.getMiniCluster(), 1);
        clusterClient.cancel(jobGraph.getJobID()).get();
        return (String) CommonTestUtils.getLatestCompletedCheckpointPath(jobGraph.getJobID(), this.miniClusterResource.getMiniCluster()).orElseThrow(() -> {
            throw new NoSuchElementException("No checkpoint was created yet");
        });
    }

    private void restoreAndValidate(String str) {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.enableChangelogStateBackend(this.testCase.restoreWithChangelog);
        JobGraph addGraph = addGraph(executionEnvironment);
        addGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        if (tryRun(addGraph) != this.testCase.allowRestore) {
            Assert.fail(this.testCase.describeRestore());
        }
    }

    private boolean tryRun(JobGraph jobGraph) {
        try {
            submit(jobGraph, this.miniClusterResource.getClusterClient());
            this.miniClusterResource.getClusterClient().cancel(jobGraph.getJobID()).get();
            return true;
        } catch (AssertionError | Exception e) {
            if (isValidationError(e)) {
                return false;
            }
            throw new RuntimeException(e);
        }
    }

    private boolean isValidationError(Throwable th) {
        return ExceptionUtils.findThrowableSerializedAware(th, IllegalStateException.class, getClass().getClassLoader()).filter(illegalStateException -> {
            return illegalStateException.getMessage().toLowerCase().contains("recovery not supported");
        }).isPresent();
    }

    private void submit(JobGraph jobGraph, ClusterClient<?> clusterClient) throws Exception {
        clusterClient.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(this.miniClusterResource.getMiniCluster(), jobGraph.getJobID(), true);
    }

    private static String pathToString(File file) {
        return file.toURI().toString();
    }

    public ChangelogCompatibilityITCase(TestCase testCase) {
        this.testCase = testCase;
    }

    @Before
    public void before() throws Exception {
        this.checkpointDir = TEMPORARY_FOLDER.newFolder();
        this.savepointDir = TEMPORARY_FOLDER.newFolder();
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, pathToString(this.checkpointDir));
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, pathToString(this.savepointDir));
        FsStateChangelogStorageFactory.configure(configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1L), 10);
        this.miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(11).setNumberSlotsPerTaskManager(1).build());
        this.miniClusterResource.before();
    }

    @After
    public void after() {
        if (this.miniClusterResource != null) {
            this.miniClusterResource.after();
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 114251:
                if (implMethodName.equals("sum")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("(JJ)J")) {
                    return (v0, v1) -> {
                        return Long.sum(v0, v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
