package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.class */
public class EventTimeWindowCheckpointingITCase extends TestLogger {
    private static final int MAX_MEM_STATE_SIZE = 20971520;
    private static final int PARALLELISM = 4;
    private static final int NUM_OF_TASK_MANAGERS = 2;
    private TestingServer zkServer;
    public MiniClusterWithClientResource miniClusterResource;

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    @Rule
    public TestName name = new TestName();
    private AbstractStateBackend stateBackend;
    public StateBackendEnum stateBackendEnum;
    private final int buffersPerChannel;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase$8, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$8.class */
    public static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum = new int[StateBackendEnum.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.MEM.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.FILE.ordinal()] = EventTimeWindowCheckpointingITCase.NUM_OF_TASK_MANAGERS;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.ROCKSDB_FULL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.ROCKSDB_INCREMENTAL.ordinal()] = EventTimeWindowCheckpointingITCase.PARALLELISM;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[StateBackendEnum.ROCKSDB_INCREMENTAL_ZK.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$CountingSinkValidatorUpdateFun.class */
    public static class CountingSinkValidatorUpdateFun implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
        CountingSinkValidatorUpdateFun() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: updateCount, reason: avoid collision after fix types in other method */
        public void updateCount2(Tuple4<Long, Long, Long, IntType> tuple4, Map<Long, Integer> map) {
            map.merge(tuple4.f0, 1, (num, num2) -> {
                return Integer.valueOf(num.intValue() + num2.intValue());
            });
            Assert.assertEquals("Window counts don't match for key " + tuple4.f0 + ".", ((Long) tuple4.f0).intValue() + ((Integer) map.get(tuple4.f0)).intValue(), ((IntType) tuple4.f3).value);
        }

        @Override // org.apache.flink.test.checkpointing.utils.ValidatingSink.CountUpdater
        public /* bridge */ /* synthetic */ void updateCount(Tuple4<Long, Long, Long, IntType> tuple4, Map map) {
            updateCount2(tuple4, (Map<Long, Integer>) map);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$KeyedEventTimeGenerator.class */
    public static class KeyedEventTimeGenerator implements FailingSource.EventEmittingGenerator {
        private final int keyUniverseSize;
        private final int watermarkTrailing;

        public KeyedEventTimeGenerator(int i, int i2) {
            this.keyUniverseSize = i;
            this.watermarkTrailing = (EventTimeWindowCheckpointingITCase.PARALLELISM * i2) / 3;
        }

        @Override // org.apache.flink.test.checkpointing.utils.FailingSource.EventEmittingGenerator
        public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> sourceContext, int i) {
            IntType intType = new IntType(i);
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= this.keyUniverseSize) {
                    sourceContext.emitWatermark(new Watermark(i - this.watermarkTrailing));
                    return;
                } else {
                    sourceContext.collectWithTimestamp(new Tuple2(Long.valueOf(j2), intType), i);
                    j = j2 + 1;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$SinkValidatorCheckFun.class */
    public static class SinkValidatorCheckFun implements ValidatingSink.ResultChecker {
        private final int numKeys;
        private final int numWindowsExpected;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SinkValidatorCheckFun(int i, int i2, int i3) {
            this.numKeys = i;
            this.numWindowsExpected = i2 / i3;
        }

        @Override // org.apache.flink.test.checkpointing.utils.ValidatingSink.ResultChecker
        public boolean checkResult(Map<Long, Integer> map) {
            if (map.size() != this.numKeys) {
                return false;
            }
            Iterator<Integer> it = map.values().iterator();
            while (it.hasNext()) {
                if (it.next().intValue() < this.numWindowsExpected) {
                    return false;
                }
            }
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$SinkValidatorUpdateFun.class */
    static class SinkValidatorUpdateFun implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
        private final int elementsPerKey;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SinkValidatorUpdateFun(int i) {
            this.elementsPerKey = i;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: updateCount, reason: avoid collision after fix types in other method */
        public void updateCount2(Tuple4<Long, Long, Long, IntType> tuple4, Map<Long, Integer> map) {
            int i = 0;
            long min = Math.min(((Long) tuple4.f2).longValue(), this.elementsPerKey);
            long longValue = ((Long) tuple4.f1).longValue();
            while (true) {
                long j = longValue;
                if (j >= min) {
                    Assert.assertEquals("Window start: " + tuple4.f1 + " end: " + tuple4.f2, i, ((IntType) tuple4.f3).value);
                    map.merge(tuple4.f0, 1, (num, num2) -> {
                        return Integer.valueOf(num.intValue() + num2.intValue());
                    });
                    return;
                } else {
                    if (j > 0) {
                        i = (int) (i + j);
                    }
                    longValue = j + 1;
                }
            }
        }

        @Override // org.apache.flink.test.checkpointing.utils.ValidatingSink.CountUpdater
        public /* bridge */ /* synthetic */ void updateCount(Tuple4<Long, Long, Long, IntType> tuple4, Map map) {
            updateCount2(tuple4, (Map<Long, Integer>) map);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase$StateBackendEnum.class */
    public enum StateBackendEnum {
        MEM,
        FILE,
        ROCKSDB_FULL,
        ROCKSDB_INCREMENTAL,
        ROCKSDB_INCREMENTAL_ZK
    }

    @Parameterized.Parameters(name = "statebackend type ={0}, buffersPerChannel = {1}")
    public static Collection<Object[]> parameter() {
        return (Collection) Arrays.stream(StateBackendEnum.values()).map(stateBackendEnum -> {
            return new Object[]{new Object[]{stateBackendEnum, 0}, new Object[]{stateBackendEnum, Integer.valueOf(NUM_OF_TASK_MANAGERS)}};
        }).flatMap((v0) -> {
            return Arrays.stream(v0);
        }).collect(Collectors.toList());
    }

    public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum, int i) {
        this.stateBackendEnum = stateBackendEnum;
        this.buffersPerChannel = i;
    }

    protected StateBackendEnum getStateBackend() {
        return this.stateBackendEnum;
    }

    protected final MiniClusterWithClientResource getMiniClusterResource() {
        return new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfigurationSafe()).setNumberTaskManagers(NUM_OF_TASK_MANAGERS).setNumberSlotsPerTaskManager(NUM_OF_TASK_MANAGERS).build());
    }

    private Configuration getConfigurationSafe() {
        try {
            return getConfiguration();
        } catch (Exception e) {
            throw new AssertionError("Could not initialize test.", e);
        }
    }

    private Configuration getConfiguration() throws Exception {
        System.out.println("Starting " + getClass().getCanonicalName() + "#" + this.name.getMethodName() + ".");
        StateBackendEnum stateBackend = getStateBackend();
        if (StateBackendEnum.ROCKSDB_INCREMENTAL_ZK.equals(stateBackend)) {
            this.zkServer = new TestingServer();
            this.zkServer.start();
        }
        Configuration createClusterConfig = createClusterConfig();
        createClusterConfig.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, this.buffersPerChannel);
        switch (AnonymousClass8.$SwitchMap$org$apache$flink$test$checkpointing$EventTimeWindowCheckpointingITCase$StateBackendEnum[stateBackend.ordinal()]) {
            case 1:
                this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
                break;
            case NUM_OF_TASK_MANAGERS /* 2 */:
                this.stateBackend = new FsStateBackend(Path.fromLocalFile(tempFolder.newFolder().getAbsoluteFile()));
                break;
            case 3:
                setupRocksDB(createClusterConfig, -1, false);
                break;
            case PARALLELISM /* 4 */:
                createClusterConfig.set(RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
                setupRocksDB(createClusterConfig, 16, true);
                break;
            case 5:
                setupRocksDB(createClusterConfig, 16, true);
                break;
            default:
                throw new IllegalStateException("No backend selected.");
        }
        FsStateChangelogStorageFactory.configure(createClusterConfig, tempFolder.newFolder());
        return createClusterConfig;
    }

    private void setupRocksDB(Configuration configuration, int i, boolean z) throws IOException {
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(128L));
        String absolutePath = tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new FsStateBackend(Path.fromLocalFile(tempFolder.newFolder().getAbsoluteFile()).toUri(), i), z);
        rocksDBStateBackend.setDbStoragePath(absolutePath);
        this.stateBackend = rocksDBStateBackend;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Configuration createClusterConfig() throws IOException {
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        File newFolder = temporaryFolder.newFolder();
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
        if (this.zkServer != null) {
            configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.zkServer.getConnectString());
            configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, newFolder.toURI().toString());
        }
        return configuration;
    }

    @Before
    public void setupTestCluster() throws Exception {
        this.miniClusterResource = getMiniClusterResource();
        this.miniClusterResource.before();
    }

    @After
    public void stopTestCluster() throws IOException {
        if (this.miniClusterResource != null) {
            this.miniClusterResource.after();
            this.miniClusterResource = null;
        }
        if (this.zkServer != null) {
            this.zkServer.stop();
            this.zkServer = null;
        }
        System.out.println("Finished " + getClass().getCanonicalName() + "#" + this.name.getMethodName() + ".");
    }

    @Test
    public void testTumblingTimeWindow() {
        int numElementsPerKey = numElementsPerKey();
        int windowSize = windowSize();
        int numKeys = numKeys();
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.getConfig().setUseSnapshotCompression(true);
            executionEnvironment.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.1
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple4<Long, Long, Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    int i = 0;
                    long j = -1;
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        i += ((IntType) tuple2.f1).value;
                        j = ((Long) tuple2.f0).longValue();
                    }
                    collector.collect(new Tuple4(Long.valueOf(j), Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd()), new IntType(i)));
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple4<Long, Long, Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            executionEnvironment.execute("Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
        doTestTumblingTimeWindowWithKVState(PARALLELISM);
    }

    @Test
    public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
        doTestTumblingTimeWindowWithKVState(32768);
    }

    public void doTestTumblingTimeWindowWithKVState(int i) {
        int numElementsPerKey = numElementsPerKey();
        int windowSize = windowSize();
        int numKeys = numKeys();
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.setMaxParallelism(i);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.getConfig().setUseSnapshotCompression(true);
            executionEnvironment.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.2
                private boolean open = false;
                private ValueState<Integer> count;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                    this.count = getRuntimeContext().getState(new ValueStateDescriptor("count", Integer.class, 0));
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple4<Long, Long, Long, IntType>> collector) throws Exception {
                    if (((Integer) this.count.value()).intValue() == 0) {
                        this.count.update(Integer.valueOf(((Long) tuple.getField(0)).intValue()));
                    }
                    Assert.assertTrue(this.open);
                    this.count.update(Integer.valueOf(((Integer) this.count.value()).intValue() + 1));
                    collector.collect(new Tuple4(tuple.getField(0), Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd()), new IntType(((Integer) this.count.value()).intValue())));
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple4<Long, Long, Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(new CountingSinkValidatorUpdateFun(), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            executionEnvironment.execute("Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlidingTimeWindow() {
        int numElementsPerKey = numElementsPerKey();
        int windowSize = windowSize();
        int windowSlide = windowSlide();
        int numKeys = numKeys();
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setMaxParallelism(8);
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.getConfig().setUseSnapshotCompression(true);
            executionEnvironment.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(windowSlide))).apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.3
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple4<Long, Long, Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    int i = 0;
                    long j = -1;
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        i += ((IntType) tuple2.f1).value;
                        j = ((Long) tuple2.f0).longValue();
                    }
                    collector.collect(new Tuple4(Long.valueOf(j), Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd()), new IntType(i)));
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple4<Long, Long, Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
            executionEnvironment.execute("Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        int numElementsPerKey = numElementsPerKey();
        int windowSize = windowSize();
        int numKeys = numKeys();
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.getConfig().setUseSnapshotCompression(true);
            executionEnvironment.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.4
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(((IntType) tuple2.f1).value + ((IntType) tuple22.f1).value));
                }
            }, new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.5
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple4<Long, Long, Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        collector.collect(new Tuple4(tuple2.f0, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd()), tuple2.f1));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple4<Long, Long, Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            executionEnvironment.execute("Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        int numElementsPerKey = numElementsPerKey();
        int windowSize = windowSize();
        int windowSlide = windowSlide();
        int numKeys = numKeys();
        try {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(PARALLELISM);
            executionEnvironment.enableCheckpointing(100L);
            executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            executionEnvironment.setStateBackend(this.stateBackend);
            executionEnvironment.getConfig().setUseSnapshotCompression(true);
            executionEnvironment.addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(windowSlide))).reduce(new ReduceFunction<Tuple2<Long, IntType>>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.6
                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> tuple2, Tuple2<Long, IntType> tuple22) {
                    return new Tuple2<>(tuple2.f0, new IntType(((IntType) tuple2.f1).value + ((IntType) tuple22.f1).value));
                }
            }, new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { // from class: org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.7
                private boolean open = false;

                public void open(Configuration configuration) {
                    Assert.assertEquals(4L, getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Long, IntType>> iterable, Collector<Tuple4<Long, Long, Long, IntType>> collector) {
                    Assert.assertTrue(this.open);
                    for (Tuple2<Long, IntType> tuple2 : iterable) {
                        collector.collect(new Tuple4(tuple2.f0, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd()), tuple2.f1));
                    }
                }

                public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
                    apply((Tuple) obj, (TimeWindow) window, (Iterable<Tuple2<Long, IntType>>) iterable, (Collector<Tuple4<Long, Long, Long, IntType>>) collector);
                }
            }).addSink(new ValidatingSink(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
            executionEnvironment.execute("Tumbling Window Test");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private int numElementsPerKey() {
        return 3000;
    }

    private int windowSize() {
        return 1000;
    }

    private int windowSlide() {
        return 100;
    }

    private int numKeys() {
        return 100;
    }
}
