package org.apache.flink.test.checkpointing;

import java.util.Collection;
import java.util.LinkedList;
import java.util.stream.Collectors;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.class */
public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
    private static final int NUM_SOURCE_ELEMENTS = 4;
    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
    private static final SnapshotMigrationTestBase.ExecutionMode executionMode = SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT;
    private final SnapshotMigrationTestBase.SnapshotSpec snapshotSpec;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase$CheckingKeyedStateFlatMap.class */
    private static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";
        private final ValueStateDescriptor<Long> stateDescriptor;

        private CheckingKeyedStateFlatMap() {
            this.stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            ValueState state = getRuntimeContext().getState(this.stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + tuple2);
            }
            Assert.assertEquals(tuple2.f1, state.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase$CheckingTimelyStatefulOperator.class */
    private static class CheckingTimelyStatefulOperator extends AbstractStreamOperator<Tuple2<Long, Long>> implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
        private static final long serialVersionUID = 1;
        static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
        static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
        static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS";
        private final ValueStateDescriptor<Long> stateDescriptor;

        private CheckingTimelyStatefulOperator() {
            this.stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
        }

        public void open() throws Exception {
            super.open();
            getInternalTimerService("timer", LongSerializer.INSTANCE, this);
            getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void processElement(StreamRecord<Tuple2<Long, Long>> streamRecord) throws Exception {
            Assert.assertEquals(getKeyedStateBackend().getPartitionedState(((Tuple2) streamRecord.getValue()).f0, LongSerializer.INSTANCE, this.stateDescriptor).value(), ((Tuple2) streamRecord.getValue()).f1);
            getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);
            this.output.collect(streamRecord);
        }

        public void onEventTime(InternalTimer<Long, Long> internalTimer) throws Exception {
            Assert.assertEquals(getKeyedStateBackend().getPartitionedState(internalTimer.getNamespace(), LongSerializer.INSTANCE, this.stateDescriptor).value(), internalTimer.getNamespace());
            getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
        }

        public void onProcessingTime(InternalTimer<Long, Long> internalTimer) throws Exception {
            Assert.assertEquals(getKeyedStateBackend().getPartitionedState(internalTimer.getNamespace(), LongSerializer.INSTANCE, this.stateDescriptor).value(), internalTimer.getNamespace());
            getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase$CheckpointingKeyedStateFlatMap.class */
    private static class CheckpointingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<Long> stateDescriptor;

        private CheckpointingKeyedStateFlatMap() {
            this.stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
        }

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            getRuntimeContext().getState(this.stateDescriptor).update(tuple2.f1);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase$CheckpointingTimelyStatefulOperator.class */
    private static class CheckpointingTimelyStatefulOperator extends AbstractStreamOperator<Tuple2<Long, Long>> implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<Long> stateDescriptor;
        private transient InternalTimerService<Long> timerService;

        private CheckpointingTimelyStatefulOperator() {
            this.stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
        }

        public void processElement(StreamRecord<Tuple2<Long, Long>> streamRecord) throws Exception {
            getKeyedStateBackend().getPartitionedState(((Tuple2) streamRecord.getValue()).f0, LongSerializer.INSTANCE, this.stateDescriptor).update(((Tuple2) streamRecord.getValue()).f1);
            this.timerService.registerEventTimeTimer(((Tuple2) streamRecord.getValue()).f0, this.timerService.currentWatermark() + 10);
            this.timerService.registerProcessingTimeTimer(((Tuple2) streamRecord.getValue()).f0, this.timerService.currentProcessingTime() + 30000);
            this.output.collect(streamRecord);
        }

        public void onEventTime(InternalTimer<Long, Long> internalTimer) {
        }

        public void onProcessingTime(InternalTimer<Long, Long> internalTimer) {
        }

        public void processWatermark(Watermark watermark) {
            this.output.emitWatermark(watermark);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [java.util.Collection] */
    @Parameterized.Parameters(name = "Test snapshot: {0}")
    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> parameters() {
        LinkedList linkedList = new LinkedList();
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("jobmanager", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.rangeOf(FlinkVersion.v1_4, FlinkVersion.v1_14)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("hashmap", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("rocksdb", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL, FlinkVersion.rangeOf(FlinkVersion.v1_4, currentVersion)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("hashmap", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE, FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("rocksdb", SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE, FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("hashmap", SnapshotMigrationTestBase.SnapshotType.CHECKPOINT, FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        linkedList.addAll(SnapshotMigrationTestBase.SnapshotSpec.withVersions("rocksdb", SnapshotMigrationTestBase.SnapshotType.CHECKPOINT, FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            linkedList = (Collection) linkedList.stream().filter(snapshotSpec -> {
                return snapshotSpec.getFlinkVersion().equals(currentVersion);
            }).collect(Collectors.toList());
        }
        return linkedList;
    }

    public StatefulJobSnapshotMigrationITCase(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) throws Exception {
        this.snapshotSpec = snapshotSpec;
    }

    /* JADX WARN: Type inference failed for: r2v12, types: [org.apache.flink.test.checkpointing.StatefulJobSnapshotMigrationITCase$1] */
    /* JADX WARN: Type inference failed for: r2v17, types: [org.apache.flink.test.checkpointing.StatefulJobSnapshotMigrationITCase$2] */
    @Test
    public void testSavepoint() throws Exception {
        SourceFunction checkingNonParallelSourceWithListState;
        SourceFunction checkingParallelSourceWithUnionListState;
        FlatMapFunction checkingKeyedStateFlatMap;
        Triggerable checkingTimelyStatefulOperator;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        String stateBackendType = this.snapshotSpec.getStateBackendType();
        boolean z = -1;
        switch (stateBackendType.hashCode()) {
            case 697541006:
                if (stateBackendType.equals("hashmap")) {
                    z = 2;
                    break;
                }
                break;
            case 1368770220:
                if (stateBackendType.equals("rocksdb")) {
                    z = false;
                    break;
                }
                break;
            case 1712403792:
                if (stateBackendType.equals("jobmanager")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                executionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend());
                break;
            case true:
                executionEnvironment.setStateBackend(new MemoryStateBackend());
                break;
            case true:
                executionEnvironment.setStateBackend(new HashMapStateBackend());
                break;
            default:
                throw new UnsupportedOperationException();
        }
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(NUM_SOURCE_ELEMENTS);
        executionEnvironment.setMaxParallelism(NUM_SOURCE_ELEMENTS);
        executionEnvironment.enableChangelogStateBackend(false);
        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            checkingNonParallelSourceWithListState = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
            checkingParallelSourceWithUnionListState = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
            checkingKeyedStateFlatMap = new CheckpointingKeyedStateFlatMap();
            checkingTimelyStatefulOperator = new CheckpointingTimelyStatefulOperator();
        } else {
            if (executionMode != SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT) {
                throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
            }
            checkingNonParallelSourceWithListState = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
            checkingParallelSourceWithUnionListState = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
            checkingKeyedStateFlatMap = new CheckingKeyedStateFlatMap();
            checkingTimelyStatefulOperator = new CheckingTimelyStatefulOperator();
        }
        executionEnvironment.addSource(checkingNonParallelSourceWithListState).uid("CheckpointingSource1").keyBy(new int[]{0}).flatMap(checkingKeyedStateFlatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap1").keyBy(new int[]{0}).transform("timely_stateful_operator", new TypeHint<Tuple2<Long, Long>>() { // from class: org.apache.flink.test.checkpointing.StatefulJobSnapshotMigrationITCase.1
        }.getTypeInfo(), checkingTimelyStatefulOperator).uid("CheckpointingTimelyStatefulOperator1").addSink(new MigrationTestUtils.AccumulatorCountingSink());
        executionEnvironment.addSource(checkingParallelSourceWithUnionListState).uid("CheckpointingSource2").keyBy(new int[]{0}).flatMap(checkingKeyedStateFlatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap2").keyBy(new int[]{0}).transform("timely_stateful_operator", new TypeHint<Tuple2<Long, Long>>() { // from class: org.apache.flink.test.checkpointing.StatefulJobSnapshotMigrationITCase.2
        }.getTypeInfo(), checkingTimelyStatefulOperator).uid("CheckpointingTimelyStatefulOperator2").addSink(new MigrationTestUtils.AccumulatorCountingSink());
        String snapshotPath = getSnapshotPath(this.snapshotSpec);
        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            executeAndSnapshot(executionEnvironment, "src/test/resources/" + snapshotPath, this.snapshotSpec.getSnapshotType(), new Tuple2<>(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 8));
        } else {
            restoreAndExecute(executionEnvironment, getResourceFilename(snapshotPath), new Tuple2<>(MigrationTestUtils.CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), new Tuple2<>(MigrationTestUtils.CheckingParallelSourceWithUnionListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, Integer.valueOf(NUM_SOURCE_ELEMENTS)), new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 8), new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, 8), new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, 8), new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, 8), new Tuple2<>(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 8));
        }
    }

    private static String getSnapshotPath(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        return "new-stateful-udf-migration-itcase-" + snapshotSpec;
    }
}
