package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.WindowingTestHarness;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.class */
public class WindowingTestHarnessTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest$TupleKeySelector.class */
    private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1;

        private TupleKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
            return (String) tuple2.f0;
        }
    }

    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowingTestHarness windowingTestHarness = new WindowingTestHarness(new ExecutionConfig(), TumblingEventTimeWindows.of(Time.milliseconds(2000L)), BasicTypeInfo.STRING_TYPE_INFO, parse, new TupleKeySelector(), EventTimeTrigger.create(), 0L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1000L);
        windowingTestHarness.processWatermark(1985L);
        windowingTestHarness.addExpectedWatermark(1985L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1980L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1998L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 2001L);
        windowingTestHarness.processWatermark(2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 1999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 1999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 1999L);
        windowingTestHarness.addExpectedWatermark(2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 3999L);
        windowingTestHarness.processWatermark(3999L);
        windowingTestHarness.addExpectedWatermark(3999L);
        windowingTestHarness.compareActualToExpectedOutput("Output is not correct");
        windowingTestHarness.close();
    }

    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowingTestHarness windowingTestHarness = new WindowingTestHarness(new ExecutionConfig(), TumblingProcessingTimeWindows.of(Time.milliseconds(3000L)), BasicTypeInfo.STRING_TYPE_INFO, parse, new TupleKeySelector(), ProcessingTimeTrigger.create(), 0L);
        windowingTestHarness.setProcessingTime(3L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), Long.MAX_VALUE);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 7000L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 7000L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 7000L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 7000L);
        windowingTestHarness.setProcessingTime(5000L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 2999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processElement(new Tuple2("key1", 1), 7000L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 7000L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 7000L);
        windowingTestHarness.setProcessingTime(7000L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 5999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 5999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 5999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.close();
    }

    @Test
    public void testSnapshotingAndRecovery() throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<String, Integer>");
        WindowingTestHarness windowingTestHarness = new WindowingTestHarness(new ExecutionConfig(), TumblingEventTimeWindows.of(Time.milliseconds(3000L)), BasicTypeInfo.STRING_TYPE_INFO, parse, new TupleKeySelector(), EventTimeTrigger.create(), 0L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 3999L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 3000L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 20L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 0L);
        windowingTestHarness.processElement(new Tuple2("key1", 1), 999L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1998L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1999L);
        windowingTestHarness.processElement(new Tuple2("key2", 1), 1000L);
        windowingTestHarness.processWatermark(999L);
        windowingTestHarness.addExpectedWatermark(999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processWatermark(1999L);
        windowingTestHarness.addExpectedWatermark(1999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        StreamTaskState snapshot = windowingTestHarness.snapshot(0L, 0L);
        windowingTestHarness.close();
        windowingTestHarness.restore(snapshot, 10L);
        windowingTestHarness.processWatermark(2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key1", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 2999L);
        windowingTestHarness.addExpectedWatermark(2999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processWatermark(3999L);
        windowingTestHarness.addExpectedWatermark(3999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processWatermark(4999L);
        windowingTestHarness.addExpectedWatermark(4999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processWatermark(5999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 5999L);
        windowingTestHarness.addExpectedElement(new Tuple2("key2", 1), 5999L);
        windowingTestHarness.addExpectedWatermark(5999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
        windowingTestHarness.processWatermark(6999L);
        windowingTestHarness.processWatermark(7999L);
        windowingTestHarness.addExpectedWatermark(6999L);
        windowingTestHarness.addExpectedWatermark(7999L);
        windowingTestHarness.compareActualToExpectedOutput("Output was not correct.");
    }
}
