package org.apache.flink.test.checkpointing.utils;

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.hamcrest.Matchers;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils.class */
public class MigrationTestUtils {

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils$AccumulatorCountingSink.class */
    public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1;
        public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
        int count = 0;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
        }

        public void invoke(T t, SinkFunction.Context context) throws Exception {
            this.count++;
            getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils$CheckingNonParallelSourceWithListState.class */
    public static class CheckingNonParallelSourceWithListState extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
        private static final long serialVersionUID = 1;
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
        private volatile boolean isRunning = true;
        private final int numElements;

        public CheckingNonParallelSourceWithListState(int i) {
            this.numElements = i;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            ListState listState = functionInitializationContext.getOperatorStateStore().getListState(CheckpointingNonParallelSourceWithListState.STATE_DESCRIPTOR);
            if (!functionInitializationContext.isRestored()) {
                throw new RuntimeException("This source should always be restored because it's only used when restoring from a savepoint.");
            }
            Assert.assertThat(listState.get(), Matchers.containsInAnyOrder(new String[]{"Here be dragons!", "Here be more dragons!", "Here be yet more dragons!", "Here be the mostest dragons!"}));
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            sourceContext.emitWatermark(new Watermark(1000L));
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils$CheckingParallelSourceWithUnionListState.class */
    public static class CheckingParallelSourceWithUnionListState extends RichParallelSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
        private static final long serialVersionUID = 1;
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
        private volatile boolean isRunning = true;
        private final int numElements;

        public CheckingParallelSourceWithUnionListState(int i) {
            this.numElements = i;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            ListState unionListState = functionInitializationContext.getOperatorStateStore().getUnionListState(CheckpointingNonParallelSourceWithListState.STATE_DESCRIPTOR);
            if (!functionInitializationContext.isRestored()) {
                throw new RuntimeException("This source should always be restored because it's only used when restoring from a savepoint.");
            }
            Assert.assertThat(unionListState.get(), Matchers.containsInAnyOrder(CheckpointingParallelSourceWithUnionListState.CHECKPOINTED_STRINGS));
            getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            sourceContext.emitWatermark(new Watermark(1000L));
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    if (j % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
                        sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                    }
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils$CheckpointingNonParallelSourceWithListState.class */
    public static class CheckpointingNonParallelSourceWithListState implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction {
        static final ListStateDescriptor<String> STATE_DESCRIPTOR = new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
        static final String CHECKPOINTED_STRING = "Here be dragons!";
        static final String CHECKPOINTED_STRING_1 = "Here be more dragons!";
        static final String CHECKPOINTED_STRING_2 = "Here be yet more dragons!";
        static final String CHECKPOINTED_STRING_3 = "Here be the mostest dragons!";
        private static final long serialVersionUID = 1;
        private volatile boolean isRunning = true;
        private final int numElements;
        private transient ListState<String> unionListState;

        public CheckpointingNonParallelSourceWithListState(int i) {
            this.numElements = i;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.unionListState.clear();
            this.unionListState.add(CHECKPOINTED_STRING);
            this.unionListState.add(CHECKPOINTED_STRING_1);
            this.unionListState.add(CHECKPOINTED_STRING_2);
            this.unionListState.add(CHECKPOINTED_STRING_3);
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.unionListState = functionInitializationContext.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            sourceContext.emitWatermark(new Watermark(0L));
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/MigrationTestUtils$CheckpointingParallelSourceWithUnionListState.class */
    public static class CheckpointingParallelSourceWithUnionListState extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
        static final ListStateDescriptor<String> STATE_DESCRIPTOR = new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
        static final String[] CHECKPOINTED_STRINGS = {"Here be dragons!", "Here be more dragons!", "Here be yet more dragons!", "Here be the mostest dragons!"};
        private static final long serialVersionUID = 1;
        private volatile boolean isRunning = true;
        private final int numElements;
        private transient ListState<String> unionListState;

        public CheckpointingParallelSourceWithUnionListState(int i) {
            this.numElements = i;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.unionListState.clear();
            for (String str : CHECKPOINTED_STRINGS) {
                if (str.hashCode() % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
                    this.unionListState.add(str);
                }
            }
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.unionListState = functionInitializationContext.getOperatorStateStore().getUnionListState(STATE_DESCRIPTOR);
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            sourceContext.emitWatermark(new Watermark(0L));
            synchronized (sourceContext.getCheckpointLock()) {
                for (long j = 0; j < this.numElements; j++) {
                    if (j % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
                        sourceContext.collect(new Tuple2(Long.valueOf(j), Long.valueOf(j)));
                    }
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}
