/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.Serializable;
import java.time.Duration;
import java.util.Random;
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Ignore
public class ManualWindowSpeedITCase
extends AbstractTestBaseJUnit4 {
    /**
     * JUnit rule providing temporary directories; the file-system-checkpoint tests in this class
     * call {@code newFolder()} on it to obtain their checkpoint storage location.
     */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Runs a keyed 3-second tumbling-window sum over {@link InfiniteTupleSource} (1000 distinct
     * keys) using the HashMap state backend with file-system checkpoint storage located in a
     * fresh temporary folder.
     *
     * <p>Timestamps are assigned from the wall clock by {@link IngestionTimeWatermarkStrategy}.
     * Only results whose key starts with {@code "Tuple 0"} are printed. The source emits until it
     * is cancelled, so this job does not finish on its own.
     *
     * @throws Exception if the temporary folder cannot be created or job execution fails
     */
    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String checkpoints = tempFolder.newFolder().toURI().toString();
        StateBackendUtils.configureHashMapStateBackend(env);
        CheckpointStorageUtils.configureFileSystemCheckpointStorage(env, checkpoints);

        // No raw-type casts here: the element type Tuple2<String, Integer> must flow through the
        // whole chain so that the key selector and the functions below type-check.
        env.addSource(new InfiniteTupleSource(1000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3L)))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                // Keep the key of the first element and add up the counts.
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    /**
     * Runs a keyed 3-second tumbling-window sum with an allowed lateness of 1 second over
     * {@link InfiniteTupleSource} (10000 distinct keys) using the HashMap state backend with
     * file-system checkpoint storage located in a fresh temporary folder.
     *
     * <p>Timestamps are assigned from the wall clock by {@link IngestionTimeWatermarkStrategy}.
     * Only results whose key starts with {@code "Tuple 0"} are printed. The source emits until it
     * is cancelled, so this job does not finish on its own.
     *
     * @throws Exception if the temporary folder cannot be created or job execution fails
     */
    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackendWithLateness() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String checkpoints = tempFolder.newFolder().toURI().toString();
        StateBackendUtils.configureHashMapStateBackend(env);
        CheckpointStorageUtils.configureFileSystemCheckpointStorage(env, checkpoints);

        // No raw-type casts here: the element type Tuple2<String, Integer> must flow through the
        // whole chain so that the key selector and the functions below type-check.
        env.addSource(new InfiniteTupleSource(10000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3L)))
                .allowedLateness(Duration.ofSeconds(1L))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                // Keep the key of the first element and add up the counts.
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    /**
     * Runs a keyed 3-second tumbling-window sum over {@link InfiniteTupleSource} (10000 distinct
     * keys) with the RocksDB state backend configured on the environment.
     *
     * <p>Timestamps are assigned from the wall clock by {@link IngestionTimeWatermarkStrategy}.
     * Only results whose key starts with {@code "Tuple 0"} are printed. The source emits until it
     * is cancelled, so this job does not finish on its own.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackend() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StateBackendUtils.configureRocksDBStateBackend(env);

        // No raw-type casts here: the element type Tuple2<String, Integer> must flow through the
        // whole chain so that the key selector and the functions below type-check.
        env.addSource(new InfiniteTupleSource(10000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3L)))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                // Keep the key of the first element and add up the counts.
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    /**
     * Runs a keyed 3-second tumbling-window sum with an allowed lateness of 1 second over
     * {@link InfiniteTupleSource} (10000 distinct keys) with the RocksDB state backend configured
     * on the environment.
     *
     * <p>Timestamps are assigned from the wall clock by {@link IngestionTimeWatermarkStrategy}.
     * Only results whose key starts with {@code "Tuple 0"} are printed. The source emits until it
     * is cancelled, so this job does not finish on its own.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackendWithLateness() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StateBackendUtils.configureRocksDBStateBackend(env);

        // No raw-type casts here: the element type Tuple2<String, Integer> must flow through the
        // whole chain so that the key selector and the functions below type-check.
        env.addSource(new InfiniteTupleSource(10000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3L)))
                .allowedLateness(Duration.ofSeconds(1L))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                // Keep the key of the first element and add up the counts.
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    private static class IngestionTimeWatermarkStrategy<T>
    implements WatermarkStrategy<T> {
        private IngestionTimeWatermarkStrategy() {
        }

        public static <T> IngestionTimeWatermarkStrategy<T> create() {
            return new IngestionTimeWatermarkStrategy<T>();
        }

        public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new AscendingTimestampsWatermarks();
        }

        public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, timestamp) -> System.currentTimeMillis();
        }
    }

    public static class InfiniteTupleSource
    implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private int numKeys;
        private volatile boolean running = true;

        public InfiniteTupleSource(int numKeys) {
            this.numKeys = numKeys;
        }

        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> out) throws Exception {
            Random random = new Random(42L);
            while (this.running) {
                Tuple2 tuple = new Tuple2((Object)("Tuple " + random.nextInt(this.numKeys)), (Object)1);
                out.collect((Object)tuple);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

