/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class StreamCheckpointingITCase {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_TASK_SLOTS = 3;
    private static final int PARALLELISM = 6;
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void startCluster() {
        try {
            Configuration config = new Configuration();
            config.setInteger("localinstancemanager.numtaskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", 3);
            config.setString("execution-retries.delay", "0 ms");
            config.setInteger("taskmanager.memory.size", 12);
            cluster = new ForkableFlinkMiniCluster(config, false);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to start test cluster: " + e.getMessage()));
        }
    }

    @AfterClass
    public static void shutdownCluster() {
        try {
            cluster.shutdown();
            cluster = null;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to stop test cluster: " + e.getMessage()));
        }
    }

    @Test
    public void runCheckpointedProgram() {
        long NUM_STRINGS = 10000000L;
        Assert.assertTrue((String)"Broken test setup", (boolean)true);
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getJobManagerRPCPort(), (String[])new String[0]);
            env.setParallelism(6);
            env.enableCheckpointing(500L);
            env.getConfig().disableSysoutLogging();
            DataStreamSource stream = env.addSource((SourceFunction)new StringGeneratingSourceFunction(10000000L));
            stream.filter((FilterFunction)new StringRichFilterFunction()).map((MapFunction)new StringPrefixCountRichMapFunction()).startNewChain().map((MapFunction)new StatefulCounterFunction()).groupBy(new String[]{"prefix"}).reduce((ReduceFunction)new OnceFailingReducer(10000000L)).addSink((SinkFunction)new RichSinkFunction<PrefixCount>(){
                private Map<Character, Long> counts = new HashMap<Character, Long>();

                public void invoke(PrefixCount value) {
                    Character first = Character.valueOf(value.prefix.charAt(0));
                    Long previous = this.counts.get(first);
                    if (previous == null) {
                        this.counts.put(first, value.count);
                    } else {
                        this.counts.put(first, Math.max(previous, value.count));
                    }
                }
            });
            env.execute();
            long filterSum = 0L;
            for (long l : StringRichFilterFunction.counts) {
                filterSum += l;
            }
            long mapSum = 0L;
            for (long l : StringPrefixCountRichMapFunction.counts) {
                mapSum += l;
            }
            long countSum = 0L;
            for (long l : StatefulCounterFunction.counts) {
                countSum += l;
            }
            Assert.assertEquals((long)10000000L, (long)filterSum);
            Assert.assertEquals((long)10000000L, (long)mapSum);
            Assert.assertEquals((long)10000000L, (long)countSum);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class StringPrefixCountRichMapFunction
    extends RichMapFunction<String, PrefixCount>
    implements Checkpointed<Long> {
        static final long[] counts = new long[6];
        private long count = 0L;

        private StringPrefixCountRichMapFunction() {
        }

        public PrefixCount map(String value) {
            ++this.count;
            return new PrefixCount(value.substring(0, 1), value, 1L);
        }

        public void close() {
            StringPrefixCountRichMapFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class StringRichFilterFunction
    extends RichFilterFunction<String>
    implements Checkpointed<Long> {
        static final long[] counts = new long[6];
        private long count = 0L;

        private StringRichFilterFunction() {
        }

        public boolean filter(String value) {
            ++this.count;
            return value.length() < 100;
        }

        public void close() {
            StringRichFilterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    public static class PrefixCount {
        public String prefix;
        public String value;
        public long count;

        public PrefixCount() {
        }

        public PrefixCount(String prefix, String value, long count) {
            this.prefix = prefix;
            this.value = value;
            this.count = count;
        }

        public String toString() {
            return this.prefix + " / " + this.value;
        }
    }

    private static class OnceFailingReducer
    extends RichReduceFunction<PrefixCount> {
        private static volatile boolean hasFailed = false;
        private final long numElements;
        private long failurePos;
        private long count;

        OnceFailingReducer(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) {
            long failurePosMin = (long)(0.4 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            long failurePosMax = (long)(0.7 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = new Random().nextLong() % (failurePosMax - failurePosMin) + failurePosMin;
            this.count = 0L;
        }

        public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
            ++this.count;
            if (!hasFailed && this.count >= this.failurePos) {
                hasFailed = true;
                throw new Exception("Test Failure");
            }
            value1.count += value2.count;
            return value1;
        }
    }

    private static class StatefulCounterFunction
    extends RichMapFunction<PrefixCount, PrefixCount>
    implements Checkpointed<Long> {
        static final long[] counts = new long[6];
        private long count = 0L;

        private StatefulCounterFunction() {
        }

        public PrefixCount map(PrefixCount value) throws Exception {
            ++this.count;
            return value;
        }

        public void close() {
            StatefulCounterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.count;
        }

        public void restoreState(Long state) {
            this.count = state;
        }
    }

    private static class StringGeneratingSourceFunction
    extends RichSourceFunction<String>
    implements Checkpointed<Long>,
    ParallelSourceFunction<String> {
        private final long numElements;
        private Random rnd;
        private StringBuilder stringBuilder;
        private long index;
        private int step;
        private volatile boolean isRunning;
        static final long[] counts = new long[6];

        public void close() {
            StringGeneratingSourceFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.index;
        }

        StringGeneratingSourceFunction(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) {
            this.rnd = new Random();
            this.stringBuilder = new StringBuilder();
            this.step = this.getRuntimeContext().getNumberOfParallelSubtasks();
            if (this.index == 0L) {
                this.index = this.getRuntimeContext().getIndexOfThisSubtask();
            }
            this.isRunning = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object lockingObject = ctx.getCheckpointLock();
            while (this.isRunning && this.index < this.numElements) {
                char first = (char)(this.index % 40L + 40L);
                this.stringBuilder.setLength(0);
                this.stringBuilder.append(first);
                String result = StringGeneratingSourceFunction.randomString(this.stringBuilder, this.rnd);
                Object object = lockingObject;
                synchronized (object) {
                    this.index += (long)this.step;
                    ctx.collect((Object)result);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.index;
        }

        public void restoreState(Long state) {
            this.index = state;
        }

        private static String randomString(StringBuilder bld, Random rnd) {
            int len = rnd.nextInt(10) + 5;
            for (int i = 0; i < len; ++i) {
                char next = (char)(rnd.nextInt(20000) + 33);
                bld.append(next);
            }
            return bld.toString();
        }
    }
}

