/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateCheckpointedITCase
extends StreamFaultToleranceTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
    final long NUM_STRINGS = 10000000L;

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        Assert.assertTrue((String)"Broken test setup", (boolean)true);
        long failurePosMin = 666666L;
        long failurePosMax = 1166666L;
        long failurePos = new Random().nextLong() % 500000L + 666666L;
        env.enableCheckpointing(200L);
        DataStreamSource stream = env.addSource((SourceFunction)new StringGeneratingSourceFunction(10000000L));
        stream.filter((FilterFunction)new StringRichFilterFunction()).map((MapFunction)new StringPrefixCountRichMapFunction()).startNewChain().map((MapFunction)new StatefulCounterFunction()).partitionByHash(new String[]{"prefix"}).flatMap((FlatMapFunction)new OnceFailingAggregator(failurePos)).addSink((SinkFunction)new ValidatingSink());
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public void postSubmit() {
        void var11_10;
        if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
            LOG.warn("Test inconclusive: failure occurred before first checkpoint");
        }
        long filterSum = 0L;
        for (long l : StringRichFilterFunction.counts) {
            filterSum += l;
        }
        long mapSum = 0L;
        long[] lArray = StringPrefixCountRichMapFunction.counts;
        int n = lArray.length;
        boolean bl = false;
        while (var11_10 < n) {
            long l = lArray[var11_10];
            mapSum += l;
            ++var11_10;
        }
        long countSum = 0L;
        for (long l : StatefulCounterFunction.counts) {
            countSum += l;
        }
        Assert.assertEquals((long)10000000L, (long)filterSum);
        Assert.assertEquals((long)10000000L, (long)mapSum);
        Assert.assertEquals((long)10000000L, (long)countSum);
        for (Map map : ValidatingSink.maps) {
            for (Long count : map.values()) {
                Assert.assertEquals((long)250000L, (long)count);
            }
        }
    }

    private static class ValidatingSink
    extends RichSinkFunction<StreamFaultToleranceTestBase.PrefixCount>
    implements Checkpointed<HashMap<Character, Long>> {
        private static Map<Character, Long>[] maps = new Map[6];
        private HashMap<Character, Long> counts = new HashMap();

        private ValidatingSink() {
        }

        public void invoke(StreamFaultToleranceTestBase.PrefixCount value) {
            Character first = Character.valueOf(value.prefix.charAt(0));
            Long previous = this.counts.get(first);
            if (previous == null) {
                this.counts.put(first, value.count);
            } else {
                this.counts.put(first, Math.max(previous, value.count));
            }
        }

        public void close() throws Exception {
            ValidatingSink.maps[this.getRuntimeContext().getIndexOfThisSubtask()] = this.counts;
        }

        public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.counts;
        }

        public void restoreState(HashMap<Character, Long> state) {
            this.counts.putAll(state);
        }
    }

    private static class OnceFailingAggregator
    extends RichFlatMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
    implements Checkpointed<HashMap<String, StreamFaultToleranceTestBase.PrefixCount>>,
    CheckpointNotifier {
        static boolean wasCheckpointedBeforeFailure = false;
        private static volatile boolean hasFailed = false;
        private final HashMap<String, StreamFaultToleranceTestBase.PrefixCount> aggregationMap = new HashMap();
        private long failurePos;
        private long count;
        private boolean wasCheckpointed;

        OnceFailingAggregator(long failurePos) {
            this.failurePos = failurePos;
        }

        public void open(Configuration parameters) {
            this.count = 0L;
        }

        public void flatMap(StreamFaultToleranceTestBase.PrefixCount value, Collector<StreamFaultToleranceTestBase.PrefixCount> out) throws Exception {
            ++this.count;
            if (!hasFailed && this.count >= this.failurePos && this.getRuntimeContext().getIndexOfThisSubtask() == 1) {
                wasCheckpointedBeforeFailure = this.wasCheckpointed;
                hasFailed = true;
                throw new Exception("Test Failure");
            }
            StreamFaultToleranceTestBase.PrefixCount curr = this.aggregationMap.get(value.prefix);
            if (curr == null) {
                this.aggregationMap.put(value.prefix, value);
                out.collect((Object)value);
            } else {
                curr.count += value.count;
                out.collect((Object)curr);
            }
        }

        public HashMap<String, StreamFaultToleranceTestBase.PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.aggregationMap;
        }

        public void restoreState(HashMap<String, StreamFaultToleranceTestBase.PrefixCount> state) {
            this.aggregationMap.putAll(state);
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.wasCheckpointed = true;
        }
    }

    private static class StatefulCounterFunction
    extends RichMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
    implements Checkpointed<Long> {
        static final long[] counts = new long[6];
        private long count;

        private StatefulCounterFunction() {
        }

        public StreamFaultToleranceTestBase.PrefixCount map(StreamFaultToleranceTestBase.PrefixCount value) throws Exception {
            ++this.count;
            return value;
        }

        public void close() throws IOException {
            StatefulCounterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class StringPrefixCountRichMapFunction
    extends RichMapFunction<String, StreamFaultToleranceTestBase.PrefixCount>
    implements CheckpointedAsynchronously<Long> {
        static final long[] counts = new long[6];
        private long count;

        private StringPrefixCountRichMapFunction() {
        }

        public StreamFaultToleranceTestBase.PrefixCount map(String value) {
            ++this.count;
            return new StreamFaultToleranceTestBase.PrefixCount(value.substring(0, 1), value, 1L);
        }

        public void close() {
            StringPrefixCountRichMapFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class StringRichFilterFunction
    extends RichFilterFunction<String>
    implements Checkpointed<Long> {
        static final long[] counts = new long[6];
        private long count;

        private StringRichFilterFunction() {
        }

        public boolean filter(String value) throws Exception {
            ++this.count;
            return value.length() < 100;
        }

        public void close() {
            StringRichFilterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class StringGeneratingSourceFunction
    extends RichParallelSourceFunction<String>
    implements CheckpointedAsynchronously<Integer> {
        private final long numElements;
        private int index;
        private volatile boolean isRunning = true;

        StringGeneratingSourceFunction(long numElements) {
            this.numElements = numElements;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object lockingObject = ctx.getCheckpointLock();
            Random rnd = new Random();
            StringBuilder stringBuilder = new StringBuilder();
            int step = this.getRuntimeContext().getNumberOfParallelSubtasks();
            if (this.index == 0) {
                this.index = this.getRuntimeContext().getIndexOfThisSubtask();
            }
            while (this.isRunning && (long)this.index < this.numElements) {
                char first = (char)(this.index % 40 + 40);
                stringBuilder.setLength(0);
                stringBuilder.append(first);
                String result = StringGeneratingSourceFunction.randomString(stringBuilder, rnd);
                Object object = lockingObject;
                synchronized (object) {
                    this.index += step;
                    ctx.collect((Object)result);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        private static String randomString(StringBuilder bld, Random rnd) {
            int len = rnd.nextInt(10) + 5;
            for (int i = 0; i < len; ++i) {
                char next = (char)(rnd.nextInt(20000) + 33);
                bld.append(next);
            }
            return bld.toString();
        }

        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.index;
        }

        public void restoreState(Integer state) {
            this.index = state;
        }
    }
}

