package org.apache.flink.test.checkpointing;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase.class */
public class WindowCheckpointingITCase extends TestLogger {
    private TimeCharacteristic timeCharacteristic;
    private static final int PARALLELISM = 4;
    private static ForkableFlinkMiniCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase$FailingSource.class */
    private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> implements Checkpointed<Integer>, CheckpointListener {
        private static volatile boolean failedBefore = false;
        private final int numElementsToEmit;
        private final int failureAfterNumElements;
        private volatile int numElementsEmitted;
        private volatile int numSuccessfulCheckpoints;
        private volatile boolean running;

        private FailingSource(int i, int i2) {
            this.running = true;
            this.numElementsToEmit = i;
            this.failureAfterNumElements = i2;
        }

        public void open(Configuration configuration) {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, IntType>> sourceContext) throws Exception {
            while (this.running) {
                if (!failedBefore) {
                    Thread.sleep(1L);
                    if (this.numSuccessfulCheckpoints >= 2 && this.numElementsEmitted >= this.failureAfterNumElements) {
                        failedBefore = true;
                        throw new Exception("Artificial Failure");
                    }
                }
                if (this.numElementsEmitted >= this.numElementsToEmit || (!failedBefore && this.numElementsEmitted > this.failureAfterNumElements)) {
                    Thread.sleep(1L);
                } else {
                    synchronized (sourceContext.getCheckpointLock()) {
                        int i = this.numElementsEmitted;
                        this.numElementsEmitted = i + 1;
                        sourceContext.collect(new Tuple2(Long.valueOf(i), new IntType(i)));
                    }
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) {
            this.numSuccessfulCheckpoints++;
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Integer m598snapshotState(long j, long j2) {
            return Integer.valueOf(this.numElementsEmitted);
        }

        public void restoreState(Integer num) {
            this.numElementsEmitted = num.intValue();
        }

        public static void reset() {
            failedBefore = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase$IntType.class */
    public static class IntType {
        public int value;

        public IntType() {
        }

        public IntType(int i) {
            this.value = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase$ValidatingSink.class */
    private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>> implements Checkpointed<HashMap<Long, Integer>> {
        private final HashMap<Long, Integer> counts;
        private final int elementCountExpected;
        private final int countPerElementExpected;
        private int aggCount;

        private ValidatingSink(int i, int i2) {
            this.counts = new HashMap<>();
            this.elementCountExpected = i;
            this.countPerElementExpected = i2;
        }

        public void open(Configuration configuration) throws Exception {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void invoke(Tuple2<Long, IntType> tuple2) throws Exception {
            Integer num = this.counts.get(tuple2.f0);
            if (num != null) {
                this.counts.put(tuple2.f0, Integer.valueOf(num.intValue() + ((IntType) tuple2.f1).value));
            } else {
                this.counts.put(tuple2.f0, Integer.valueOf(((IntType) tuple2.f1).value));
            }
            this.aggCount += ((IntType) tuple2.f1).value;
            if (this.aggCount >= this.elementCountExpected * this.countPerElementExpected) {
                Assert.assertEquals(this.elementCountExpected, this.counts.size());
                Iterator<Integer> it = this.counts.values().iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(this.countPerElementExpected, it.next().intValue());
                }
                throw new SuccessException();
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public HashMap<Long, Integer> m599snapshotState(long j, long j2) {
            return this.counts;
        }

        public void restoreState(HashMap<Long, Integer> hashMap) {
            this.counts.putAll(hashMap);
            Iterator<Integer> it = hashMap.values().iterator();
            while (it.hasNext()) {
                this.aggCount += it.next().intValue();
            }
        }
    }

    public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
    }

    @BeforeClass
    public static void startTestCluster() {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 2);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 2);
        configuration.setInteger("taskmanager.memory.size", 48);
        cluster = new ForkableFlinkMiniCluster(configuration, false);
        cluster.start();
    }

    @AfterClass
    public static void stopTestCluster() {
        if (cluster != null) {
            cluster.stop();
        }
    }

    @Test
    public void testTumblingProcessingTimeWindow() {
        FailingSource.reset();
        try {
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            createRemoteEnvironment.setParallelism(PARALLELISM);
            createRemoteEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            createRemoteEnvironment.getConfig().setAutoWatermarkInterval(10L);
            createRemoteEnvironment.enableCheckpointing(100L);
            createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            createRemoteEnvironment.addSource(new FailingSource(3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(100L, TimeUnit.MILLISECONDS)).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.1
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple2<Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        Assert.assertEquals(((Long) tuple2.f0).intValue(), ((IntType) tuple2.f1).value);
                        collector.collect(new Tuple2(tuple2.f0, new IntType(1)));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple2<Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(3000, 1)).setParallelism(1);
            TestUtils.tryExecute(createRemoteEnvironment, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlidingProcessingTimeWindow() {
        FailingSource.reset();
        try {
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            createRemoteEnvironment.setParallelism(PARALLELISM);
            createRemoteEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            createRemoteEnvironment.getConfig().setAutoWatermarkInterval(10L);
            createRemoteEnvironment.enableCheckpointing(100L);
            createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            createRemoteEnvironment.addSource(new FailingSource(3000, 1000)).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(150L, TimeUnit.MILLISECONDS), Time.of(50L, TimeUnit.MILLISECONDS)).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.2
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple2<Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        Assert.assertEquals(((Long) tuple2.f0).intValue(), ((IntType) tuple2.f1).value);
                        collector.collect(new Tuple2(tuple2.f0, new IntType(1)));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple2<Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(3000, 3)).setParallelism(1);
            TestUtils.tryExecute(createRemoteEnvironment, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testAggregatingTumblingProcessingTimeWindow() {
        FailingSource.reset();
        try {
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            createRemoteEnvironment.setParallelism(PARALLELISM);
            createRemoteEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            createRemoteEnvironment.getConfig().setAutoWatermarkInterval(10L);
            createRemoteEnvironment.enableCheckpointing(100L);
            createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            createRemoteEnvironment.addSource(new FailingSource(3000, 1000)).map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.4
                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> tuple2) {
                    ((IntType) tuple2.f1).value = 1;
                    return tuple2;
                }
            }).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(100L, TimeUnit.MILLISECONDS)).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.3
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(1));
                }
            }).addSink(new ValidatingSink(3000, 1)).setParallelism(1);
            TestUtils.tryExecute(createRemoteEnvironment, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testAggregatingSlidingProcessingTimeWindow() {
        FailingSource.reset();
        try {
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort(), new String[0]);
            createRemoteEnvironment.setParallelism(PARALLELISM);
            createRemoteEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            createRemoteEnvironment.getConfig().setAutoWatermarkInterval(10L);
            createRemoteEnvironment.enableCheckpointing(100L);
            createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            createRemoteEnvironment.addSource(new FailingSource(3000, 1000)).map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.6
                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> tuple2) {
                    ((IntType) tuple2.f1).value = 1;
                    return tuple2;
                }
            }).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(150L, TimeUnit.MILLISECONDS), Time.of(50L, TimeUnit.MILLISECONDS)).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.5
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(1));
                }
            }).addSink(new ValidatingSink(3000, 3)).setParallelism(1);
            TestUtils.tryExecute(createRemoteEnvironment, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "TimeCharacteristic = {0}")
    public static Collection<TimeCharacteristic[]> timeCharacteristic() {
        return Arrays.asList(new TimeCharacteristic[]{TimeCharacteristic.ProcessingTime}, new TimeCharacteristic[]{TimeCharacteristic.IngestionTime});
    }
}
