package org.apache.flink.test.state.operator.restore.unkeyed;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.class */
public class NonKeyedJob {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob$IntegerSource.class */
    /**
     * Parallel source that emits the single value {@code 1} and then, depending on
     * its {@link ExecutionMode}, either blocks until {@link #cancel()} is called
     * ({@code GENERATE}, {@code MIGRATE}) or returns immediately (any other mode).
     */
    public static final class IntegerSource extends RichParallelSourceFunction<Integer> {
        private static final long serialVersionUID = 1912878510707871659L;

        /** Selects whether {@link #run} blocks after emitting its element. */
        private final ExecutionMode mode;

        /** Starts out {@code true}; cleared by {@link #cancel()} to release a blocked {@link #run}. */
        private volatile boolean running = true;

        private IntegerSource(ExecutionMode executionMode) {
            this.mode = executionMode;
        }

        /**
         * Emits {@code 1} once, then keeps the source alive for the modes that need it.
         *
         * @param sourceContext context used to emit the element
         * @throws Exception if waiting for cancellation is interrupted
         */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            sourceContext.collect(1);
            switch (this.mode) {
                case GENERATE:
                case MIGRATE:
                    awaitCancellation();
                    break;
                default:
                    // Every other mode finishes right after emitting.
                    break;
            }
        }

        /** Clears the running flag and wakes up any thread blocked in {@link #run}. */
        public void cancel() {
            synchronized (this) {
                this.running = false;
                this.notifyAll();
            }
        }

