/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checks that window definitions declared on a keyed stream are translated into the expected
 * runtime operator, i.e. that the resulting {@link WindowOperator} or
 * {@link EvictingWindowOperator} carries the expected trigger, window assigner, evictor and
 * state descriptor.
 *
 * <p>The tests only inspect the transformation that the API calls produce; none of them
 * executes a job.
 */
public class WindowTranslationTest {

    /**
     * Calling {@code reduce()} on a windowed stream with a {@link RichReduceFunction} is expected
     * to be rejected with an {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void testReduceFailWithRichReducer() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> source = helloTuples(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // The result is irrelevant: the reduce() call itself is expected to throw.
        source.keyBy(0)
                .window(SlidingEventTimeWindows.of(
                        Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {

                    @Override
                    public Tuple2<String, Integer> reduce(
                            Tuple2<String, Integer> value1,
                            Tuple2<String, Integer> value2) throws Exception {
                        return null;
                    }
                });
    }

    /**
     * Event-time window assigners without an explicit trigger must end up in a
     * {@link WindowOperator} that uses an {@link EventTimeTrigger}; {@code reduce()} must be
     * backed by a {@link ReducingStateDescriptor} and {@code apply()} by a
     * {@link ListStateDescriptor}.
     */
    @Test
    @SuppressWarnings("rawtypes")
    public void testEventTime() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> source = helloTuples(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        SingleOutputStreamOperator reduced = source.keyBy(0)
                .window(SlidingEventTimeWindows.of(
                        Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                .reduce(new DummyReducer());

        WindowOperator reduceOperator = assertInstanceOf(WindowOperator.class, operatorOf(reduced));
        assertInstanceOf(EventTimeTrigger.class, reduceOperator.getTrigger());
        assertInstanceOf(SlidingEventTimeWindows.class, reduceOperator.getWindowAssigner());
        assertInstanceOf(ReducingStateDescriptor.class, reduceOperator.getStateDescriptor());

        SingleOutputStreamOperator applied = source.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .apply(new NoOpWindowFunction());

        WindowOperator applyOperator = assertInstanceOf(WindowOperator.class, operatorOf(applied));
        assertInstanceOf(EventTimeTrigger.class, applyOperator.getTrigger());
        assertInstanceOf(TumblingEventTimeWindows.class, applyOperator.getWindowAssigner());
        assertInstanceOf(ListStateDescriptor.class, applyOperator.getStateDescriptor());
    }

    /**
     * A user-supplied {@link CountTrigger} without an evictor must still yield a plain
     * {@link WindowOperator}, with the custom trigger installed in place of the default one.
     */
    @Test
    @SuppressWarnings("rawtypes")
    public void testNonEvicting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource<Tuple2<String, Integer>> source = helloTuples(env);

        SingleOutputStreamOperator reduced = source.keyBy(0)
                .window(SlidingEventTimeWindows.of(
                        Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                .trigger(CountTrigger.of(100))
                .reduce(new DummyReducer());

        WindowOperator reduceOperator = assertInstanceOf(WindowOperator.class, operatorOf(reduced));
        assertInstanceOf(CountTrigger.class, reduceOperator.getTrigger());
        assertInstanceOf(SlidingEventTimeWindows.class, reduceOperator.getWindowAssigner());
        assertInstanceOf(ReducingStateDescriptor.class, reduceOperator.getStateDescriptor());

        SingleOutputStreamOperator applied = source.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .trigger(CountTrigger.of(100))
                .apply(new NoOpWindowFunction());

        WindowOperator applyOperator = assertInstanceOf(WindowOperator.class, operatorOf(applied));
        assertInstanceOf(CountTrigger.class, applyOperator.getTrigger());
        assertInstanceOf(TumblingEventTimeWindows.class, applyOperator.getWindowAssigner());
        assertInstanceOf(ListStateDescriptor.class, applyOperator.getStateDescriptor());
    }

    /**
     * Adding an evictor must switch the translation to an {@link EvictingWindowOperator} that
     * keeps its contents in list state, for {@code reduce()} as well as for {@code apply()}.
     */
    @Test
    @SuppressWarnings("rawtypes")
    public void testEvicting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Tuple2<String, Integer>> source = helloTuples(env);

        SingleOutputStreamOperator reduced = source.keyBy(0)
                .window(SlidingEventTimeWindows.of(
                        Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                .evictor(CountEvictor.of(100))
                .reduce(new DummyReducer());

        EvictingWindowOperator reduceOperator =
                assertInstanceOf(EvictingWindowOperator.class, operatorOf(reduced));
        assertInstanceOf(EventTimeTrigger.class, reduceOperator.getTrigger());
        assertInstanceOf(SlidingEventTimeWindows.class, reduceOperator.getWindowAssigner());
        assertInstanceOf(CountEvictor.class, reduceOperator.getEvictor());
        assertInstanceOf(ListStateDescriptor.class, reduceOperator.getStateDescriptor());

        SingleOutputStreamOperator applied = source.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .trigger(CountTrigger.of(100))
                .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
                .apply(new NoOpWindowFunction());

        EvictingWindowOperator applyOperator =
                assertInstanceOf(EvictingWindowOperator.class, operatorOf(applied));
        assertInstanceOf(CountTrigger.class, applyOperator.getTrigger());
        assertInstanceOf(TumblingEventTimeWindows.class, applyOperator.getWindowAssigner());
        assertInstanceOf(TimeEvictor.class, applyOperator.getEvictor());
        assertInstanceOf(ListStateDescriptor.class, applyOperator.getStateDescriptor());
    }

    /**
     * {@code fold()} on a stream windowed with {@link EventTimeSessionWindows} is expected to be
     * rejected with an {@link UnsupportedOperationException}.
     */
    @Test
    public void testSessionWithFold() throws Exception {
        WindowedStream<String, String, TimeWindow> windowedStream = sessionWindowedStrings();

        try {
            windowedStream.fold("", new FoldFunction<String, String>() {

                @Override
                public String fold(String accumulator, String value) throws Exception {
                    return accumulator;
                }
            });
        } catch (UnsupportedOperationException expected) {
            // This is the outcome under test.
            return;
        }
        Assert.fail("The fold call should fail.");
    }

    /**
     * Installing a trigger whose {@code canMerge()} returns {@code false} on a stream windowed
     * with {@link EventTimeSessionWindows} is expected to be rejected with an
     * {@link UnsupportedOperationException}.
     */
    @Test
    public void testMergingAssignerWithNonMergingTrigger() throws Exception {
        WindowedStream<String, String, TimeWindow> windowedStream = sessionWindowedStrings();

        try {
            windowedStream.trigger(new Trigger<String, TimeWindow>() {

                @Override
                public TriggerResult onElement(
                        String element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx)
                        throws Exception {
                    return null;
                }

                @Override
                public TriggerResult onProcessingTime(
                        long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
                    return null;
                }

                @Override
                public TriggerResult onEventTime(
                        long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
                    return null;
                }

                @Override
                public boolean canMerge() {
                    return false;
                }
            });
        } catch (UnsupportedOperationException expected) {
            // This is the outcome under test.
            return;
        }
        Assert.fail("The trigger call should fail.");
    }

    /** Creates the two-element {@code ("hello", n)} tuple source shared by the translation tests. */
    private static DataStreamSource<Tuple2<String, Integer>> helloTuples(StreamExecutionEnvironment env) {
        return env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
    }

    /**
     * Builds a fresh local environment with a two-element string stream, keyed by the element
     * itself and windowed into event-time sessions with a five second gap.
     */
    private static WindowedStream<String, String, TimeWindow> sessionWindowedStrings() {
        LocalStreamEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
        return env.fromElements("Hello", "Ciao")
                .keyBy(new KeySelector<String, String>() {

                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)));
    }

    /** Returns the stream operator held by the one-input transformation behind {@code stream}. */
    @SuppressWarnings("rawtypes")
    private static OneInputStreamOperator operatorOf(SingleOutputStreamOperator stream) {
        OneInputTransformation transformation = (OneInputTransformation) stream.getTransformation();
        return transformation.getOperator();
    }

    /**
     * Asserts that {@code actual} is an instance of {@code expectedType} and returns it cast to
     * that type. On failure the message names both the expected and the actual class; a
     * {@code null} value always fails.
     */
    private static <T> T assertInstanceOf(Class<T> expectedType, Object actual) {
        String actualType = actual == null ? "null" : actual.getClass().getName();
        Assert.assertTrue(
                "Expected an instance of " + expectedType.getName() + " but found " + actualType,
                expectedType.isInstance(actual));
        return expectedType.cast(actual);
    }

    /** Reducer that always keeps the first of the two values it is given. */
    public static class DummyReducer
            implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return value1;
        }
    }

    /** Window function that emits nothing; it only serves as an argument to {@code apply()}. */
    private static class NoOpWindowFunction
            implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(
                Tuple tuple,
                TimeWindow window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Tuple2<String, Integer>> out) throws Exception {
        }
    }
}

