package org.apache.flink.test.state;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Ignore
/* loaded from: input_file:org/apache/flink/test/state/ManualWindowSpeedITCase.class */
public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/state/ManualWindowSpeedITCase$InfiniteTupleSource.class */
    public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;
        private int numKeys;
        private volatile boolean running = true;

        public InfiniteTupleSource(int i) {
            this.numKeys = i;
        }

        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            long j = 0;
            while (true) {
                long j2 = j;
                if (!this.running) {
                    return;
                }
                sourceContext.collect(new Tuple2("Tuple " + (j2 % this.numKeys), 1));
                j = j2 + serialVersionUID;
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new FsStateBackend(this.tempFolder.newFolder().toURI().toString()));
        executionEnvironment.addSource(new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.2
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.1
            private static final long serialVersionUID = 1;

            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return ((String) tuple2.f0).startsWith("Tuple 0");
            }
        }).print();
        executionEnvironment.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackendWithLateness() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new FsStateBackend(this.tempFolder.newFolder().toURI().toString()));
        executionEnvironment.addSource(new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).allowedLateness(Time.seconds(1L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.4
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.3
            private static final long serialVersionUID = 1;

            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return ((String) tuple2.f0).startsWith("Tuple 0");
            }
        }).print();
        executionEnvironment.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackend() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        executionEnvironment.addSource(new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.6
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.5
            private static final long serialVersionUID = 1;

            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return ((String) tuple2.f0).startsWith("Tuple 0");
            }
        }).print();
        executionEnvironment.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackendWithLateness() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        executionEnvironment.addSource(new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).allowedLateness(Time.seconds(1L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.8
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.7
            private static final long serialVersionUID = 1;

            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return ((String) tuple2.f0).startsWith("Tuple 0");
            }
        }).print();
        executionEnvironment.execute();
    }

    @Test
    public void testAlignedProcessingTimeWindows() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
        executionEnvironment.addSource(new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.10
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.state.ManualWindowSpeedITCase.9
            private static final long serialVersionUID = 1;

            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return ((String) tuple2.f0).startsWith("Tuple 0");
            }
        }).print();
        executionEnvironment.execute();
    }
}
