package org.apache.flink.test.checkpointing;

import java.time.Duration;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.class */
/**
 * Integration test that runs event-time {@code windowAll(...)} pipelines with checkpointing
 * enabled (100 ms interval) and a fixed-delay restart strategy allowing one restart.
 *
 * <p>Every test feeds the pipeline from a {@link FailingSource}, rebalances the stream, applies a
 * non-keyed (all-)window operation and hands the results to a {@link ValidatingSink} running with
 * parallelism 1. The window functions additionally assert that they run with exactly one parallel
 * subtask and that {@code open} was invoked before the first window fires.
 *
 * <p>Any exception raised while building or executing a job is rethrown as an
 * {@link AssertionError} that keeps the original exception as its cause.
 */
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
    private static final int PARALLELISM = 4;

    /** Shared mini cluster: 2 task managers with 2 slots each. */
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(2)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    /**
     * Builds the cluster configuration: 48m of managed memory and one-minute lookup/ask timeouts.
     *
     * @return the configuration passed to the mini cluster
     */
    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m"));
        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1L));
        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1L));
        return config;
    }

    /**
     * Creates an execution environment with the settings shared by all tests: the class-wide
     * parallelism, a 100 ms checkpoint interval and a single immediate restart.
     *
     * @return the configured environment
     */
    private static StreamExecutionEnvironment createCheckpointedEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        return env;
    }

    /** Tumbling 100 ms event-time all-window evaluated by {@link SummingAllWindowFunction}. */
    @Test
    public void testTumblingTimeWindow() {
        try {
            StreamExecutionEnvironment env = createCheckpointedEnvironment();
            env.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000))
                    .rebalance()
                    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100L)))
                    .apply(new SummingAllWindowFunction())
                    .addSink(new ValidatingSink(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100)))
                    .setParallelism(1);
            env.execute("Tumbling Window Test");
        } catch (Exception e) {
            // Keep the cause so the test report shows the real failure, not only its message.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /** Sliding event-time all-window (1000 ms size, 100 ms slide) evaluated by {@link SummingAllWindowFunction}. */
    @Test
    public void testSlidingTimeWindow() {
        try {
            StreamExecutionEnvironment env = createCheckpointedEnvironment();
            env.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000))
                    .rebalance()
                    .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(1000L), Time.milliseconds(100L)))
                    .apply(new SummingAllWindowFunction())
                    .addSink(new ValidatingSink(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100)))
                    .setParallelism(1);
            env.execute("Sliding Window Test");
        } catch (Exception e) {
            // Keep the cause so the test report shows the real failure, not only its message.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /** Tumbling 100 ms event-time all-window, reduced incrementally by {@link SummingReducer}. */
    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        try {
            StreamExecutionEnvironment env = createCheckpointedEnvironment();
            env.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000))
                    .rebalance()
                    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100L)))
                    .reduce(new SummingReducer(), new ForwardingAllWindowFunction())
                    .addSink(new ValidatingSink(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100)))
                    .setParallelism(1);
            env.execute("Tumbling Window Test");
        } catch (Exception e) {
            // Keep the cause so the test report shows the real failure, not only its message.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /** Sliding event-time all-window (1000 ms size, 100 ms slide), reduced incrementally by {@link SummingReducer}. */
    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        try {
            StreamExecutionEnvironment env = createCheckpointedEnvironment();
            env.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(1, 100), 3000))
                    .rebalance()
                    .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(1000L), Time.milliseconds(100L)))
                    .reduce(new SummingReducer(), new ForwardingAllWindowFunction())
                    .addSink(new ValidatingSink(new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(3000), new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(1, 3000, 100)))
                    .setParallelism(1);
            // This job uses sliding windows; it was previously submitted under the tumbling name.
            env.execute("Sliding Window Test");
        } catch (Exception e) {
            // Keep the cause so the test report shows the real failure, not only its message.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * Window function that adds up the {@code f1} values of all elements in a window and emits a
     * single {@code (f0 of the last element iterated, window start, window end, sum)} tuple.
     * The first field is {@code -1} when the window contains no elements.
     */
    private static final class SummingAllWindowFunction
            extends RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow> {
        private static final long serialVersionUID = 1L;

        /** Set in {@link #open(Configuration)}; checked on every window evaluation. */
        private boolean open = false;

        @Override
        public void open(Configuration parameters) {
            // An all-window operator is expected to run as a single subtask.
            Assert.assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
            open = true;
        }

        @Override
        public void apply(
                TimeWindow window,
                Iterable<Tuple2<Long, IntType>> values,
                Collector<Tuple4<Long, Long, Long, IntType>> out) {
            Assert.assertTrue(open);
            int sum = 0;
            long lastF0 = -1;
            for (Tuple2<Long, IntType> value : values) {
                sum += value.f1.value;
                lastF0 = value.f0;
            }
            out.collect(new Tuple4<>(lastF0, window.getStart(), window.getEnd(), new IntType(sum)));
        }
    }

    /** Reducer that keeps the first element's {@code f0} and adds the two {@code f1} values. */
    private static final class SummingReducer implements ReduceFunction<Tuple2<Long, IntType>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
            return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
        }
    }

    /**
     * Window function used after pre-aggregation: emits one
     * {@code (f0, window start, window end, f1)} tuple per element it receives.
     */
    private static final class ForwardingAllWindowFunction
            extends RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow> {
        private static final long serialVersionUID = 1L;

        /** Set in {@link #open(Configuration)}; checked on every window evaluation. */
        private boolean open = false;

        @Override
        public void open(Configuration parameters) {
            // An all-window operator is expected to run as a single subtask.
            Assert.assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
            open = true;
        }

        @Override
        public void apply(
                TimeWindow window,
                Iterable<Tuple2<Long, IntType>> values,
                Collector<Tuple4<Long, Long, Long, IntType>> out) {
            Assert.assertTrue(open);
            for (Tuple2<Long, IntType> value : values) {
                out.collect(new Tuple4<>(value.f0, window.getStart(), window.getEnd(), value.f1));
            }
        }
    }
}
