/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class WindowCheckpointingITCase
extends TestLogger {
    /** Parallelism expected for the windowed operators of every test job. */
    private static final int PARALLELISM = 4;

    /** Mini cluster shared by all tests of this class; created and torn down once per class. */
    private static ForkableFlinkMiniCluster cluster;

    /** Time characteristic applied to every job of the current parameterized run. */
    private final TimeCharacteristic timeCharacteristic;

    /**
     * Creates the test instance for one parameter set.
     *
     * @param timeCharacteristic the time characteristic the jobs of this run are configured with
     */
    public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
    }

    /**
     * Builds the configuration for the shared mini cluster (two task managers with two
     * slots each, 48 as the task manager memory size, no delay between execution retries)
     * and starts the cluster in streaming mode before the first test runs.
     */
    @BeforeClass
    public static void startTestCluster() {
        final Configuration clusterConfig = new Configuration();

        // retry behaviour and memory footprint
        clusterConfig.setString("execution-retries.delay", "0 ms");
        clusterConfig.setInteger("taskmanager.memory.size", 48);

        // 2 task managers x 2 slots
        clusterConfig.setInteger("taskmanager.numberOfTaskSlots", 2);
        clusterConfig.setInteger("local.number-taskmanager", 2);

        // NOTE(review): the meaning of the boolean flag is not visible in this file - confirm
        // against the ForkableFlinkMiniCluster constructor.
        cluster = new ForkableFlinkMiniCluster(clusterConfig, false, StreamingMode.STREAMING);
        cluster.start();
    }

    /**
     * Stops the shared mini cluster after the last test. Does nothing when the cluster
     * reference was never assigned.
     */
    @AfterClass
    public static void stopTestCluster() {
        if (cluster == null) {
            return;
        }
        cluster.stop();
    }

    /**
     * Runs a keyed 100 ms tumbling time window with a {@link RichWindowFunction} over the
     * output of a {@link FailingSource}, with checkpointing enabled and up to three
     * execution retries. The job is configured with this run's time characteristic.
     *
     * <p>The window function forwards every element with a count of one, and the
     * {@link ValidatingSink} expects each of the emitted keys to be counted exactly once.
     * The sink signals success by throwing a {@link SuccessException}, which
     * {@link #tryExecute} looks for in the job failure's cause chain.
     */
    @Test
    public void testTumblingProcessingTimeWindow() {
        final int numElements = 3000;
        FailingSource.reset();

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort());

            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(timeCharacteristic);
            env.getConfig().setAutoWatermarkInterval(10);
            env.enableCheckpointing(100);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();

            env
                    // the source fails once after a third of the elements has been emitted
                    .addSource(new FailingSource(numElements, numElements / 3))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(100, TimeUnit.MILLISECONDS))
                    .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {

                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            open = true;
                        }

                        @Override
                        public void apply(
                                Tuple tuple,
                                TimeWindow window,
                                Iterable<Tuple2<Long, IntType>> values,
                                Collector<Tuple2<Long, IntType>> out) {

                            // the function must have been opened before the first window fires
                            Assert.assertTrue(open);

                            for (Tuple2<Long, IntType> value : values) {
                                // the source emits the same number as key and as payload
                                Assert.assertEquals(value.f0.intValue(), value.f1.value);
                                out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
                            }
                        }
                    })
                    .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);

            tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed sliding time window (150 ms size, 50 ms slide) with a
     * {@link RichWindowFunction} over the output of a {@link FailingSource}, with
     * checkpointing enabled and up to three execution retries. The job is configured
     * with this run's time characteristic.
     *
     * <p>The window function forwards every element with a count of one, and the
     * {@link ValidatingSink} expects a total count of three per key (presumably because
     * each element falls into 150 / 50 = 3 overlapping windows). The sink signals success
     * by throwing a {@link SuccessException}, which {@link #tryExecute} looks for in the
     * job failure's cause chain.
     */
    @Test
    public void testSlidingProcessingTimeWindow() {
        final int numElements = 3000;
        FailingSource.reset();

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort());

            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(timeCharacteristic);
            env.getConfig().setAutoWatermarkInterval(10);
            env.enableCheckpointing(100);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();

            env
                    // the source fails once after a third of the elements has been emitted
                    .addSource(new FailingSource(numElements, numElements / 3))
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(150, TimeUnit.MILLISECONDS), Time.of(50, TimeUnit.MILLISECONDS))
                    .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {

                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            open = true;
                        }

                        @Override
                        public void apply(
                                Tuple tuple,
                                TimeWindow window,
                                Iterable<Tuple2<Long, IntType>> values,
                                Collector<Tuple2<Long, IntType>> out) {

                            // the function must have been opened before the first window fires
                            Assert.assertTrue(open);

                            for (Tuple2<Long, IntType> value : values) {
                                // the source emits the same number as key and as payload
                                Assert.assertEquals(value.f0.intValue(), value.f1.value);
                                out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
                            }
                        }
                    })
                    .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);

            // job name now matches the window type under test
            tryExecute(env, "Sliding Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed 100 ms tumbling time window with a pre-aggregating
     * {@link RichReduceFunction} over the output of a {@link FailingSource}, with
     * checkpointing enabled and up to three execution retries. The job is configured
     * with this run's time characteristic.
     *
     * <p>A map step sets every element's payload to one before windowing, and the reduce
     * function always yields a payload of one, so the {@link ValidatingSink} expects a
     * total count of one per key. The sink signals success by throwing a
     * {@link SuccessException}, which {@link #tryExecute} looks for in the job failure's
     * cause chain.
     */
    @Test
    public void testAggregatingTumblingProcessingTimeWindow() {
        final int numElements = 3000;
        FailingSource.reset();

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort());

            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(timeCharacteristic);
            env.getConfig().setAutoWatermarkInterval(10);
            env.enableCheckpointing(100);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();

            env
                    // the source fails once after a third of the elements has been emitted
                    .addSource(new FailingSource(numElements, numElements / 3))
                    .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
                        @Override
                        public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
                            // normalize the payload to a count of one
                            value.f1.value = 1;
                            return value;
                        }
                    })
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(100, TimeUnit.MILLISECONDS))
                    .reduce(new RichReduceFunction<Tuple2<Long, IntType>>() {

                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            open = true;
                        }

                        @Override
                        public Tuple2<Long, IntType> reduce(
                                Tuple2<Long, IntType> a,
                                Tuple2<Long, IntType> b) {

                            // the function must have been opened before the first reduce call
                            Assert.assertTrue(open);
                            return new Tuple2<Long, IntType>(a.f0, new IntType(1));
                        }
                    })
                    .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);

            tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs a keyed sliding time window (150 ms size, 50 ms slide) with a pre-aggregating
     * {@link RichReduceFunction} over the output of a {@link FailingSource}, with
     * checkpointing enabled and up to three execution retries. The job is configured
     * with this run's time characteristic.
     *
     * <p>A map step sets every element's payload to one before windowing, and the reduce
     * function always yields a payload of one. The {@link ValidatingSink} expects a total
     * count of three per key (presumably because each element falls into 150 / 50 = 3
     * overlapping windows). The sink signals success by throwing a
     * {@link SuccessException}, which {@link #tryExecute} looks for in the job failure's
     * cause chain.
     */
    @Test
    public void testAggregatingSlidingProcessingTimeWindow() {
        final int numElements = 3000;
        FailingSource.reset();

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getLeaderRPCPort());

            env.setParallelism(PARALLELISM);
            env.setStreamTimeCharacteristic(timeCharacteristic);
            env.getConfig().setAutoWatermarkInterval(10);
            env.enableCheckpointing(100);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();

            env
                    // the source fails once after a third of the elements has been emitted
                    .addSource(new FailingSource(numElements, numElements / 3))
                    .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
                        @Override
                        public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
                            // normalize the payload to a count of one
                            value.f1.value = 1;
                            return value;
                        }
                    })
                    .rebalance()
                    .keyBy(0)
                    .timeWindow(Time.of(150, TimeUnit.MILLISECONDS), Time.of(50, TimeUnit.MILLISECONDS))
                    .reduce(new RichReduceFunction<Tuple2<Long, IntType>>() {

                        private boolean open = false;

                        @Override
                        public void open(Configuration parameters) {
                            Assert.assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                            open = true;
                        }

                        @Override
                        public Tuple2<Long, IntType> reduce(
                                Tuple2<Long, IntType> a,
                                Tuple2<Long, IntType> b) {

                            // the function must have been opened before the first reduce call
                            Assert.assertTrue(open);
                            return new Tuple2<Long, IntType>(a.f0, new IntType(1));
                        }
                    })
                    .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);

            // job name now matches the window type under test
            tryExecute(env, "Sliding Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Parameterized.Parameters(name="TimeCharacteristic = {0}")
    public static Collection<TimeCharacteristic[]> timeCharacteristic() {
        return Arrays.asList({TimeCharacteristic.ProcessingTime}, {TimeCharacteristic.IngestionTime});
    }

    /**
     * Executes the job and treats a failure as success when a {@link SuccessException}
     * is found in the failure's cause chain.
     *
     * <p>Only {@link ProgramInvocationException} and {@link JobExecutionException} are
     * inspected; any other exception propagates to the caller. The search starts at the
     * direct cause of the caught exception and gives up - failing the test - when the
     * chain ends or after 20 steps along it.
     *
     * @param env the environment whose job graph is executed
     * @param jobName the name under which the job is submitted
     * @throws Exception any exception from the execution that is not one of the two inspected types
     */
    public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
        try {
            env.execute(jobName);
        }
        catch (ProgramInvocationException | JobExecutionException root) {
            int depth = 0;
            // Assert.fail always throws, so the update expression never runs on a null link.
            for (Throwable link = root.getCause(); !(link instanceof SuccessException); link = link.getCause()) {
                if (link == null || depth++ == 20) {
                    root.printStackTrace();
                    Assert.fail("Test failed: " + root.getMessage());
                }
            }
        }
    }

    /**
     * Marker exception thrown by the validating sink once all expected results have been
     * seen; {@code tryExecute} searches the job failure's cause chain for it.
     */
    static final class SuccessException
    extends Exception {
        private static final long serialVersionUID = -9218191172606739598L;

        /** Creates the marker exception without message or cause. */
        SuccessException() {
            super();
        }
    }

    /**
     * Mutable holder for a single {@code int}, used as the payload of the test tuples.
     * It exposes a public field and a public no-argument constructor.
     */
    public static class IntType {
        /** The wrapped value; zero unless set through the constructor or assigned directly. */
        public int value;

        /** Creates a holder whose value is zero. */
        public IntType() {
            this(0);
        }

        /**
         * Creates a holder with the given value.
         *
         * @param value the initial value
         */
        public IntType(int value) {
            this.value = value;
        }
    }

    private static class ValidatingSink
    extends RichSinkFunction<Tuple2<Long, IntType>>
    implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> counts = new HashMap();
        private final int elementCountExpected;
        private final int countPerElementExpected;
        private int aggCount;

        private ValidatingSink(int elementCountExpected, int countPerElementExpected) {
            this.elementCountExpected = elementCountExpected;
            this.countPerElementExpected = countPerElementExpected;
        }

        public void open(Configuration parameters) throws Exception {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void invoke(Tuple2<Long, IntType> value) throws Exception {
            Integer curr = this.counts.get(value.f0);
            if (curr != null) {
                this.counts.put((Long)value.f0, curr + ((IntType)value.f1).value);
            } else {
                this.counts.put((Long)value.f0, ((IntType)value.f1).value);
            }
            this.aggCount += ((IntType)value.f1).value;
            if (this.aggCount >= this.elementCountExpected * this.countPerElementExpected) {
                Assert.assertEquals((long)this.elementCountExpected, (long)this.counts.size());
                for (Integer i : this.counts.values()) {
                    Assert.assertEquals((long)this.countPerElementExpected, (long)i.intValue());
                }
                throw new SuccessException();
            }
        }

        public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.counts;
        }

        public void restoreState(HashMap<Long, Integer> state) {
            this.counts.putAll(state);
            for (Integer i : state.values()) {
                this.aggCount += i.intValue();
            }
        }
    }

    private static class FailingSource
    extends RichSourceFunction<Tuple2<Long, IntType>>
    implements Checkpointed<Integer>,
    CheckpointNotifier {
        private static volatile boolean failedBefore = false;
        private final int numElementsToEmit;
        private final int failureAfterNumElements;
        private volatile int numElementsEmitted;
        private volatile int numSuccessfulCheckpoints;
        private volatile boolean running = true;

        private FailingSource(int numElementsToEmit, int failureAfterNumElements) {
            this.numElementsToEmit = numElementsToEmit;
            this.failureAfterNumElements = failureAfterNumElements;
        }

        public void open(Configuration parameters) {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
            while (this.running) {
                if (!failedBefore) {
                    Thread.sleep(1L);
                    if (this.numSuccessfulCheckpoints >= 2 && this.numElementsEmitted >= this.failureAfterNumElements) {
                        failedBefore = true;
                        throw new Exception("Artificial Failure");
                    }
                }
                if (this.numElementsEmitted < this.numElementsToEmit && (failedBefore || this.numElementsEmitted <= this.failureAfterNumElements)) {
                    Object object = ctx.getCheckpointLock();
                    synchronized (object) {
                        int next;
                        ++this.numElementsEmitted;
                        ctx.collect((Object)new Tuple2((Object)next, (Object)new IntType(next)));
                        continue;
                    }
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            ++this.numSuccessfulCheckpoints;
        }

        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.numElementsEmitted;
        }

        public void restoreState(Integer state) {
            this.numElementsEmitted = state;
        }

        public static void reset() {
            failedBefore = false;
        }
    }
}

