package org.apache.flink.test.checkpointing.utils;

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.class */
public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
    /**
     * Number of elements passed to the test sources as their element count. The tests in this
     * class also pass it as the job parallelism / max parallelism and as the expected value of
     * the {@link #EXPECTED_ELEMENTS_ACCUMULATOR} accumulator.
     */
    private static final int NUM_SOURCE_ELEMENTS = 4;
    /** Accumulator name handed to {@code AccumulatorCountingSink} by every pipeline in this class. */
    private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
    /** Accumulator name incremented by the restoring/checking functions after their assertions pass. */
    private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$AccumulatorCountingSink.class */
    public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1;
        private final String accumulatorName;
        int count = 0;

        public AccumulatorCountingSink(String str) {
            this.accumulatorName = str;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(this.accumulatorName, new IntCounter());
        }

        public void invoke(T t) throws Exception {
            this.count++;
            getRuntimeContext().getAccumulator(this.accumulatorName).add(1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$CheckpointedUdfOperator.class */
    public static class CheckpointedUdfOperator extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";

        public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMapFunction) {
            super(flatMapFunction);
        }

        public void processElement(StreamRecord<Tuple2<Long, Long>> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void processWatermark(Watermark watermark) throws Exception {
            this.output.emitWatermark(watermark);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$KeyedStateCheckingFlatMap.class */
    public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            ValueState state = getRuntimeContext().getState(this.stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + tuple2);
            }
            Assert.assertEquals(tuple2.f1, state.value());
            getRuntimeContext().getAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$KeyedStateSettingFlatMap.class */
    public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            getRuntimeContext().getState(this.stateDescriptor).update(tuple2.f1);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$LegacyCheckpointedFlatMap.class */
    public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements Checkpointed<Tuple2<String, Long>> {
        private static final long serialVersionUID = 1;
        public static Tuple2<String, Long> CHECKPOINTED_TUPLE = new Tuple2<>("hello", 42L);

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
        }

        public void restoreState(Tuple2<String, Long> tuple2) throws Exception {
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Tuple2<String, Long> m1086snapshotState(long j, long j2) throws Exception {
            return CHECKPOINTED_TUPLE;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$LegacyCheckpointedFlatMapWithKeyedState.class */
    public static class LegacyCheckpointedFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements Checkpointed<Tuple2<String, Long>> {
        private static final long serialVersionUID = 1;
        public static Tuple2<String, Long> CHECKPOINTED_TUPLE = new Tuple2<>("hello", 42L);
        private final ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            getRuntimeContext().getState(this.stateDescriptor).update(tuple2.f1);
        }

        public void restoreState(Tuple2<String, Long> tuple2) throws Exception {
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Tuple2<String, Long> m1088snapshotState(long j, long j2) throws Exception {
            return CHECKPOINTED_TUPLE;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$LegacyCheckpointedSource.class */
    private static class LegacyCheckpointedSource implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
        public static String CHECKPOINTED_STRING = "Here be dragons!";
        private static final long serialVersionUID = 1;
        private volatile boolean isRunning = true;
        private final int numElements;

        public LegacyCheckpointedSource(int i) {
            this.numElements = i;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void restoreState(String str) throws Exception {
            Assert.assertEquals(CHECKPOINTED_STRING, str);
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public String m1090snapshotState(long j, long j2) throws Exception {
            return CHECKPOINTED_STRING;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$RestoringCheckingFlatMap.class */
    public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedRestoring<Tuple2<String, Long>> {
        private static final long serialVersionUID = 1;
        private transient Tuple2<String, Long> restoredState;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            Assert.assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, this.restoredState);
            getRuntimeContext().getAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
        }

        public void restoreState(Tuple2<String, Long> tuple2) throws Exception {
            this.restoredState = tuple2;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$RestoringCheckingFlatMapWithKeyedState.class */
    public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedRestoring<Tuple2<String, Long>> {
        private static final long serialVersionUID = 1;
        private transient Tuple2<String, Long> restoredState;
        private final ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void flatMap(Tuple2<Long, Long> tuple2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
            ValueState state = getRuntimeContext().getState(this.stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + tuple2);
            }
            Assert.assertEquals(tuple2.f1, state.value());
            Assert.assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, this.restoredState);
            getRuntimeContext().getAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
        }

        public void restoreState(Tuple2<String, Long> tuple2) throws Exception {
            this.restoredState = tuple2;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Long, Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$RestoringCheckingSource.class */
    private static class RestoringCheckingSource extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedRestoring<String> {
        private static final long serialVersionUID = 1;
        private volatile boolean isRunning = true;
        private final int numElements;
        private String restoredState;

        public RestoringCheckingSource(int i) {
            this.numElements = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            Assert.assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, this.restoredState);
            getRuntimeContext().getAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void restoreState(String str) throws Exception {
            this.restoredState = str;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase$RestoringCheckingUdfOperator.class */
    public static class RestoringCheckingUdfOperator extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;
        private String restoredState;

        public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMapFunction) {
            super(flatMapFunction);
        }

        public void open() throws Exception {
            super.open();
        }

        public void processElement(StreamRecord<Tuple2<Long, Long>> streamRecord) throws Exception {
            this.userFunction.flatMap(streamRecord.getValue(), new TimestampedCollector(this.output));
            Assert.assertEquals("Oh my, that's nice!", this.restoredState);
            getRuntimeContext().getAccumulator(StatefulJobSavepointFrom11MigrationITCase.SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
        }

        public void processWatermark(Watermark watermark) throws Exception {
            this.output.emitWatermark(watermark);
        }

        public void restoreState(FSDataInputStream fSDataInputStream) throws Exception {
            super.restoreState(fSDataInputStream);
            this.restoredState = new DataInputViewStreamWrapper(fSDataInputStream).readUTF();
        }
    }

    /**
     * Builds the state-producing pipeline on a {@link MemoryStateBackend} and hands it to
     * {@code executeAndSavepoint} together with the savepoint resource path and the expected
     * sink count. Ignored by default.
     */
    @Test
    @Ignore
    public void testCreateSavepointOnFlink11() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setStateBackend(new MemoryStateBackend());
        env.enableCheckpointing(500L);
        env.setParallelism(NUM_SOURCE_ELEMENTS);
        env.setMaxParallelism(NUM_SOURCE_ELEMENTS);

        // Every stage gets an explicit uid; the restoring tests reuse the same uids.
        env
            .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
            .flatMap(new LegacyCheckpointedFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new KeyedStateSettingFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                    "custom_operator",
                    new TypeHint<Tuple2<Long, Long>>() { }.getTypeInfo(),
                    new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
                .uid("LegacyCheckpointedOperator")
            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));

        executeAndSavepoint(
                env,
                "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
                new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    /**
     * Same pipeline as the memory-backend savepoint creation test, but running on a
     * {@link RocksDBStateBackend} wrapping a {@link MemoryStateBackend} and targeting the RocksDB
     * savepoint resource path. Ignored by default.
     */
    @Test
    @Ignore
    public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        env.enableCheckpointing(500L);
        env.setParallelism(NUM_SOURCE_ELEMENTS);
        env.setMaxParallelism(NUM_SOURCE_ELEMENTS);

        // Every stage gets an explicit uid; the restoring tests reuse the same uids.
        env
            .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
            .flatMap(new LegacyCheckpointedFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new KeyedStateSettingFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                    "custom_operator",
                    new TypeHint<Tuple2<Long, Long>>() { }.getTypeInfo(),
                    new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
                .uid("LegacyCheckpointedOperator")
            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));

        executeAndSavepoint(
                env,
                "src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint",
                new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    /**
     * Builds the state-checking pipeline on a {@link MemoryStateBackend}, using the same operator
     * uids as the savepoint-creating pipeline, and hands it to {@code restoreAndExecute} with the
     * memory-backend savepoint resource and an expected {@code SUCCESSFUL_CHECKS} count of 21.
     */
    @Test
    public void testSavepointRestoreFromFlink11() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setStateBackend(new MemoryStateBackend());
        env.enableCheckpointing(500L);
        env.setParallelism(NUM_SOURCE_ELEMENTS);
        env.setMaxParallelism(NUM_SOURCE_ELEMENTS);

        // The uids intentionally keep the names of the savepoint-creating operators.
        env
            .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
            .flatMap(new RestoringCheckingFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new RestoringCheckingFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new KeyedStateCheckingFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                    "custom_operator",
                    new TypeHint<Tuple2<Long, Long>>() { }.getTypeInfo(),
                    new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState()))
                .uid("LegacyCheckpointedOperator")
            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));

        restoreAndExecute(
                env,
                getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
                new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, 21));
    }

    /**
     * Same state-checking pipeline as the memory-backend restore test, but running on a
     * {@link RocksDBStateBackend} wrapping a {@link MemoryStateBackend} and restoring from the
     * RocksDB savepoint resource. The expected {@code SUCCESSFUL_CHECKS} count is again 21.
     */
    @Test
    public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        env.enableCheckpointing(500L);
        env.setParallelism(NUM_SOURCE_ELEMENTS);
        env.setMaxParallelism(NUM_SOURCE_ELEMENTS);

        // The uids intentionally keep the names of the savepoint-creating operators.
        env
            .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
            .flatMap(new RestoringCheckingFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
            .keyBy(0)
            .flatMap(new RestoringCheckingFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
            .keyBy(0)
            .flatMap(new KeyedStateCheckingFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
            .keyBy(0)
            .transform(
                    "custom_operator",
                    new TypeHint<Tuple2<Long, Long>>() { }.getTypeInfo(),
                    new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState()))
                .uid("LegacyCheckpointedOperator")
            .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));

        restoreAndExecute(
                env,
                getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
                new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, 21));
    }
}
