package org.apache.flink.test.checkpointing;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointFormatITCase.class */
public class SavepointFormatITCase {

    @TempDir
    Path checkpointsDir;

    @TempDir
    Path originalSavepointDir;

    @TempDir
    Path renamedSavepointDir;

    @RegisterExtension
    LoggerAuditingExtension loggerAuditingExtension = new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointFormatITCase$StateBackendConfig.class */
    public static abstract class StateBackendConfig {
        protected final boolean changelogEnabled;
        protected final boolean incremental;
        private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders = Arrays.asList((z, z2) -> {
            return SavepointFormatITCase.getRocksdb(z, z2);
        }, (z3, z4) -> {
            return SavepointFormatITCase.heap(z3, z4);
        });

        protected StateBackendConfig(boolean z, boolean z2) {
            this.changelogEnabled = z;
            this.incremental = z2;
        }

        public abstract String getName();

        public Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.setString(StateBackendOptions.STATE_BACKEND, getConfigName());
            configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, Boolean.valueOf(this.incremental));
            configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, Boolean.valueOf(this.changelogEnabled));
            return configuration;
        }

        public int getCheckpointsBeforeSavepoint() {
            return 0;
        }

        protected abstract String getConfigName();

        public final String toString() {
            return String.format("%s, incremental: %b, changelog: %b", getName(), Boolean.valueOf(this.incremental), Boolean.valueOf(this.changelogEnabled));
        }

        public abstract boolean isIncremental();

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isChangelogEnabled() {
            return this.changelogEnabled;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointFormatITCase$StatefulCounter.class */
    public static final class StatefulCounter extends RichMapFunction<Long, Long> {
        private ValueState<Long> counter;

        private StatefulCounter() {
        }

        public void open(Configuration configuration) throws Exception {
            this.counter = getRuntimeContext().getState(new ValueStateDescriptor("counter", BasicTypeInfo.LONG_TYPE_INFO));
        }

        public Long map(Long l) throws Exception {
            this.counter.update(Long.valueOf(((Long) Optional.ofNullable(this.counter.value()).orElse(0L)).longValue() + l.longValue()));
            return (Long) this.counter.value();
        }
    }

    private static List<Arguments> parameters() {
        LinkedList linkedList = new LinkedList();
        for (BiFunction biFunction : StateBackendConfig.builders) {
            for (boolean z : new boolean[]{true, false}) {
                for (boolean z2 : new boolean[]{true, false}) {
                    for (SavepointFormatType savepointFormatType : SavepointFormatType.values()) {
                        if (!z2 || savepointFormatType != SavepointFormatType.NATIVE) {
                            linkedList.add(Arguments.of(new Object[]{savepointFormatType, biFunction.apply(Boolean.valueOf(z), Boolean.valueOf(z2))}));
                        }
                    }
                }
            }
        }
        return linkedList;
    }

    private void validateState(KeyedStateHandle keyedStateHandle, SavepointFormatType savepointFormatType, StateBackendConfig stateBackendConfig) {
        if (savepointFormatType == SavepointFormatType.CANONICAL) {
            MatcherAssert.assertThat(keyedStateHandle, CoreMatchers.instanceOf(SavepointKeyedStateHandle.class));
            return;
        }
        if (!stateBackendConfig.isChangelogEnabled()) {
            validateNativeNonChangelogState(keyedStateHandle, stateBackendConfig);
            return;
        }
        MatcherAssert.assertThat(keyedStateHandle, CoreMatchers.instanceOf(ChangelogStateBackendHandle.class));
        Iterator it = ((ChangelogStateBackendHandle) keyedStateHandle).getMaterializedStateHandles().iterator();
        while (it.hasNext()) {
            validateNativeNonChangelogState((KeyedStateHandle) it.next(), stateBackendConfig);
        }
    }

    private void validateNativeNonChangelogState(KeyedStateHandle keyedStateHandle, StateBackendConfig stateBackendConfig) {
        if (stateBackendConfig.isIncremental()) {
            MatcherAssert.assertThat(keyedStateHandle, CoreMatchers.instanceOf(IncrementalRemoteKeyedStateHandle.class));
        } else {
            MatcherAssert.assertThat(keyedStateHandle, CoreMatchers.instanceOf(KeyGroupsStateHandle.class));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static StateBackendConfig heap(boolean z, boolean z2) {
        return new StateBackendConfig(z2, z) { // from class: org.apache.flink.test.checkpointing.SavepointFormatITCase.1
            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public String getName() {
                return "HEAP";
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public Configuration getConfiguration() {
                Configuration configuration = super.getConfiguration();
                configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
                return configuration;
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            protected String getConfigName() {
                return "filesystem";
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public boolean isIncremental() {
                return false;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static StateBackendConfig getRocksdb(boolean z, boolean z2) {
        return new StateBackendConfig(z2, z) { // from class: org.apache.flink.test.checkpointing.SavepointFormatITCase.2
            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public String getName() {
                return "ROCKSDB";
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public int getCheckpointsBeforeSavepoint() {
                return 1;
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public boolean isIncremental() {
                return this.incremental;
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            public Configuration getConfiguration() {
                Configuration configuration = super.getConfiguration();
                configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
                return configuration;
            }

            @Override // org.apache.flink.test.checkpointing.SavepointFormatITCase.StateBackendConfig
            protected String getConfigName() {
                return "rocksdb";
            }
        };
    }

    @MethodSource({"parameters"})
    @ParameterizedTest(name = "[{index}] {0}, {1}")
    public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatType savepointFormatType, StateBackendConfig stateBackendConfig) throws Exception {
        Configuration configuration = stateBackendConfig.getConfiguration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, this.checkpointsDir.toUri().toString());
        MiniClusterWithClientResource miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        miniClusterWithClientResource.before();
        try {
            String submitJobAndTakeSavepoint = submitJobAndTakeSavepoint(miniClusterWithClientResource, savepointFormatType, stateBackendConfig.getCheckpointsBeforeSavepoint(), configuration);
            ((OperatorState) TestUtils.loadCheckpointMetadata(submitJobAndTakeSavepoint).getOperatorStates().stream().filter(hasKeyedState()).findFirst().get()).getStates().stream().flatMap(operatorSubtaskState -> {
                return operatorSubtaskState.getManagedKeyedState().stream();
            }).forEach(keyedStateHandle -> {
                validateState(keyedStateHandle, savepointFormatType, stateBackendConfig);
            });
            relocateAndVerify(miniClusterWithClientResource, submitJobAndTakeSavepoint, this.renamedSavepointDir, configuration);
            miniClusterWithClientResource.after();
        } catch (Throwable th) {
            miniClusterWithClientResource.after();
            throw th;
        }
    }

    @NotNull
    private Predicate<OperatorState> hasKeyedState() {
        return operatorState -> {
            return operatorState.hasSubtaskStates() && ((Boolean) operatorState.getStates().stream().findFirst().map(operatorSubtaskState -> {
                return Boolean.valueOf(operatorSubtaskState.getManagedKeyedState().hasState());
            }).orElse(false)).booleanValue();
        };
    }

    private void relocateAndVerify(MiniClusterWithClientResource miniClusterWithClientResource, String str, Path path, Configuration configuration) throws Exception {
        new org.apache.flink.core.fs.Path(str).getFileSystem().rename(new org.apache.flink.core.fs.Path(str), new org.apache.flink.core.fs.Path(path.toUri().toString()));
        JobGraph createJobGraph = createJobGraph(configuration);
        createJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path.toUri().toString(), false, RestoreMode.CLAIM));
        JobID jobID = createJobGraph.getJobID();
        miniClusterWithClientResource.getClusterClient().submitJob(createJobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(miniClusterWithClientResource.getMiniCluster(), jobID, false);
    }

    private String submitJobAndTakeSavepoint(MiniClusterWithClientResource miniClusterWithClientResource, SavepointFormatType savepointFormatType, int i, Configuration configuration) throws Exception {
        JobGraph createJobGraph = createJobGraph(configuration);
        JobID jobID = createJobGraph.getJobID();
        ClusterClient clusterClient = miniClusterWithClientResource.getClusterClient();
        clusterClient.submitJob(createJobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(miniClusterWithClientResource.getMiniCluster(), jobID, false);
        for (int i2 = 0; i2 < i; i2++) {
            miniClusterWithClientResource.getMiniCluster().triggerCheckpoint(jobID).get();
        }
        return (String) clusterClient.stopWithSavepoint(jobID, false, this.originalSavepointDir.toUri().toString(), savepointFormatType).get();
    }

    private static JobGraph createJobGraph(Configuration configuration) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.setParallelism(4);
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.disableOperatorChaining();
        executionEnvironment.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).keyBy(l -> {
            return Long.valueOf(l.longValue() % 1000);
        }).map(new StatefulCounter()).addSink(new DiscardingSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1012430706:
                if (implMethodName.equals("lambda$createJobGraph$b29523fd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/SavepointFormatITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return Long.valueOf(l.longValue() % 1000);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
