package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.class */
public class ChangelogLocalRecoveryITCase extends TestLogger {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_TASK_SLOTS = 1;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Parameterized.Parameter
    public AbstractStateBackend delegatedStateBackend;
    private MiniClusterWithClientResource cluster;
    private static String workingDir;

    @Parameterized.Parameters(name = "delegated state backend type = {0}")
    public static Collection<AbstractStateBackend> parameter() {
        return Arrays.asList(new HashMapStateBackend(), new EmbeddedRocksDBStateBackend(false), new EmbeddedRocksDBStateBackend(true));
    }

    @BeforeClass
    public static void setWorkingDir() throws IOException {
        workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
    }

    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, Integer.valueOf(NUM_TASK_SLOTS));
        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.set(ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.set(ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, true);
        FsStateChangelogStorageFactory.configure(configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMillis(1000L), NUM_TASK_SLOTS);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(NUM_TASK_MANAGERS).setNumberSlotsPerTaskManager(NUM_TASK_SLOTS).build());
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    @After
    public void teardown() {
        this.cluster.after();
    }

    private JobGraph buildJobGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.addSource(new InfiniteIntegerSource()).setParallelism(NUM_TASK_SLOTS).keyBy(num -> {
            return num;
        }).process(new ChangelogRecoveryITCaseBase.CountFunction()).addSink(new ChangelogRecoveryITCaseBase.CollectionSink()).setParallelism(NUM_TASK_SLOTS);
        return streamExecutionEnvironment.getStreamGraph().getJobGraph();
    }

    @Test
    public void testRestartTM() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        MiniCluster miniCluster = this.cluster.getMiniCluster();
        JobGraph buildJobGraph = buildJobGraph(getEnv(this.delegatedStateBackend, newFolder, true, 200L, 800L));
        miniCluster.submitJob(buildJobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(miniCluster, buildJobGraph.getJobID(), false);
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(!ChangelogRecoveryITCaseBase.getAllStateHandleId(buildJobGraph.getJobID(), miniCluster).isEmpty());
        });
        miniCluster.terminateTaskManager(NUM_TASK_SLOTS).get();
        miniCluster.startTaskManager();
        CommonTestUtils.waitForAllTaskRunning(() -> {
            return (AccessExecutionGraph) miniCluster.getExecutionGraph(buildJobGraph.getJobID()).get(500L, TimeUnit.SECONDS);
        }, false);
        miniCluster.triggerCheckpoint(buildJobGraph.getJobID());
    }

    private StreamExecutionEnvironment getEnv(StateBackend stateBackend, File file, boolean z, long j, long j2) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(j);
        executionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints(false);
        executionEnvironment.setStateBackend(stateBackend).setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10L));
        executionEnvironment.configure(new Configuration().set(StateRecoveryOptions.LOCAL_RECOVERY, true));
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(file.toURI());
        executionEnvironment.enableChangelogStateBackend(z);
        executionEnvironment.configure(new Configuration().set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofMillis(j2)).set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, Integer.valueOf(NUM_TASK_SLOTS)));
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, Integer.valueOf(NUM_TASK_SLOTS));
        executionEnvironment.configure(configuration);
        return executionEnvironment;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -512986253:
                if (implMethodName.equals("lambda$buildJobGraph$47a9adf$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
