/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ProcessingTimeWindowCheckpointingITCase
extends TestLogger {
    /** Parallelism of the test jobs; equals the total number of slots of the mini cluster. */
    private static final int PARALLELISM = 4;

    /** Mini cluster shared by all tests of this class: two task managers with two slots each. */
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(2)
                            .setNumberSlotsPerTaskManager(PARALLELISM / 2)
                            .build());

    /**
     * Builds the configuration handed to the shared mini cluster.
     *
     * @return a new {@link Configuration} whose task-manager managed memory size is set to the
     *     value parsed from {@code "48m"}
     */
    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        // The value must keep its MemorySize static type: set(ConfigOption<T>, T) cannot be
        // resolved when the argument is cast to Object.
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m"));
        return config;
    }

    /**
     * Runs a keyed job with 100 ms tumbling processing-time windows over a {@link FailingSource},
     * with checkpointing every 100 ms and one fixed-delay restart, and validates the sink output
     * with an expected count of 1 per key.
     */
    @Test
    public void testTumblingProcessingTimeWindow() {
        final int numElements = 3000;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);
            env.getConfig().setAutoWatermarkInterval(10L);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

            SinkValidatorUpdaterAndChecker updaterAndChecker =
                    new SinkValidatorUpdaterAndChecker(numElements, 1);

            env.addSource(new FailingSource(new Generator(), numElements, true))
                    .rebalance()
                    .keyBy(0)
                    .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100L)))
                    .apply(
                            new RichWindowFunction<
                                    Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {

                                private boolean open = false;

                                @Override
                                public void open(OpenContext openContext) {
                                    Assert.assertEquals(
                                            PARALLELISM,
                                            getRuntimeContext()
                                                    .getTaskInfo()
                                                    .getNumberOfParallelSubtasks());
                                    open = true;
                                }

                                @Override
                                public void apply(
                                        Tuple tuple,
                                        TimeWindow window,
                                        Iterable<Tuple2<Long, IntType>> values,
                                        Collector<Tuple2<Long, IntType>> out) {
                                    // the function must have been opened before any window fires
                                    Assert.assertTrue(open);
                                    for (Tuple2<Long, IntType> value : values) {
                                        Assert.assertEquals(value.f0.intValue(), value.f1.value);
                                        // emit a count of 1 per element; the sink sums them per key
                                        out.collect(new Tuple2<>(value.f0, new IntType(1)));
                                    }
                                }
                            })
                    .addSink(
                            new ValidatingSink<Tuple2<Long, IntType>>(
                                    updaterAndChecker, updaterAndChecker, true))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Tumbling Window Test");
        } catch (Exception e) {
            // keep the original exception as the cause instead of only printing it
            throw new AssertionError("Tumbling Window Test failed: " + e, e);
        }
    }

    /**
     * Runs a keyed job with sliding processing-time windows (150 ms size, 50 ms slide) over a
     * {@link FailingSource}, with checkpointing every 100 ms and one fixed-delay restart, and
     * validates the sink output with an expected count of 3 per key.
     */
    @Test
    public void testSlidingProcessingTimeWindow() {
        final int numElements = 3000;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);
            env.getConfig().setAutoWatermarkInterval(10L);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

            SinkValidatorUpdaterAndChecker updaterAndChecker =
                    new SinkValidatorUpdaterAndChecker(numElements, 3);

            env.addSource(new FailingSource(new Generator(), numElements, true))
                    .rebalance()
                    .keyBy(0)
                    .window(
                            SlidingProcessingTimeWindows.of(
                                    Time.milliseconds(150L), Time.milliseconds(50L)))
                    .apply(
                            new RichWindowFunction<
                                    Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() {

                                private boolean open = false;

                                @Override
                                public void open(OpenContext openContext) {
                                    Assert.assertEquals(
                                            PARALLELISM,
                                            getRuntimeContext()
                                                    .getTaskInfo()
                                                    .getNumberOfParallelSubtasks());
                                    open = true;
                                }

                                @Override
                                public void apply(
                                        Tuple tuple,
                                        TimeWindow window,
                                        Iterable<Tuple2<Long, IntType>> values,
                                        Collector<Tuple2<Long, IntType>> out) {
                                    // the function must have been opened before any window fires
                                    Assert.assertTrue(open);
                                    for (Tuple2<Long, IntType> value : values) {
                                        Assert.assertEquals(value.f0.intValue(), value.f1.value);
                                        // emit a count of 1 per element; the sink sums them per key
                                        out.collect(new Tuple2<>(value.f0, new IntType(1)));
                                    }
                                }
                            })
                    .addSink(
                            new ValidatingSink<Tuple2<Long, IntType>>(
                                    updaterAndChecker, updaterAndChecker, true))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Sliding Window Test");
        } catch (Exception e) {
            // keep the original exception as the cause instead of only printing it
            throw new AssertionError("Sliding Window Test failed: " + e, e);
        }
    }

    /**
     * Runs a keyed job that reduces elements in 100 ms tumbling processing-time windows over a
     * {@link FailingSource}, with checkpointing every 100 ms and one fixed-delay restart, and
     * validates the sink output with an expected count of 1 per key.
     */
    @Test
    public void testAggregatingTumblingProcessingTimeWindow() {
        final int numElements = 3000;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);
            env.getConfig().setAutoWatermarkInterval(10L);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

            SinkValidatorUpdaterAndChecker updaterAndChecker =
                    new SinkValidatorUpdaterAndChecker(numElements, 1);

            env.addSource(new FailingSource(new Generator(), numElements, true))
                    .map(
                            new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
                                    // replace the payload with a count of 1
                                    value.f1.value = 1;
                                    return value;
                                }
                            })
                    .rebalance()
                    .keyBy(0)
                    .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100L)))
                    .reduce(
                            new ReduceFunction<Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> reduce(
                                        Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                    // the reduced value stays at 1 regardless of the inputs
                                    return new Tuple2<>(a.f0, new IntType(1));
                                }
                            })
                    .addSink(
                            new ValidatingSink<Tuple2<Long, IntType>>(
                                    updaterAndChecker, updaterAndChecker, true))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Aggregating Tumbling Window Test");
        } catch (Exception e) {
            // keep the original exception as the cause instead of only printing it
            throw new AssertionError("Aggregating Tumbling Window Test failed: " + e, e);
        }
    }

    /**
     * Runs a keyed job that reduces elements in sliding processing-time windows (150 ms size,
     * 50 ms slide) over a {@link FailingSource}, with checkpointing every 100 ms and one
     * fixed-delay restart, and validates the sink output with an expected count of 3 per key.
     */
    @Test
    public void testAggregatingSlidingProcessingTimeWindow() {
        final int numElements = 3000;
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);
            env.getConfig().setAutoWatermarkInterval(10L);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

            SinkValidatorUpdaterAndChecker updaterAndChecker =
                    new SinkValidatorUpdaterAndChecker(numElements, 3);

            env.addSource(new FailingSource(new Generator(), numElements, true))
                    .map(
                            new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
                                    // replace the payload with a count of 1
                                    value.f1.value = 1;
                                    return value;
                                }
                            })
                    .rebalance()
                    .keyBy(0)
                    .window(
                            SlidingProcessingTimeWindows.of(
                                    Time.milliseconds(150L), Time.milliseconds(50L)))
                    .reduce(
                            new ReduceFunction<Tuple2<Long, IntType>>() {
                                @Override
                                public Tuple2<Long, IntType> reduce(
                                        Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                                    // the reduced value stays at 1 regardless of the inputs
                                    return new Tuple2<>(a.f0, new IntType(1));
                                }
                            })
                    .addSink(
                            new ValidatingSink<Tuple2<Long, IntType>>(
                                    updaterAndChecker, updaterAndChecker, true))
                    .setParallelism(1);

            TestUtils.tryExecute(env, "Aggregating Sliding Window Test");
        } catch (Exception e) {
            // keep the original exception as the cause instead of only printing it
            throw new AssertionError("Aggregating Sliding Window Test failed: " + e, e);
        }
    }

    /**
     * Count updater and result checker for {@link ValidatingSink}: sums the {@link IntType}
     * values received per key and decides whether the accumulated counts match the expectation.
     */
    static class SinkValidatorUpdaterAndChecker
            implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>,
                    ValidatingSink.ResultChecker {

        /** Number of distinct keys the result map must contain. */
        private final int elementCountExpected;

        /** Exact count each key must end up with. */
        private final int countPerElementExpected;

        /**
         * @param elementCountExpected number of distinct keys expected in the result
         * @param countPerElementExpected count expected for every single key
         */
        SinkValidatorUpdaterAndChecker(int elementCountExpected, int countPerElementExpected) {
            this.elementCountExpected = elementCountExpected;
            this.countPerElementExpected = countPerElementExpected;
        }

        /**
         * Adds {@code value.f1.value} to the count stored under key {@code value.f0}.
         *
         * @param value the received tuple
         * @param windowCounts the per-key counts to update in place
         */
        @Override
        public void updateCount(Tuple2<Long, IntType> value, Map<Long, Integer> windowCounts) {
            windowCounts.merge(value.f0, value.f1.value, Integer::sum);
        }

        /**
         * Checks the accumulated per-key counts.
         *
         * @param windowCounts the per-key counts collected so far
         * @return {@code true} if there are exactly {@code elementCountExpected} keys and every
         *     key has exactly {@code countPerElementExpected}; {@code false} if the total, the
         *     number of keys, or any single count is still too low
         * @throws AssertionError (via {@link Assert#fail(String)}) if a key's count exceeds the
         *     expected count
         */
        @Override
        public boolean checkResult(Map<Long, Integer> windowCounts) {
            // accumulate and compare in long so neither the sum nor the product can overflow
            long aggCount = 0L;
            for (int count : windowCounts.values()) {
                aggCount += count;
            }
            long minimumTotal = (long) elementCountExpected * countPerElementExpected;
            if (aggCount < minimumTotal || elementCountExpected != windowCounts.size()) {
                return false;
            }

            for (Map.Entry<Long, Integer> entry : windowCounts.entrySet()) {
                int count = entry.getValue();
                if (count < countPerElementExpected) {
                    return false;
                }
                if (count > countPerElementExpected) {
                    Assert.fail(
                            String.format(
                                    "counter too big for %d: %d (expected %d)",
                                    entry.getKey(), entry.getValue(), countPerElementExpected));
                }
            }
            return true;
        }
    }

    /**
     * Event generator for {@link FailingSource} that emits, for every sequence number, a tuple
     * holding the sequence number as key and an {@link IntType} wrapping the same number.
     */
    static class Generator implements FailingSource.EventEmittingGenerator {
        Generator() {
        }

        /**
         * Emits one tuple for the given sequence number.
         *
         * @param ctx the source context to emit into
         * @param eventSequenceNo the sequence number of the event to emit
         */
        @Override
        public void emitEvent(
                SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
            // Widen to long before boxing: f0 is declared as Long, and boxing the int directly
            // would store an Integer that fails later casts to Long.
            ctx.collect(new Tuple2<>((long) eventSequenceNo, new IntType(eventSequenceNo)));
        }
    }
}

