package org.apache.flink.test.state.operator.restore.keyed;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.apache.flink.util.Collector;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/state/operator/restore/keyed/KeyedJob.class */
public class KeyedJob {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/keyed/KeyedJob$IntegerTupleSource.class */
    public static final class IntegerTupleSource extends RichSourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1912878510707871659L;
        private final ExecutionMode mode;
        private boolean running;

        private IntegerTupleSource(ExecutionMode executionMode) {
            this.running = true;
            this.mode = executionMode;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            for (int i = 0; i < 10; i++) {
                sourceContext.collect(new Tuple2(Integer.valueOf(i), Integer.valueOf(i)));
            }
            switch (this.mode) {
                case GENERATE:
                case MIGRATE:
                    synchronized (this) {
                        while (this.running) {
                            wait();
                        }
                    }
                    return;
                default:
                    return;
            }
        }

        public void cancel() {
            synchronized (this) {
                this.running = false;
                notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/keyed/KeyedJob$StatefulStringStoringMap.class */
    public static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
        private static final long serialVersionUID = 6092985758425330235L;
        private final ExecutionMode mode;
        private final String valueToStore;

        private StatefulStringStoringMap(ExecutionMode executionMode, String str) {
            this.mode = executionMode;
            this.valueToStore = str;
        }

        public Integer map(Integer num) throws Exception {
            return num;
        }

        public List<String> snapshotState(long j, long j2) throws Exception {
            return Arrays.asList(this.valueToStore + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }

        public void restoreState(List<String> list) throws Exception {
            switch (this.mode) {
                case GENERATE:
                default:
                    return;
                case MIGRATE:
                case RESTORE:
                    Assert.assertEquals("Failed for " + this.valueToStore + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), 1L, list.size());
                    Assert.assertEquals(this.valueToStore + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), list.get(0));
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/state/operator/restore/keyed/KeyedJob$StatefulWindowFunction.class */
    public static final class StatefulWindowFunction extends RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> {
        private static final long serialVersionUID = -7236313076792964055L;
        private final ExecutionMode mode;
        private transient ListState<Integer> state;
        private boolean applyCalled;

        private StatefulWindowFunction(ExecutionMode executionMode) {
            this.applyCalled = false;
            this.mode = executionMode;
        }

        public void open(OpenContext openContext) {
            this.state = getRuntimeContext().getListState(new ListStateDescriptor("values", Integer.class));
        }

        public void apply(Tuple tuple, GlobalWindow globalWindow, Iterable<Tuple2<Integer, Integer>> iterable, Collector<Integer> collector) throws Exception {
            this.applyCalled = true;
            switch (this.mode) {
                case GENERATE:
                    Iterator<Tuple2<Integer, Integer>> it = iterable.iterator();
                    while (it.hasNext()) {
                        this.state.add(it.next().f1);
                    }
                    return;
                case MIGRATE:
                case RESTORE:
                    Iterator<Tuple2<Integer, Integer>> it2 = iterable.iterator();
                    Iterator it3 = ((Iterable) this.state.get()).iterator();
                    while (it2.hasNext() && it3.hasNext()) {
                        Assert.assertEquals((Integer) it3.next(), it2.next().f1);
                    }
                    Assert.assertEquals(Boolean.valueOf(it3.hasNext()), Boolean.valueOf(it2.hasNext()));
                    return;
                default:
                    return;
            }
        }

        public void close() {
            Assert.assertTrue("Apply was never called.", this.applyCalled);
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((Tuple) obj, (GlobalWindow) window, (Iterable<Tuple2<Integer, Integer>>) iterable, (Collector<Integer>) collector);
        }
    }

    public static void main(String[] strArr) throws Exception {
        String required = ParameterTool.fromArgs(strArr).getRequired("savepoint-path");
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, required);
        StreamExecutionEnvironment createLocalEnvironmentWithWebUI = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        createLocalEnvironmentWithWebUI.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        createLocalEnvironmentWithWebUI.setRestartStrategy(RestartStrategies.noRestart());
        createLocalEnvironmentWithWebUI.setStateBackend(new MemoryStateBackend());
        createSecondStatefulMap(ExecutionMode.GENERATE, createFirstStatefulMap(ExecutionMode.GENERATE, createWindowFunction(ExecutionMode.GENERATE, createIntegerTupleSource(createLocalEnvironmentWithWebUI, ExecutionMode.GENERATE))));
        createLocalEnvironmentWithWebUI.execute("job");
    }

    public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> createIntegerTupleSource(StreamExecutionEnvironment streamExecutionEnvironment, ExecutionMode executionMode) {
        return streamExecutionEnvironment.addSource(new IntegerTupleSource(executionMode));
    }

    public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode executionMode, DataStream<Tuple2<Integer, Integer>> dataStream) {
        return dataStream.keyBy(new int[]{0}).countWindow(1L).apply(new StatefulWindowFunction(executionMode)).setParallelism(4).uid("window");
    }

    public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode executionMode, DataStream<Integer> dataStream) {
        return dataStream.map(new StatefulStringStoringMap(executionMode, "first")).setParallelism(4).uid("first");
    }

    public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode executionMode, DataStream<Integer> dataStream) {
        return dataStream.map(new StatefulStringStoringMap(executionMode, "second")).setParallelism(4).uid("second");
    }
}
