/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.time.Duration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.test.checkpointing.ChangelogRecoverySwitchEnvTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Before;
import org.junit.Test;

public class ChangelogRecoverySwitchStateBackendITCase
extends ChangelogRecoverySwitchEnvTestBase {
    public ChangelogRecoverySwitchStateBackendITCase(AbstractStateBackend delegatedStateBackend) {
        super(delegatedStateBackend);
    }

    @Override
    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, (Object)1);
        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.parse((String)"20b"));
        FsStateChangelogStorageFactory.configure((Configuration)configuration, (File)TEMPORARY_FOLDER.newFolder(), (Duration)Duration.ofMinutes(1L), (int)10);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    @Test
    public void testSwitchFromEnablingToDisabling() throws Exception {
        this.testSwitchEnv((StateBackend)this.delegatedStateBackend, this.getEnv(true), this.getEnv(false));
    }

    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingOut() throws Exception {
        this.testSwitchEnv((StateBackend)this.delegatedStateBackend, this.getEnv(true, 2), this.getEnv(false, 4));
    }

    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception {
        this.testSwitchEnv((StateBackend)this.delegatedStateBackend, this.getEnv(true, 4), this.getEnv(false, 2));
    }

    @Test
    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        MiniCluster miniCluster = this.cluster.getMiniCluster();
        StreamExecutionEnvironment env1 = this.getEnv(firstCheckpointFolder, false, 100L, -1L);
        SharedReference miniClusterRef = this.sharedObjects.add((Object)miniCluster);
        JobGraph firstJobGraph = this.buildJobGraph((StateBackend)this.delegatedStateBackend, env1, 2000, 2500, (SharedReference<MiniCluster>)miniClusterRef);
        try {
            miniCluster.submitJob(firstJobGraph).get();
            miniCluster.requestJobResult(firstJobGraph.getJobID()).get();
        }
        catch (Exception ex) {
            Preconditions.checkState((boolean)ExceptionUtils.findThrowable((Throwable)ex, ChangelogRecoveryITCaseBase.ArtificialFailure.class).isPresent());
        }
        String firstRestorePath = (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)firstJobGraph.getJobID(), (MiniCluster)miniCluster).get();
        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        StreamExecutionEnvironment env2 = this.getEnv(secondCheckpointFolder, true, 100L, -1L);
        JobGraph secondJobGraph = this.buildJobGraph((StateBackend)this.delegatedStateBackend, env2, 3333, 5000, (SharedReference<MiniCluster>)miniClusterRef);
        this.setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
        try {
            miniCluster.submitJob(secondJobGraph).get();
            miniCluster.requestJobResult(secondJobGraph.getJobID()).get();
        }
        catch (Exception ex) {
            Preconditions.checkState((boolean)ExceptionUtils.findThrowable((Throwable)ex, ChangelogRecoveryITCaseBase.ArtificialFailure.class).isPresent());
        }
        String secondRestorePath = (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)secondJobGraph.getJobID(), (MiniCluster)miniCluster).get();
        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        StreamExecutionEnvironment env3 = this.getEnv(thirdCheckpointFolder, true, 100L, 1000L);
        JobGraph thirdJobGraph = this.buildJobGraph((StateBackend)this.delegatedStateBackend, env3, 10000, 6666, (SharedReference<MiniCluster>)miniClusterRef);
        this.setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
        miniCluster.submitJob(thirdJobGraph).get();
        miniCluster.requestJobResult(thirdJobGraph.getJobID()).get();
    }

    private StreamExecutionEnvironment getEnv(boolean enableChangelog) {
        return this.getEnv(enableChangelog, 4);
    }

    private StreamExecutionEnvironment getEnv(boolean enableChangelog, int parallelism) {
        StreamExecutionEnvironment env = this.getEnv(100L, 0, 500L, 0);
        env.enableChangelogStateBackend(enableChangelog);
        env.setParallelism(parallelism);
        env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        return env;
    }

    private StreamExecutionEnvironment getEnv(File checkpointFile, boolean changelogEnabled, long checkpointInterval, long materializationInterval) {
        StreamExecutionEnvironment env = this.getEnv(checkpointFile, checkpointInterval, 0, materializationInterval, 0);
        env.enableChangelogStateBackend(changelogEnabled);
        env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        return env;
    }

    private void setSavepointRestoreSettings(JobGraph jobGraph, String restorePath) {
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)restorePath, (boolean)false, (RecoveryClaimMode)RecoveryClaimMode.CLAIM));
    }
}

