/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.NonPojoType;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class WindowOperatorMigrationTest {
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE = TypeInformation.of((TypeHint)new TypeHint<Tuple2<String, Integer>>(){});
    private final MigrationVersion flinkGenerateSavepointVersion = null;
    private final MigrationVersion testMigrateVersion;

    @Parameterized.Parameters(name="Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3, MigrationVersion.v1_4, MigrationVersion.v1_5, MigrationVersion.v1_6, MigrationVersion.v1_7, MigrationVersion.v1_8);
    }

    public WindowOperatorMigrationTest(MigrationVersion testMigrateVersion) {
        this.testMigrateVersion = testMigrateVersion;
    }

    @Ignore
    @Test
    public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 3500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreSessionWindowsWithCountTrigger() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 6000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 6500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)10), 4500L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-22", (Object)10L, (Object)10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-mint-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 3500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 6000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 6500L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 7000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-10", (Object)0L, (Object)6500L), 6499L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)10), 4500L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-22", (Object)10L, (Object)10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeReducingEventTimeWindowsSnapshot() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreReducingEventTimeWindows() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-event-time-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new Watermark(2999L));
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 5999L));
        expectedOutput.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeApplyEventTimeWindowsSnapshot() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreApplyEventTimeWindows() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-event-time-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new Watermark(2999L));
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 5999L));
        expectedOutput.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(10L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1)));
        testHarness.setProcessingTime(3010L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key3", (Object)1)));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreReducingProcessingTimeWindows() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-processing-time-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.setProcessingTime(3020L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3)));
        testHarness.setProcessingTime(6000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key3", (Object)1), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeApplyProcessingTimeWindowsSnapshot() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(10L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1)));
        testHarness.setProcessingTime(3010L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key3", (Object)1)));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreApplyProcessingTimeWindows() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-processing-time-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.setProcessingTime(3020L);
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3)));
        testHarness.setProcessingTime(6000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key3", (Object)1), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Ignore
    @Test
    public void writeWindowsWithKryoSerializedKeysSnapshot() throws Exception {
        int windowSize = 3;
        TypeInformation inputType = new TypeHint<Tuple2<NonPojoType, Integer>>(){}.getTypeInfo();
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
        TypeSerializer keySerializer = TypeInformation.of(NonPojoType.class).createSerializer(new ExecutionConfig());
        Assert.assertTrue((boolean)(keySerializer instanceof KryoSerializer));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), keySerializer, (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), TypeInformation.of(NonPojoType.class));
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)1), 3999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)1), 3000L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key1"), (Object)1), 20L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key1"), (Object)1), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key1"), (Object)1), 999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)1), 1998L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)1), 1999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-kryo-serialized-key-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    @Test
    public void testRestoreKryoSerializedKeysWindows() throws Exception {
        int windowSize = 3;
        TypeInformation inputType = new TypeHint<Tuple2<NonPojoType, Integer>>(){}.getTypeInfo();
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig()));
        TypeSerializer keySerializer = TypeInformation.of(NonPojoType.class).createSerializer(new ExecutionConfig());
        Assert.assertTrue((boolean)(keySerializer instanceof KryoSerializer));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), new TupleKeySelector(), keySerializer, (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), TypeInformation.of(NonPojoType.class));
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-kryo-serialized-key-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key1"), (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)3), 2999L));
        expectedOutput.add(new Watermark(2999L));
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)new NonPojoType("key2"), (Object)2), 5999L));
        expectedOutput.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    private static class SessionWindowFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private SessionWindowFunction() {
        }

        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> i : values) {
                sum += ((Integer)i.f1).intValue();
            }
            String resultString = key + "-" + sum;
            out.collect((Object)new Tuple3((Object)resultString, (Object)window.getStart(), (Object)window.getEnd()));
        }
    }

    private static class RichSumReducer<W extends Window>
    extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1L;
        private boolean openCalled = false;

        private RichSumReducer() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
        }

        public void apply(String key, W window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (!this.openCalled) {
                Assert.fail((String)"Open was not called");
            }
            int sum = 0;
            for (Tuple2<String, Integer> t : input) {
                sum += ((Integer)t.f1).intValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }

    private static class SumReducer<K>
    implements ReduceFunction<Tuple2<K, Integer>> {
        private static final long serialVersionUID = 1L;

        private SumReducer() {
        }

        public Tuple2<K, Integer> reduce(Tuple2<K, Integer> value1, Tuple2<K, Integer> value2) throws Exception {
            return new Tuple2(value2.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }

    private static class Tuple3ResultSortComparator
    implements Comparator<Object> {
        private Tuple3ResultSortComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((String)((Tuple3)sr0.getValue()).f0).compareTo((String)((Tuple3)sr1.getValue()).f0);
            if (comparison != 0) {
                return comparison;
            }
            comparison = (int)((Long)((Tuple3)sr0.getValue()).f1 - (Long)((Tuple3)sr1.getValue()).f1);
            if (comparison != 0) {
                return comparison;
            }
            return (int)((Long)((Tuple3)sr0.getValue()).f1 - (Long)((Tuple3)sr1.getValue()).f1);
        }
    }

    private static class Tuple2ResultSortComparator<K extends Comparable>
    implements Comparator<Object> {
        private Tuple2ResultSortComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((Comparable)((Tuple2)sr0.getValue()).f0).compareTo(((Tuple2)sr1.getValue()).f0);
            if (comparison != 0) {
                return comparison;
            }
            return (Integer)((Tuple2)sr0.getValue()).f1 - (Integer)((Tuple2)sr1.getValue()).f1;
        }
    }

    private static class TupleKeySelector<K>
    implements KeySelector<Tuple2<K, Integer>, K> {
        private static final long serialVersionUID = 1L;

        private TupleKeySelector() {
        }

        public K getKey(Tuple2<K, Integer> value) throws Exception {
            return (K)value.f0;
        }
    }
}

