/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.WindowingTestHarness;
import org.junit.Test;

/**
 * Tests for {@link WindowingTestHarness} using tumbling windows keyed by the
 * first field of a {@code Tuple2<String, Integer>}.
 *
 * <p>Each test feeds elements and time signals (watermarks or processing time)
 * into the harness, registers the output it expects, and lets the harness
 * compare the two.
 *
 * <p>NOTE(review): the harness variables are declared with the raw
 * {@code WindowingTestHarness} type because its type-parameter list is not
 * visible from this file; parameterize them once that is confirmed.
 */
public class WindowingTestHarnessTest {

    /** Type description handed to {@link TypeInfoParser} for the test records. */
    private static final String INPUT_TYPE = "Tuple2<String, Integer>";

    /**
     * Builds a test record for the given key; every record in these tests
     * carries the value {@code 1}.
     *
     * @param key the key stored in the first tuple field
     * @return a new {@code (key, 1)} tuple
     */
    private static Tuple2<String, Integer> record(String key) {
        return new Tuple2<>(key, 1);
    }

    /**
     * Event-time tumbling windows of 2000 ms driven by watermarks.
     *
     * @throws Exception if the harness fails or the output comparison fails
     */
    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        final long windowSizeMs = 2000L;

        TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse(INPUT_TYPE);
        TumblingEventTimeWindows windowAssigner =
                TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMs));

        WindowingTestHarness testHarness = new WindowingTestHarness(
                new ExecutionConfig(),
                windowAssigner,
                BasicTypeInfo.STRING_TYPE_INFO,
                inputType,
                new TupleKeySelector(),
                EventTimeTrigger.create(),
                0L);

        testHarness.processElement(record("key2"), 1000L);

        testHarness.processWatermark(1985L);
        testHarness.addExpectedWatermark(1985L);

        // 1980 lies behind the watermark (1985) that was already processed;
        // the expectations below still count it among the emitted elements.
        testHarness.processElement(record("key2"), 1980L);
        testHarness.processElement(record("key2"), 1998L);
        testHarness.processElement(record("key2"), 2001L);

        // Watermark 2999: the three elements stamped below 2000 are expected
        // with timestamp 1999 (windowSizeMs - 1), followed by the watermark.
        testHarness.processWatermark(2999L);
        testHarness.addExpectedElement(record("key2"), 1999L);
        testHarness.addExpectedElement(record("key2"), 1999L);
        testHarness.addExpectedElement(record("key2"), 1999L);
        testHarness.addExpectedWatermark(2999L);

        // The element stamped 2001 is expected with timestamp 3999 once the
        // watermark reaches 3999.
        testHarness.addExpectedElement(record("key2"), 3999L);
        testHarness.processWatermark(3999L);
        testHarness.addExpectedWatermark(3999L);

        testHarness.compareActualToExpectedOutput("Output is not correct");

        testHarness.close();
    }

    /**
     * Processing-time tumbling windows of 3000 ms driven by
     * {@code setProcessingTime}.
     *
     * <p>The element timestamps passed in ({@code Long.MAX_VALUE}, 7000) differ
     * from the timestamps of the expected output (2999, 5999), i.e. the test
     * expects the output to be stamped according to processing time.
     *
     * @throws Exception if the harness fails or the output comparison fails
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        final long windowSizeMs = 3000L;

        TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse(INPUT_TYPE);
        TumblingProcessingTimeWindows windowAssigner =
                TumblingProcessingTimeWindows.of(Time.milliseconds(windowSizeMs));

        WindowingTestHarness testHarness = new WindowingTestHarness(
                new ExecutionConfig(),
                windowAssigner,
                BasicTypeInfo.STRING_TYPE_INFO,
                inputType,
                new TupleKeySelector(),
                ProcessingTimeTrigger.create(),
                0L);

        testHarness.setProcessingTime(3L);

        // Five elements arrive while processing time is 3.
        testHarness.processElement(record("key2"), Long.MAX_VALUE);
        testHarness.processElement(record("key2"), 7000L);
        testHarness.processElement(record("key2"), 7000L);
        testHarness.processElement(record("key1"), 7000L);
        testHarness.processElement(record("key1"), 7000L);

        // Advancing to 5000: all five elements are expected with timestamp 2999.
        testHarness.setProcessingTime(5000L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedElement(record("key1"), 2999L);
        testHarness.addExpectedElement(record("key1"), 2999L);

        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Three more elements arrive while processing time is 5000.
        testHarness.processElement(record("key1"), 7000L);
        testHarness.processElement(record("key1"), 7000L);
        testHarness.processElement(record("key1"), 7000L);

        // Advancing to 7000: those three are expected with timestamp 5999.
        testHarness.setProcessingTime(7000L);
        testHarness.addExpectedElement(record("key1"), 5999L);
        testHarness.addExpectedElement(record("key1"), 5999L);
        testHarness.addExpectedElement(record("key1"), 5999L);

        testHarness.compareActualToExpectedOutput("Output was not correct.");

        testHarness.close();
    }

    /**
     * Event-time tumbling windows of 3000 ms with a snapshot, close and
     * restore in the middle of the stream; the elements buffered before the
     * snapshot are expected in the output produced after the restore.
     *
     * @throws Exception if the harness fails or the output comparison fails
     */
    @Test
    public void testSnapshotingAndRecovery() throws Exception {
        final long windowSizeMs = 3000L;

        TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse(INPUT_TYPE);
        TumblingEventTimeWindows windowAssigner =
                TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMs));

        WindowingTestHarness testHarness = new WindowingTestHarness(
                new ExecutionConfig(),
                windowAssigner,
                BasicTypeInfo.STRING_TYPE_INFO,
                inputType,
                new TupleKeySelector(),
                EventTimeTrigger.create(),
                0L);

        // Elements are added with non-monotonic timestamps.
        testHarness.processElement(record("key2"), 3999L);
        testHarness.processElement(record("key2"), 3000L);

        testHarness.processElement(record("key1"), 20L);
        testHarness.processElement(record("key1"), 0L);
        testHarness.processElement(record("key1"), 999L);

        testHarness.processElement(record("key2"), 1998L);
        testHarness.processElement(record("key2"), 1999L);
        testHarness.processElement(record("key2"), 1000L);

        // Watermarks 999 and 1999: only the watermarks themselves are expected.
        testHarness.processWatermark(999L);
        testHarness.addExpectedWatermark(999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        testHarness.processWatermark(1999L);
        testHarness.addExpectedWatermark(1999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Take a snapshot, close the harness, then restore from the snapshot.
        StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness.restore(snapshot, 10L);

        // Watermark 2999: the six elements stamped below 3000 are expected
        // with timestamp 2999 (windowSizeMs - 1), followed by the watermark.
        testHarness.processWatermark(2999L);
        testHarness.addExpectedElement(record("key1"), 2999L);
        testHarness.addExpectedElement(record("key1"), 2999L);
        testHarness.addExpectedElement(record("key1"), 2999L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedElement(record("key2"), 2999L);
        testHarness.addExpectedWatermark(2999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Watermarks 3999 and 4999: only the watermarks are expected.
        testHarness.processWatermark(3999L);
        testHarness.addExpectedWatermark(3999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        testHarness.processWatermark(4999L);
        testHarness.addExpectedWatermark(4999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Watermark 5999: the two key2 elements stamped 3999 and 3000 are
        // expected with timestamp 5999.
        testHarness.processWatermark(5999L);
        testHarness.addExpectedElement(record("key2"), 5999L);
        testHarness.addExpectedElement(record("key2"), 5999L);
        testHarness.addExpectedWatermark(5999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Later watermarks: only the watermarks are expected.
        testHarness.processWatermark(6999L);
        testHarness.processWatermark(7999L);
        testHarness.addExpectedWatermark(6999L);
        testHarness.addExpectedWatermark(7999L);
        testHarness.compareActualToExpectedOutput("Output was not correct.");

        // Close the restored harness, as the other tests do when they finish.
        testHarness.close();
    }

    /** Key selector that keys each tuple by its first field ({@code f0}). */
    private static class TupleKeySelector
            implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        private TupleKeySelector() {
        }

        /**
         * @param value the tuple to extract the key from
         * @return the tuple's first field
         */
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }
}

