/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;

public class StreamCheckpointNotifierITCase
extends StreamFaultToleranceTestBase {
    final long NUM_LONGS = 10000000L;

    /**
     * Builds the streaming topology under test.
     *
     * <p>A {@link GeneratingSourceFunction} feeds a {@link LongRichFilterFunction}; the filtered
     * stream is connected with the unfiltered source stream and run through a
     * {@link LeftIdentityCoRichFlatMapFunction}, then mapped to {@code Tuple1<Long>} by an
     * {@link IdentityMapFunction}. A new chain is started at the map, the stream is keyed by
     * tuple field 0 and reduced by an {@link OnceFailingReducer}. The sink discards every record.
     *
     * @param env the execution environment the topology is added to
     */
    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(NUM_LONGS));

        stream
            .filter(new LongRichFilterFunction())
            .connect(stream)
            .flatMap(new LeftIdentityCoRichFlatMapFunction())
            .map(new IdentityMapFunction())
            .startNewChain()
            .keyBy(0)
            .reduce(new OnceFailingReducer(NUM_LONGS))
            .addSink(new SinkFunction<Tuple1<Long>>() {

                @Override
                public void invoke(Tuple1<Long> value) {
                    // Intentionally empty: the results themselves are not inspected by this test.
                }
            });
    }

    /**
     * Checks the checkpoint notifications recorded by every notifier function.
     *
     * <p>For each function and each subtask slot, the recorded list of completed checkpoint ids
     * must exist, be non-empty, not contain the id stored in
     * {@link OnceFailingReducer#failureCheckpointID}, neither start nor end with that id, and
     * contain no duplicates.
     */
    @Override
    public void postSubmit() {
        @SuppressWarnings("unchecked")
        List<Long>[][] checkList = new List[][] {
            GeneratingSourceFunction.completedCheckpoints,
            IdentityMapFunction.completedCheckpoints,
            LongRichFilterFunction.completedCheckpoints,
            LeftIdentityCoRichFlatMapFunction.completedCheckpoints
        };

        long failureCheckpointID = OnceFailingReducer.failureCheckpointID;

        for (List<Long>[] parallelNotifications : checkList) {
            // Iterate over the array's own length rather than a repeated magic number.
            for (int i = 0; i < parallelNotifications.length; i++) {
                List<Long> notifications = parallelNotifications[i];

                // A slot is only filled when the matching subtask was opened; report that case
                // as an assertion failure instead of a bare NullPointerException.
                Assert.assertNotNull(
                        "No checkpoint notification list was created for subtask " + i + ".",
                        notifications);

                Assert.assertTrue(
                        "No checkpoint notification was received.",
                        notifications.size() > 0);
                Assert.assertFalse(
                        "Failure checkpoint was marked as completed.",
                        notifications.contains(failureCheckpointID));

                long firstCheckpoint = notifications.get(0);
                long lastCheckpoint = notifications.get(notifications.size() - 1);
                Assert.assertFalse(
                        "No checkpoint received before failure.",
                        firstCheckpoint == failureCheckpointID);
                Assert.assertFalse(
                        "No checkpoint received after failure.",
                        lastCheckpoint == failureCheckpointID);

                Assert.assertTrue(
                        "Checkpoint notification was received multiple times",
                        notifications.size() == new HashSet<Long>(notifications).size());
            }
        }
    }

    private static class LeftIdentityCoRichFlatMapFunction
    extends RichCoFlatMapFunction<Long, Long, Long>
    implements CheckpointNotifier {
        public static List<Long>[] completedCheckpoints = new List[6];
        private int subtaskId;

        private LeftIdentityCoRichFlatMapFunction() {
        }

        public void open(Configuration conf) throws IOException {
            this.subtaskId = this.getRuntimeContext().getIndexOfThisSubtask();
            if (completedCheckpoints[this.subtaskId] == null) {
                LeftIdentityCoRichFlatMapFunction.completedCheckpoints[this.subtaskId] = new ArrayList<Long>();
            }
        }

        public void flatMap1(Long value, Collector<Long> out) throws IOException {
            out.collect((Object)value);
        }

        public void flatMap2(Long value, Collector<Long> out) throws IOException {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            completedCheckpoints[this.subtaskId].add(checkpointId);
        }
    }

    private static class LongRichFilterFunction
    extends RichFilterFunction<Long>
    implements CheckpointNotifier {
        static List<Long>[] completedCheckpoints = new List[6];
        private int subtaskId;

        private LongRichFilterFunction() {
        }

        public boolean filter(Long value) {
            return value < 100L;
        }

        public void open(Configuration conf) throws IOException {
            this.subtaskId = this.getRuntimeContext().getIndexOfThisSubtask();
            if (completedCheckpoints[this.subtaskId] == null) {
                LongRichFilterFunction.completedCheckpoints[this.subtaskId] = new ArrayList<Long>();
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            completedCheckpoints[this.subtaskId].add(checkpointId);
        }
    }

    private static class OnceFailingReducer
    extends RichReduceFunction<Tuple1<Long>>
    implements Checkpointed<Long> {
        private static volatile boolean hasFailed = false;
        public static volatile long failureCheckpointID;
        private final long numElements;
        private long failurePos;
        private long count;

        OnceFailingReducer(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) {
            long failurePosMin = (long)(0.4 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            long failurePosMax = (long)(0.7 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = new Random().nextLong() % (failurePosMax - failurePosMin) + failurePosMin;
            this.count = 0L;
        }

        public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) throws Exception {
            ++this.count;
            Tuple1<Long> tuple1 = value1;
            tuple1.f0 = (Long)tuple1.f0 + (Long)value2.f0;
            return value1;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            if (!hasFailed && this.count >= this.failurePos) {
                hasFailed = true;
                failureCheckpointID = checkpointId;
                throw new Exception("Test Failure");
            }
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class IdentityMapFunction
    extends RichMapFunction<Long, Tuple1<Long>>
    implements CheckpointNotifier {
        public static List<Long>[] completedCheckpoints = new List[6];
        private int subtaskId;

        private IdentityMapFunction() {
        }

        public Tuple1<Long> map(Long value) throws Exception {
            return Tuple1.of((Object)value);
        }

        public void open(Configuration conf) throws IOException {
            this.subtaskId = this.getRuntimeContext().getIndexOfThisSubtask();
            if (completedCheckpoints[this.subtaskId] == null) {
                IdentityMapFunction.completedCheckpoints[this.subtaskId] = new ArrayList<Long>();
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            completedCheckpoints[this.subtaskId].add(checkpointId);
        }
    }

    private static class GeneratingSourceFunction
    extends RichSourceFunction<Long>
    implements ParallelSourceFunction<Long>,
    CheckpointNotifier,
    Checkpointed<Integer> {
        static List<Long>[] completedCheckpoints = new List[6];
        private final long numElements;
        private long result;
        private int index;
        private int step;
        private int subtaskId;
        private volatile boolean isRunning = true;

        GeneratingSourceFunction(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) throws IOException {
            this.step = this.getRuntimeContext().getNumberOfParallelSubtasks();
            this.subtaskId = this.getRuntimeContext().getIndexOfThisSubtask();
            if (this.index == 0) {
                this.index = this.subtaskId;
            }
            if (completedCheckpoints[this.subtaskId] == null) {
                GeneratingSourceFunction.completedCheckpoints[this.subtaskId] = new ArrayList<Long>();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            Object lockingObject = ctx.getCheckpointLock();
            while (this.isRunning && (long)this.index < this.numElements) {
                this.result = this.index % 10;
                Object object = lockingObject;
                synchronized (object) {
                    this.index += this.step;
                    ctx.collect((Object)this.result);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            completedCheckpoints[this.subtaskId].add(checkpointId);
        }

        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.index;
        }

        public void restoreState(Integer state) {
            this.index = state;
        }
    }
}

