package org.apache.flink.test.checkpointing;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase.class */
public class WindowCheckpointingITCase extends TestLogger {
    private TimeCharacteristic timeCharacteristic;
    private static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    /* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase$Generator.class */
    static class Generator implements FailingSource.EventEmittingGenerator {
        Generator() {
        }

        @Override // org.apache.flink.test.checkpointing.utils.FailingSource.EventEmittingGenerator
        public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> sourceContext, int i) {
            sourceContext.collect(new Tuple2(Long.valueOf(i), new IntType(i)));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/WindowCheckpointingITCase$SinkValidatorUpdaterAndChecker.class */
    static class SinkValidatorUpdaterAndChecker implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>, ValidatingSink.ResultChecker {
        private final int elementCountExpected;
        private final int countPerElementExpected;

        SinkValidatorUpdaterAndChecker(int i, int i2) {
            this.elementCountExpected = i;
            this.countPerElementExpected = i2;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: updateCount, reason: avoid collision after fix types in other method */
        public void updateCount2(Tuple2<Long, IntType> tuple2, Map<Long, Integer> map) {
            map.merge(tuple2.f0, Integer.valueOf(((IntType) tuple2.f1).value), (num, num2) -> {
                return Integer.valueOf(num.intValue() + num2.intValue());
            });
        }

        @Override // org.apache.flink.test.checkpointing.utils.ValidatingSink.ResultChecker
        public boolean checkResult(Map<Long, Integer> map) {
            int i = 0;
            Iterator<Integer> it = map.values().iterator();
            while (it.hasNext()) {
                i += it.next().intValue();
            }
            if (i < this.elementCountExpected * this.countPerElementExpected || this.elementCountExpected != map.size()) {
                return false;
            }
            Iterator<Integer> it2 = map.values().iterator();
            while (it2.hasNext()) {
                if (this.countPerElementExpected != it2.next().intValue()) {
                    return false;
                }
            }
            return true;
        }

        @Override // org.apache.flink.test.checkpointing.utils.ValidatingSink.CountUpdater
        public /* bridge */ /* synthetic */ void updateCount(Tuple2<Long, IntType> tuple2, Map map) {
            updateCount2(tuple2, (Map<Long, Integer>) map);
        }
    }

    public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
        return configuration;
    }

    @Test
    public void testTumblingProcessingTimeWindow() {
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            executionEnvironment.getConfig().setAutoWatermarkInterval(10L);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.getConfig().disableSysoutLogging();
            SinkValidatorUpdaterAndChecker sinkValidatorUpdaterAndChecker = new SinkValidatorUpdaterAndChecker(3000, 1);
            executionEnvironment.addSource(new FailingSource(new Generator(), 3000, this.timeCharacteristic)).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(100L, TimeUnit.MILLISECONDS)).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.1
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple2<Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        Assert.assertEquals(((Long) tuple2.f0).intValue(), ((IntType) tuple2.f1).value);
                        collector.collect(new Tuple2(tuple2.f0, new IntType(1)));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple2<Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(sinkValidatorUpdaterAndChecker, sinkValidatorUpdaterAndChecker, this.timeCharacteristic)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlidingProcessingTimeWindow() {
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            executionEnvironment.getConfig().setAutoWatermarkInterval(10L);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.getConfig().disableSysoutLogging();
            SinkValidatorUpdaterAndChecker sinkValidatorUpdaterAndChecker = new SinkValidatorUpdaterAndChecker(3000, 3);
            executionEnvironment.addSource(new FailingSource(new Generator(), 3000, this.timeCharacteristic)).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(150L, TimeUnit.MILLISECONDS), Time.of(50L, TimeUnit.MILLISECONDS)).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.2
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple2<Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        Assert.assertEquals(((Long) tuple2.f0).intValue(), ((IntType) tuple2.f1).value);
                        collector.collect(new Tuple2(tuple2.f0, new IntType(1)));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple2<Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(sinkValidatorUpdaterAndChecker, sinkValidatorUpdaterAndChecker, this.timeCharacteristic)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "Sliding Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testAggregatingTumblingProcessingTimeWindow() {
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            executionEnvironment.getConfig().setAutoWatermarkInterval(10L);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.getConfig().disableSysoutLogging();
            SinkValidatorUpdaterAndChecker sinkValidatorUpdaterAndChecker = new SinkValidatorUpdaterAndChecker(3000, 1);
            executionEnvironment.addSource(new FailingSource(new Generator(), 3000, this.timeCharacteristic)).map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.4
                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> tuple2) {
                    ((IntType) tuple2.f1).value = 1;
                    return tuple2;
                }
            }).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(100L, TimeUnit.MILLISECONDS)).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.3
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(1));
                }
            }).addSink(new ValidatingSink(sinkValidatorUpdaterAndChecker, sinkValidatorUpdaterAndChecker, this.timeCharacteristic)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "Aggregating Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testAggregatingSlidingProcessingTimeWindow() {
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.setStreamTimeCharacteristic(this.timeCharacteristic);
            executionEnvironment.getConfig().setAutoWatermarkInterval(10L);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.getConfig().disableSysoutLogging();
            SinkValidatorUpdaterAndChecker sinkValidatorUpdaterAndChecker = new SinkValidatorUpdaterAndChecker(3000, 3);
            executionEnvironment.addSource(new FailingSource(new Generator(), 3000, this.timeCharacteristic)).map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.6
                public Tuple2<Long, IntType> map(Tuple2<Long, IntType> tuple2) {
                    ((IntType) tuple2.f1).value = 1;
                    return tuple2;
                }
            }).rebalance().keyBy(new int[]{0}).timeWindow(Time.of(150L, TimeUnit.MILLISECONDS), Time.of(50L, TimeUnit.MILLISECONDS)).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.WindowCheckpointingITCase.5
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(1));
                }
            }).addSink(new ValidatingSink(sinkValidatorUpdaterAndChecker, sinkValidatorUpdaterAndChecker, this.timeCharacteristic)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "Aggregating Sliding Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "TimeCharacteristic = {0}")
    public static Collection<TimeCharacteristic[]> timeCharacteristic() {
        return Arrays.asList(new TimeCharacteristic[]{TimeCharacteristic.ProcessingTime}, new TimeCharacteristic[]{TimeCharacteristic.IngestionTime});
    }
}
