package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.class */
public class WindowOperatorMigrationTest {
    /** Type information for {@code Tuple2<String, Integer>} elements; the tests use it to create the serializer of the "window-contents" state. */
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorMigrationTest.1
    });
    /**
     * Version label concatenated into the snapshot file names written by the ignored
     * {@code write*Snapshot} methods. It is {@code null} here, so those names would contain "null".
     */
    private final MigrationVersion flinkGenerateSavepointVersion = null;
    /** Version label of the snapshot to restore from; set by the constructor and concatenated into the resource names loaded by the {@code testRestore*} methods. */
    private final MigrationVersion testMigrateVersion;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$RichSumReducer.class */
    private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1;
        private boolean openCalled;

        private RichSumReducer() {
            this.openCalled = false;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
        }

        public void apply(String str, W w, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple2(str, Integer.valueOf(i)));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (String) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$SessionWindowFunction.class */
    private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private SessionWindowFunction() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple3(str + "-" + i, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd())));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$SumReducer.class */
    private static class SumReducer<K> implements ReduceFunction<Tuple2<K, Integer>> {
        private static final long serialVersionUID = 1;

        private SumReducer() {
        }

        public Tuple2<K, Integer> reduce(Tuple2<K, Integer> tuple2, Tuple2<K, Integer> tuple22) throws Exception {
            return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$Tuple2ResultSortComparator.class */
    /**
     * Comparator for sorting operator output whose records carry {@link Tuple2} values.
     *
     * <p>Records are ordered by timestamp, then by {@code f0}, then by the integer {@code f1}.
     * If either argument is a {@link Watermark} the two are reported as equal.
     *
     * <p>Comparisons use {@link Long#compare(long, long)} and {@link Integer#compare(int, int)}
     * rather than subtraction, so large differences cannot be truncated or overflow into a wrong
     * sign.
     *
     * @param <K> the comparable key type stored in {@code f0}
     */
    private static class Tuple2ResultSortComparator<K extends Comparable> implements Comparator<Object> {
        private Tuple2ResultSortComparator() {
        }

        /**
         * @param obj the first output element (a {@link StreamRecord} or a {@link Watermark})
         * @param obj2 the second output element (a {@link StreamRecord} or a {@link Watermark})
         * @return {@code 0} if either element is a watermark; otherwise a negative, zero or
         *     positive value according to timestamp, then {@code f0}, then {@code f1}
         */
        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            // Long.compare avoids the lossy (int) narrowing of a long difference.
            int byTimestamp = Long.compare(streamRecord.getTimestamp(), streamRecord2.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            Tuple2 value = (Tuple2) streamRecord.getValue();
            Tuple2 value2 = (Tuple2) streamRecord2.getValue();
            int byKey = ((Comparable) value.f0).compareTo(value2.f0);
            if (byKey != 0) {
                return byKey;
            }
            return Integer.compare(((Integer) value.f1).intValue(), ((Integer) value2.f1).intValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$Tuple3ResultSortComparator.class */
    /**
     * Comparator for sorting operator output whose records carry
     * {@code Tuple3<String, Long, Long>} values.
     *
     * <p>Records are ordered by timestamp, then by {@code f0}, then by {@code f1}, and finally by
     * {@code f2}. If either argument is a {@link Watermark} the two are reported as equal.
     *
     * <p>The last tie-breaker previously compared {@code f1} a second time, so {@code f2} never
     * took part in the ordering. Comparisons use {@link Long#compare(long, long)} rather than a
     * narrowed subtraction, which could yield a wrong sign for large differences.
     */
    private static class Tuple3ResultSortComparator implements Comparator<Object> {
        private Tuple3ResultSortComparator() {
        }

        /**
         * @param obj the first output element (a {@link StreamRecord} or a {@link Watermark})
         * @param obj2 the second output element (a {@link StreamRecord} or a {@link Watermark})
         * @return {@code 0} if either element is a watermark; otherwise a negative, zero or
         *     positive value according to timestamp, then {@code f0}, {@code f1} and {@code f2}
         */
        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            int byTimestamp = Long.compare(streamRecord.getTimestamp(), streamRecord2.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            Tuple3 value = (Tuple3) streamRecord.getValue();
            Tuple3 value2 = (Tuple3) streamRecord2.getValue();
            int byKey = ((String) value.f0).compareTo((String) value2.f0);
            if (byKey != 0) {
                return byKey;
            }
            int bySecondField = Long.compare(((Long) value.f1).longValue(), ((Long) value2.f1).longValue());
            if (bySecondField != 0) {
                return bySecondField;
            }
            // Final tie-breaker is the third field (f2), not f1 again.
            return Long.compare(((Long) value.f2).longValue(), ((Long) value2.f2).longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest$TupleKeySelector.class */
    private static class TupleKeySelector<K> implements KeySelector<Tuple2<K, Integer>, K> {
        private static final long serialVersionUID = 1;

        private TupleKeySelector() {
        }

        public K getKey(Tuple2<K, Integer> tuple2) throws Exception {
            return (K) tuple2.f0;
        }
    }

    /**
     * Supplies the migration versions the parameterized runner instantiates this test with.
     *
     * @return the versions {@code v1_2} through {@code v1_7}, in ascending order
     */
    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        final MigrationVersion[] versions = {
            MigrationVersion.v1_2,
            MigrationVersion.v1_3,
            MigrationVersion.v1_4,
            MigrationVersion.v1_5,
            MigrationVersion.v1_6,
            MigrationVersion.v1_7
        };
        return Arrays.asList(versions);
    }

    /**
     * Creates a test instance for one migration version.
     *
     * @param migrationVersion the version whose snapshot resources the restore tests load
     */
    public WindowOperatorMigrationTest(MigrationVersion migrationVersion) {
        testMigrateVersion = migrationVersion;
    }

    /**
     * Manually run snapshot generator (ignored by default): feeds six elements into a
     * session-window operator (3 second gap, purging count trigger of 4) and writes the resulting
     * operator snapshot under {@code src/test/resources}.
     */
    @Test
    @Ignore
    public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception {
        WindowOperator operator = new WindowOperator(
                EventTimeSessionWindows.withGap(Time.seconds(3L)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new SessionWindowFunction()),
                PurgingTrigger.of(CountTrigger.of(4L)),
                0L,
                (OutputTag) null);
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        // Elements for key2.
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));

        // Elements for key1.
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));

        OperatorSnapshotUtil.writeStateHandle(
                harness.snapshot(0L, 0L),
                "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        harness.close();
    }

    /**
     * Restores a session-window operator (3 second gap, purging count trigger of 4) from the
     * snapshot resource of the parameterized version, feeds further {@code key1} elements and
     * checks the sorted output: first against an empty expectation, then against one expected
     * record after a fifth element is processed.
     */
    @Test
    public void testRestoreSessionWindowsWithCountTrigger() throws Exception {
        WindowOperator operator = new WindowOperator(
                EventTimeSessionWindows.withGap(Time.seconds(3L)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new SessionWindowFunction()),
                PurgingTrigger.of(CountTrigger.of(4L)),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.initializeState(
                OperatorSnapshotUtil.getResourceFilename(
                        "win-op-migration-test-session-with-stateful-trigger-flink" + this.testMigrateVersion + "-snapshot"));
        harness.open();

        harness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));

        // Nothing has been added to the expected queue yet.
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple3ResultSortComparator());

        harness.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
        expectedOutput.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple3ResultSortComparator());

        harness.close();
    }

    /**
     * Manually run snapshot generator (ignored by default): opens a session-window operator
     * (3 second gap, purging count trigger of 4) and writes its snapshot under
     * {@code src/test/resources} without processing any element first.
     */
    @Test
    @Ignore
    public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception {
        WindowOperator operator = new WindowOperator(
                EventTimeSessionWindows.withGap(Time.seconds(3L)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new SessionWindowFunction()),
                PurgingTrigger.of(CountTrigger.of(4L)),
                0L,
                (OutputTag) null);
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        OperatorSnapshotUtil.writeStateHandle(
                harness.snapshot(0L, 0L),
                "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        harness.close();
    }

    /**
     * Restores a session-window operator (3 second gap, purging count trigger of 4) from the
     * "mint" snapshot resource of the parameterized version, then feeds elements for
     * {@code key2} and {@code key1} and checks the sorted output after the tenth and the
     * eleventh element.
     */
    @Test
    public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception {
        WindowOperator operator = new WindowOperator(
                EventTimeSessionWindows.withGap(Time.seconds(3L)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new SessionWindowFunction()),
                PurgingTrigger.of(CountTrigger.of(4L)),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.initializeState(
                OperatorSnapshotUtil.getResourceFilename(
                        "win-op-migration-test-session-with-stateful-trigger-mint-flink" + this.testMigrateVersion + "-snapshot"));
        harness.open();

        // Elements for key2.
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));

        // Elements for key1.
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));

        expectedOutput.add(new StreamRecord(new Tuple3("key2-10", 0L, 6500L), 6499L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple3ResultSortComparator());

        harness.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
        expectedOutput.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple3ResultSortComparator());

        harness.close();
    }

    /**
     * Manually run snapshot generator (ignored by default): feeds eight elements and two
     * watermarks into a reducing tumbling event-time window operator (3 second windows),
     * checks that only the watermarks were emitted, and writes the operator snapshot under
     * {@code src/test/resources}.
     */
    @Test
    @Ignore
    public void writeReducingEventTimeWindowsSnapshot() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));

        harness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());

        harness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());

        OperatorSnapshotUtil.writeStateHandle(
                harness.snapshot(0L, 0L),
                "src/test/resources/win-op-migration-test-reduce-event-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        harness.close();
    }

    /**
     * Restores a reducing tumbling event-time window operator (3 second windows) from the
     * snapshot resource of the parameterized version, advances the watermark in four steps and
     * compares the sorted output with the expected records and watermarks.
     */
    @Test
    public void testRestoreReducingEventTimeWindows() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.initializeState(
                OperatorSnapshotUtil.getResourceFilename(
                        "win-op-migration-test-reduce-event-time-flink" + this.testMigrateVersion + "-snapshot"));
        harness.open();

        harness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        expectedOutput.add(new Watermark(2999L));

        harness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));

        harness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));

        harness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        expectedOutput.add(new Watermark(5999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());
        harness.close();
    }

    /**
     * Manually run snapshot generator (ignored by default): feeds eight elements and two
     * watermarks into a tumbling event-time window operator (3 second windows) that applies
     * {@code RichSumReducer} over list state, checks that only the watermarks were emitted, and
     * writes the operator snapshot under {@code src/test/resources}.
     */
    @Test
    @Ignore
    public void writeApplyEventTimeWindowsSnapshot() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new RichSumReducer()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        harness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));

        harness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());

        harness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());

        OperatorSnapshotUtil.writeStateHandle(
                harness.snapshot(0L, 0L),
                "src/test/resources/win-op-migration-test-apply-event-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        harness.close();
    }

    /**
     * Restores a tumbling event-time window operator (3 second windows, {@code RichSumReducer}
     * over list state) from the snapshot resource of the parameterized version, advances the
     * watermark in four steps and compares the sorted output with the expected records and
     * watermarks.
     */
    @Test
    public void testRestoreApplyEventTimeWindows() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalIterableWindowFunction(new RichSumReducer()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.initializeState(
                OperatorSnapshotUtil.getResourceFilename(
                        "win-op-migration-test-apply-event-time-flink" + this.testMigrateVersion + "-snapshot"));
        harness.open();

        harness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        expectedOutput.add(new Watermark(2999L));

        harness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));

        harness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));

        harness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        expectedOutput.add(new Watermark(5999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());
        harness.close();
    }

    /**
     * Manually run snapshot generator (ignored by default): drives a reducing tumbling
     * processing-time window operator (3 second windows) with processing time set to 10 and
     * then 3010, checks the sorted output against two expected records, and writes the operator
     * snapshot under {@code src/test/resources}.
     */
    @Test
    @Ignore
    public void writeReducingProcessingTimeWindowsSnapshot() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                ProcessingTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.setProcessingTime(10L);
        harness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        harness.processElement(new StreamRecord(new Tuple2("key1", 1)));

        harness.setProcessingTime(3010L);
        harness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        harness.processElement(new StreamRecord(new Tuple2("key3", 1)));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 1), 2999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 1), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());

        OperatorSnapshotUtil.writeStateHandle(
                harness.snapshot(0L, 0L),
                "src/test/resources/win-op-migration-test-reduce-processing-time-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        harness.close();
    }

    /**
     * Restores a reducing tumbling processing-time window operator (3 second windows) from the
     * snapshot resource of the parameterized version, processes two elements at processing time
     * 3020, advances processing time to 6000 and compares the sorted output with three expected
     * records.
     */
    @Test
    public void testRestoreReducingProcessingTimeWindows() throws Exception {
        WindowOperator operator = new WindowOperator(
                TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())),
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                ProcessingTimeTrigger.create(),
                0L,
                (OutputTag) null);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.initializeState(
                OperatorSnapshotUtil.getResourceFilename(
                        "win-op-migration-test-reduce-processing-time-flink" + this.testMigrateVersion + "-snapshot"));
        harness.open();

        harness.setProcessingTime(3020L);
        harness.processElement(new StreamRecord(new Tuple2("key2", 3)));
        harness.processElement(new StreamRecord(new Tuple2("key1", 3)));

        harness.setProcessingTime(6000L);
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), 5999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key3", 1), 5999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, harness.getOutput(), new Tuple2ResultSortComparator());
        harness.close();
    }

    /**
     * Drives a tumbling processing-time window operator with list state and writes a snapshot of
     * it to {@code src/test/resources/win-op-migration-test-apply-processing-time-flink
     * <flinkGenerateSavepointVersion>-snapshot}.
     *
     * <p>Before the snapshot is taken the output is asserted to contain {@code key1 -> 1} and
     * {@code key2 -> 1}, both with timestamp 2999. The {@code key2}/{@code key3} elements processed
     * at time 3010 are not part of that assertion.
     *
     * <p>The method carries {@code @Ignore}; presumably it is enabled by hand when a snapshot for
     * a new version has to be generated (confirm against the project's migration-test docs).
     *
     * <p>The harness is managed by try-with-resources so that it is closed even when the assertion
     * or the snapshot write throws.
     *
     * @throws Exception if driving the harness, writing the snapshot or closing the harness fails
     */
    @Test
    @Ignore
    public void writeApplyProcessingTimeWindowsSnapshot() throws Exception {
        ListStateDescriptor stateDesc = new ListStateDescriptor(
                "window-contents",
                STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

        WindowOperator operator = new WindowOperator(
                TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer()),
                ProcessingTimeTrigger.create(),
                0L,
                (OutputTag) null);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)) {
            harness.setup();
            harness.open();

            harness.setProcessingTime(10L);
            harness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            harness.processElement(new StreamRecord(new Tuple2("key1", 1)));

            harness.setProcessingTime(3010L);
            harness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            harness.processElement(new StreamRecord(new Tuple2("key3", 1)));

            expectedOutput.add(new StreamRecord(new Tuple2("key1", 1), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 1), 2999L));

            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new Tuple2ResultSortComparator());

            // Snapshot is taken while the harness is still open, before it is closed.
            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/win-op-migration-test-apply-processing-time-flink"
                            + this.flinkGenerateSavepointVersion
                            + "-snapshot");
        }
    }

    /**
     * Restores a tumbling processing-time window operator with list state from the
     * {@code win-op-migration-test-apply-processing-time-flink<testMigrateVersion>-snapshot}
     * resource and checks the records emitted with timestamp 5999.
     *
     * <p>Only {@code ("key2", 3)} and {@code ("key1", 3)} are processed in this method, yet the
     * expected output is {@code key1 -> 3}, {@code key2 -> 4} and {@code key3 -> 1}; the remaining
     * contributions therefore have to come from the restored snapshot.
     *
     * <p>The harness is managed by try-with-resources so that it is closed even when the assertion
     * (or any earlier step) throws.
     *
     * @throws Exception if restoring the snapshot, driving the harness or closing it fails
     */
    @Test
    public void testRestoreApplyProcessingTimeWindows() throws Exception {
        ListStateDescriptor stateDesc = new ListStateDescriptor(
                "window-contents",
                STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

        WindowOperator operator = new WindowOperator(
                TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer()),
                ProcessingTimeTrigger.create(),
                0L,
                (OutputTag) null);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "win-op-migration-test-apply-processing-time-flink"
                                    + this.testMigrateVersion
                                    + "-snapshot"));
            harness.open();

            // Feed two more elements, then advance processing time to 6000.
            harness.setProcessingTime(3020L);
            harness.processElement(new StreamRecord(new Tuple2("key2", 3)));
            harness.processElement(new StreamRecord(new Tuple2("key1", 3)));
            harness.setProcessingTime(6000L);

            expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key3", 1), 5999L));

            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new Tuple2ResultSortComparator());
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorMigrationTest$2] */
    /**
     * Drives a tumbling event-time window operator keyed by {@code NonPojoType} and writes a
     * snapshot of it to {@code src/test/resources/win-op-migration-test-kryo-serialized-key-flink
     * <flinkGenerateSavepointVersion>-snapshot}.
     *
     * <p>The key serializer is asserted to be a {@link KryoSerializer} before the operator is
     * built. Eight elements are processed, followed by watermarks 999 and 1999; after each
     * watermark the output is asserted to contain only the watermarks emitted so far.
     *
     * <p>The method carries {@code @Ignore}; presumably it is enabled by hand when a snapshot for
     * a new version has to be generated (confirm against the project's migration-test docs).
     *
     * <p>The harness is managed by try-with-resources so that it is closed even when an assertion
     * or the snapshot write throws.
     *
     * @throws Exception if driving the harness, writing the snapshot or closing the harness fails
     */
    @Test
    @Ignore
    public void writeWindowsWithKryoSerializedKeysSnapshot() throws Exception {
        TypeInformation<Tuple2<NonPojoType, Integer>> inputType =
                new TypeHint<Tuple2<NonPojoType, Integer>>() {}.getTypeInfo();

        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor(
                "window-contents",
                new SumReducer(),
                inputType.createSerializer(new ExecutionConfig()));

        TypeSerializer<NonPojoType> keySerializer =
                TypeInformation.of(NonPojoType.class).createSerializer(new ExecutionConfig());
        Assert.assertTrue(keySerializer instanceof KryoSerializer);

        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                keySerializer,
                stateDesc,
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), TypeInformation.of(NonPojoType.class))) {
            harness.setup();
            harness.open();

            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 3999L));
            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 3000L));

            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 20L));
            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 0L));
            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 999L));

            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1998L));
            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1999L));
            harness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1000L));

            harness.processWatermark(new Watermark(999L));
            expectedOutput.add(new Watermark(999L));
            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new Tuple2ResultSortComparator());

            harness.processWatermark(new Watermark(1999L));
            expectedOutput.add(new Watermark(1999L));
            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new Tuple2ResultSortComparator());

            // Snapshot is taken while the harness is still open, before it is closed.
            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/win-op-migration-test-kryo-serialized-key-flink"
                            + this.flinkGenerateSavepointVersion
                            + "-snapshot");
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorMigrationTest$3] */
    /**
     * Restores a tumbling event-time window operator keyed by {@code NonPojoType} from the
     * {@code win-op-migration-test-kryo-serialized-key-flink<testMigrateVersion>-snapshot}
     * resource and checks what it emits as watermarks advance.
     *
     * <p>No elements are processed in this method; every expected record ({@code key1 -> 3} and
     * {@code key2 -> 3} at 2999, {@code key2 -> 2} at 5999) therefore has to come from the
     * restored snapshot. The key serializer is asserted to be a {@link KryoSerializer} before the
     * operator is built.
     *
     * <p>The harness is managed by try-with-resources so that it is closed even when the assertion
     * (or any earlier step) throws.
     *
     * @throws Exception if restoring the snapshot, driving the harness or closing it fails
     */
    @Test
    public void testRestoreKryoSerializedKeysWindows() throws Exception {
        TypeInformation<Tuple2<NonPojoType, Integer>> inputType =
                new TypeHint<Tuple2<NonPojoType, Integer>>() {}.getTypeInfo();

        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor(
                "window-contents",
                new SumReducer(),
                inputType.createSerializer(new ExecutionConfig()));

        TypeSerializer<NonPojoType> keySerializer =
                TypeInformation.of(NonPojoType.class).createSerializer(new ExecutionConfig());
        Assert.assertTrue(keySerializer instanceof KryoSerializer);

        WindowOperator operator = new WindowOperator(
                TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                keySerializer,
                stateDesc,
                new InternalSingleValueWindowFunction(new PassThroughWindowFunction()),
                EventTimeTrigger.create(),
                0L,
                (OutputTag) null);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), TypeInformation.of(NonPojoType.class))) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "win-op-migration-test-kryo-serialized-key-flink"
                                    + this.testMigrateVersion
                                    + "-snapshot"));
            harness.open();

            harness.processWatermark(new Watermark(2999L));
            expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key1"), 3), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key2"), 3), 2999L));
            expectedOutput.add(new Watermark(2999L));

            harness.processWatermark(new Watermark(3999L));
            expectedOutput.add(new Watermark(3999L));

            harness.processWatermark(new Watermark(4999L));
            expectedOutput.add(new Watermark(4999L));

            harness.processWatermark(new Watermark(5999L));
            expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key2"), 2), 5999L));
            expectedOutput.add(new Watermark(5999L));

            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new Tuple2ResultSortComparator());
        }
    }
}
