/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class EventTimeWindowCheckpointingITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void startTestCluster() {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 2);
        config.setInteger("taskmanager.numberOfTaskSlots", 2);
        config.setInteger("taskmanager.memory.size", 48);
        config.setString("execution-retries.delay", "0 ms");
        cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING);
        cluster.start();
    }

    @AfterClass
    public static void stopTestCluster() {
        if (cluster != null) {
            cluster.stop();
        }
    }

    @Test
    public void testTumblingTimeWindow() {
        int NUM_ELEMENTS_PER_KEY = 3000;
        int WINDOW_SIZE = 100;
        int NUM_KEYS = 100;
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(100, 3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow((AbstractTime)Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink((SinkFunction)new ValidatingSink(100, 30)).setParallelism(1);
            EventTimeWindowCheckpointingITCase.tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testTumblingTimeWindowWithKVState() {
        int NUM_ELEMENTS_PER_KEY = 3000;
        int WINDOW_SIZE = 100;
        int NUM_KEYS = 100;
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(100, 3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow((AbstractTime)Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;
                private OperatorState<Integer> count;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                    this.count = this.getRuntimeContext().getKeyValueState("count", Integer.class, (Object)0);
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
                    if ((Integer)this.count.value() == 0) {
                        this.count.update((Object)((Long)tuple.getField(0)).intValue());
                    }
                    Assert.assertTrue((boolean)this.open);
                    this.count.update((Object)((Integer)this.count.value() + 1));
                    out.collect((Object)new Tuple4(tuple.getField(0), (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType((Integer)this.count.value())));
                }
            }).addSink((SinkFunction)new CountValidatingSink(100, 30)).setParallelism(1);
            EventTimeWindowCheckpointingITCase.tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSlidingTimeWindow() {
        int NUM_ELEMENTS_PER_KEY = 3000;
        int WINDOW_SIZE = 1000;
        int WINDOW_SLIDE = 100;
        int NUM_KEYS = 100;
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(100, 3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow((AbstractTime)Time.of((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS), (AbstractTime)Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink((SinkFunction)new ValidatingSink(100, 30)).setParallelism(1);
            EventTimeWindowCheckpointingITCase.tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        int NUM_ELEMENTS_PER_KEY = 3000;
        int WINDOW_SIZE = 100;
        int NUM_KEYS = 100;
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(100, 3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow((AbstractTime)Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((ReduceFunction)new RichReduceFunction<Tuple2<Long, IntType>>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    Assert.assertTrue((boolean)this.open);
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink((SinkFunction)new ValidatingSink(100, 30)).setParallelism(1);
            EventTimeWindowCheckpointingITCase.tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        int NUM_ELEMENTS_PER_KEY = 3000;
        int WINDOW_SIZE = 1000;
        int WINDOW_SLIDE = 100;
        int NUM_KEYS = 100;
        FailingSource.reset();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(100L);
            env.setNumberOfExecutionRetries(3);
            env.getConfig().disableSysoutLogging();
            env.addSource((SourceFunction)new FailingSource(100, 3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow((AbstractTime)Time.of((long)1000L, (TimeUnit)TimeUnit.MILLISECONDS), (AbstractTime)Time.of((long)100L, (TimeUnit)TimeUnit.MILLISECONDS)).apply((ReduceFunction)new RichReduceFunction<Tuple2<Long, IntType>>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    Assert.assertTrue((boolean)this.open);
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    out.collect((Object)new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum)));
                }
            }).addSink((SinkFunction)new ValidatingSink(100, 30)).setParallelism(1);
            EventTimeWindowCheckpointingITCase.tryExecute(env, "Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
        try {
            env.execute(jobName);
        }
        catch (ProgramInvocationException | JobExecutionException root) {
            Throwable cause = root.getCause();
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                if (cause == null || depth++ == 20) {
                    root.printStackTrace();
                    Assert.fail((String)("Test failed: " + root.getMessage()));
                    continue;
                }
                cause = cause.getCause();
            }
        }
    }

    static final class SuccessException
    extends Exception {
        private static final long serialVersionUID = -9218191172606739598L;

        SuccessException() {
        }
    }

    public static class IntType {
        public int value;

        public IntType() {
        }

        public IntType(int value) {
            this.value = value;
        }
    }

    private static class CountValidatingSink
    extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
    implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> windowCounts = new HashMap();
        private final int numKeys;
        private final int numWindowsExpected;

        private CountValidatingSink(int numKeys, int numWindowsExpected) {
            this.numKeys = numKeys;
            this.numWindowsExpected = numWindowsExpected;
        }

        public void open(Configuration parameters) throws Exception {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void close() throws Exception {
            boolean seenAll = true;
            if (this.windowCounts.size() == this.numKeys) {
                for (Integer windowCount : this.windowCounts.values()) {
                    if (windowCount >= this.numWindowsExpected) continue;
                    seenAll = false;
                    break;
                }
            }
            Assert.assertTrue((String)"The source must see all expected windows.", (boolean)seenAll);
        }

        public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
            Integer curr = this.windowCounts.get(value.f0);
            if (curr != null) {
                this.windowCounts.put((Long)value.f0, curr + 1);
            } else {
                this.windowCounts.put((Long)value.f0, 1);
            }
            Assert.assertEquals((String)("Window counts don't match for key " + value.f0 + "."), (long)(((Long)value.f0).intValue() + this.windowCounts.get(value.f0)), (long)((IntType)value.f3).value);
            boolean seenAll = true;
            if (this.windowCounts.size() == this.numKeys) {
                for (Integer windowCount : this.windowCounts.values()) {
                    if (windowCount < this.numWindowsExpected) {
                        seenAll = false;
                        break;
                    }
                    if (windowCount <= this.numWindowsExpected) continue;
                    Assert.fail((String)("Window count to high: " + windowCount));
                }
                if (seenAll) {
                    throw new SuccessException();
                }
            }
        }

        public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.windowCounts;
        }

        public void restoreState(HashMap<Long, Integer> state) {
            this.windowCounts.putAll(state);
        }
    }

    private static class ValidatingSink
    extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
    implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> windowCounts = new HashMap();
        private final int numKeys;
        private final int numWindowsExpected;

        private ValidatingSink(int numKeys, int numWindowsExpected) {
            this.numKeys = numKeys;
            this.numWindowsExpected = numWindowsExpected;
        }

        public void open(Configuration parameters) throws Exception {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void close() throws Exception {
            boolean seenAll = true;
            if (this.windowCounts.size() == this.numKeys) {
                for (Integer windowCount : this.windowCounts.values()) {
                    if (windowCount >= this.numWindowsExpected) continue;
                    seenAll = false;
                    break;
                }
            }
            Assert.assertTrue((String)"The source must see all expected windows.", (boolean)seenAll);
        }

        public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
            int expectedSum = 0;
            for (long i = ((Long)value.f1).longValue(); i < (Long)value.f2; ++i) {
                if (i <= 0L) continue;
                expectedSum = (int)((long)expectedSum + i);
            }
            Assert.assertEquals((String)("Window start: " + value.f1 + " end: " + value.f2), (long)expectedSum, (long)((IntType)value.f3).value);
            Integer curr = this.windowCounts.get(value.f0);
            if (curr != null) {
                this.windowCounts.put((Long)value.f0, curr + 1);
            } else {
                this.windowCounts.put((Long)value.f0, 1);
            }
            boolean seenAll = true;
            if (this.windowCounts.size() == this.numKeys) {
                for (Integer windowCount : this.windowCounts.values()) {
                    if (windowCount < this.numWindowsExpected) {
                        seenAll = false;
                        break;
                    }
                    if (windowCount <= this.numWindowsExpected) continue;
                    Assert.fail((String)("Window count to high: " + windowCount));
                }
                if (seenAll) {
                    throw new SuccessException();
                }
            }
        }

        public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.windowCounts;
        }

        public void restoreState(HashMap<Long, Integer> state) {
            this.windowCounts.putAll(state);
        }
    }

    private static class FailingSource
    extends RichEventTimeSourceFunction<Tuple2<Long, IntType>>
    implements Checkpointed<Integer>,
    CheckpointNotifier {
        private static volatile boolean failedBefore = false;
        private final int numKeys;
        private final int numElementsToEmit;
        private final int failureAfterNumElements;
        private volatile int numElementsEmitted;
        private volatile int numSuccessfulCheckpoints;
        private volatile boolean running = true;

        private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
            this.numKeys = numKeys;
            this.numElementsToEmit = numElementsToEmitPerKey;
            this.failureAfterNumElements = failureAfterNumElements;
        }

        public void open(Configuration parameters) {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
            while (this.running) {
                if (!failedBefore) {
                    Thread.sleep(1L);
                    if (this.numSuccessfulCheckpoints >= 2 && this.numElementsEmitted >= this.failureAfterNumElements) {
                        failedBefore = true;
                        throw new Exception("Artificial Failure");
                    }
                }
                if (this.numElementsEmitted < this.numElementsToEmit && (failedBefore || this.numElementsEmitted <= this.failureAfterNumElements)) {
                    Object object = ctx.getCheckpointLock();
                    synchronized (object) {
                        int next;
                        ++this.numElementsEmitted;
                        for (long i = 0L; i < (long)this.numKeys; ++i) {
                            ctx.collectWithTimestamp((Object)new Tuple2((Object)i, (Object)new IntType(next)), (long)next);
                        }
                        ctx.emitWatermark(new Watermark((long)next));
                        continue;
                    }
                }
                if (this.numElementsEmitted > this.numElementsToEmit * 5) {
                    System.err.println("Succ Checkpoints: " + this.numSuccessfulCheckpoints + " numElemEmitted: " + this.numElementsEmitted + "num elements to emit: " + this.numElementsToEmit);
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            ++this.numSuccessfulCheckpoints;
        }

        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return this.numElementsEmitted;
        }

        public void restoreState(Integer state) {
            this.numElementsEmitted = state;
        }

        public static void reset() {
            failedBefore = false;
        }
    }
}