        /**
         * Blocks on this object's monitor until {@link #cancel()} has cleared the flag.
         * The loop re-checks the flag after every wake-up, so spurious wake-ups are harmless.
         */
        private synchronized void awaitCancellation() throws InterruptedException {
            while (this.running) {
                this.wait();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob$NoOpMapFunction.class */
    /** Stateless identity mapper: every incoming element is forwarded unchanged. */
    public static class NoOpMapFunction implements MapFunction<Integer, Integer> {

        private static final long serialVersionUID = 6584823409744624276L;

        /** Only instantiated from within the enclosing job class. */
        private NoOpMapFunction() {
            super();
        }

        /**
         * Returns the input as-is.
         *
         * @param num the incoming element
         * @return the very same element
         */
        public Integer map(Integer num) throws Exception {
            final Integer passedThrough = num;
            return passedThrough;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob$StatefulStringStoringMap.class */
    /**
     * Identity mapper whose checkpointed state is a single string: the configured
     * value followed by the index of the subtask. On restore in {@code MIGRATE} or
     * {@code RESTORE} mode it asserts that exactly this string comes back.
     */
    public static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
        private static final long serialVersionUID = 6092985758425330235L;

        /** Controls whether restored state is verified. */
        private final ExecutionMode mode;

        /** Prefix of the string written into the snapshot. */
        private final String valueToStore;

        private StatefulStringStoringMap(ExecutionMode executionMode, String str) {
            this.valueToStore = str;
            this.mode = executionMode;
        }

        /** Forwards the element unchanged. */
        public Integer map(Integer num) throws Exception {
            return num;
        }

        /**
         * Produces the snapshot: a one-element list holding the per-subtask string.
         *
         * @param j  first snapshot argument (unused here)
         * @param j2 second snapshot argument (unused here)
         */
        public List<String> snapshotState(long j, long j2) throws Exception {
            return Arrays.asList(subtaskState());
        }

        /**
         * Verifies restored state in {@code MIGRATE} and {@code RESTORE} mode; does
         * nothing in any other mode.
         *
         * @param list the restored state entries
         */
        public void restoreState(List<String> list) throws Exception {
            switch (this.mode) {
                case MIGRATE:
                case RESTORE: {
                    final String expected = subtaskState();
                    // Exactly one entry is expected, and it must be this subtask's own string.
                    Assert.assertEquals("Failed for " + expected, 1L, list.size());
                    Assert.assertEquals(expected, list.get(0));
                    break;
                }
                default:
                    // GENERATE (and anything else): nothing to verify.
                    break;
            }
        }

        /** Builds the string this subtask stores: the configured value plus the subtask index. */
        private String subtaskState() {
            return this.valueToStore + getRuntimeContext().getIndexOfThisSubtask();
        }
    }

    /**
     * Builds and runs the job in {@code GENERATE} mode on a local environment with web UI.
     *
     * <p>The required {@code savepoint-path} argument is written to the
     * {@code state.savepoints.dir} configuration option. Checkpointing runs in
     * exactly-once mode with an interval of 500 (milliseconds, per the Flink API),
     * restarts are disabled and a {@link MemoryStateBackend} is used.
     *
     * @param strArr command-line arguments; must contain {@code savepoint-path}
     * @throws Exception if argument parsing or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        final String savepointDir = ParameterTool.fromArgs(strArr).getRequired("savepoint-path");

        final Configuration config = new Configuration();
        config.setString("state.savepoints.dir", savepointDir);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend(new MemoryStateBackend());

        // Topology: source -> first -> second -> stateless -> third,
        // with "first" and "second" each starting a new operator chain.
        final SingleOutputStreamOperator<Integer> source = createSource(env, ExecutionMode.GENERATE);

        final SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source);
        first.startNewChain();

        final SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
        second.startNewChain();

        final SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
        createThirdStatefulMap(ExecutionMode.GENERATE, stateless);

        env.execute("job");
    }

    /**
     * Adds an {@link IntegerSource} for the given mode to the environment, with parallelism 4.
     *
     * @param streamExecutionEnvironment environment the source is registered on
     * @param executionMode mode handed to the source
     * @return the source operator
     */
    public static SingleOutputStreamOperator<Integer> createSource(StreamExecutionEnvironment streamExecutionEnvironment, ExecutionMode executionMode) {
        final IntegerSource integerSource = new IntegerSource(executionMode);
        return streamExecutionEnvironment
                .addSource(integerSource)
                .setParallelism(4);
    }

    /**
     * Appends a {@link StatefulStringStoringMap} storing {@code "first"} to the stream,
     * with parallelism 4 and uid {@code "first"}.
     *
     * @param executionMode mode handed to the map function
     * @param dataStream upstream to attach to
     * @return the resulting map operator
     */
    public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode executionMode, DataStream<Integer> dataStream) {
        final StatefulStringStoringMap firstMapper = new StatefulStringStoringMap(executionMode, "first");
        return dataStream
                .map(firstMapper)
                .setParallelism(4)
                .uid("first");
    }

    /**
     * Appends a {@link StatefulStringStoringMap} storing {@code "second"} to the stream,
     * with parallelism 4 and uid {@code "second"}.
     *
     * @param executionMode mode handed to the map function
     * @param dataStream upstream to attach to
     * @return the resulting map operator
     */
    public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode executionMode, DataStream<Integer> dataStream) {
        final StatefulStringStoringMap secondMapper = new StatefulStringStoringMap(executionMode, "second");
        return dataStream
                .map(secondMapper)
                .setParallelism(4)
                .uid("second");
    }

    /**
     * Appends a {@link StatefulStringStoringMap} storing {@code "third"} to the stream,
     * with parallelism 4. The uid {@code "third"} is assigned only in {@code MIGRATE}
     * and {@code RESTORE} mode; in any other mode no uid is set here.
     *
     * @param executionMode mode handed to the map function; also decides whether a uid is set
     * @param dataStream upstream to attach to
     * @return the resulting map operator
     */
    public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode executionMode, DataStream<Integer> dataStream) {
        final StatefulStringStoringMap thirdMapper = new StatefulStringStoringMap(executionMode, "third");
        final SingleOutputStreamOperator<Integer> thirdMap = dataStream.map(thirdMapper).setParallelism(4);

        if (executionMode != ExecutionMode.MIGRATE && executionMode != ExecutionMode.RESTORE) {
            return thirdMap;
        }
        // uid(...) is applied to the operator in place; its return value is not needed.
        thirdMap.uid("third");
        return thirdMap;
    }

    /**
     * Appends a {@link NoOpMapFunction} to the stream, with parallelism 4 and no uid.
     *
     * @param dataStream upstream to attach to
     * @return the resulting map operator
     */
    public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> dataStream) {
        final NoOpMapFunction identity = new NoOpMapFunction();
        return dataStream
                .map(identity)
                .setParallelism(4);
    }

    /** Private constructor: the members visible here are all static, so no instance is needed. */
    private NonKeyedJob() {
        // intentionally empty
    }
}
