package org.apache.flink.streaming.util;

import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/util/WindowingTestHarness.class */
public class WindowingTestHarness<K, IN, W extends Window> {
    private final TestTimeServiceProvider timeServiceProvider;
    private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
    private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>();
    private volatile boolean isOpen = false;

    /* loaded from: input_file:org/apache/flink/streaming/util/WindowingTestHarness$PassThroughFunction.class */
    private class PassThroughFunction implements WindowFunction<IN, IN, K, W> {
        private static final long serialVersionUID = 1;

        private PassThroughFunction() {
        }

        public void apply(K k, W w, Iterable<IN> iterable, Collector<IN> collector) throws Exception {
            Iterator<IN> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/util/WindowingTestHarness$StreamRecordComparator.class */
    private class StreamRecordComparator implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            if (streamRecord.getTimestamp() != streamRecord2.getTimestamp()) {
                return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
            }
            int compareTo = ((String) ((Tuple2) streamRecord.getValue()).f0).compareTo((String) ((Tuple2) streamRecord2.getValue()).f0);
            return compareTo != 0 ? compareTo : ((Integer) ((Tuple2) streamRecord.getValue()).f1).intValue() - ((Integer) ((Tuple2) streamRecord2.getValue()).f1).intValue();
        }
    }

    public WindowingTestHarness(ExecutionConfig executionConfig, WindowAssigner<? super IN, W> windowAssigner, TypeInformation<K> typeInformation, TypeInformation<IN> typeInformation2, KeySelector<IN, K> keySelector, Trigger<? super IN, ? super W> trigger, long j) {
        WindowOperator windowOperator = new WindowOperator(windowAssigner, windowAssigner.getWindowSerializer(executionConfig), keySelector, typeInformation.createSerializer(executionConfig), new ListStateDescriptor("window-contents", typeInformation2.createSerializer(executionConfig)), new InternalIterableWindowFunction(new PassThroughFunction()), trigger, j);
        windowOperator.setInputType(typeInformation2, executionConfig);
        this.timeServiceProvider = new TestTimeServiceProvider();
        this.testHarness = new OneInputStreamOperatorTestHarness<>(windowOperator, executionConfig, this.timeServiceProvider);
        this.testHarness.configureForKeyedStream(keySelector, typeInformation);
    }

    public void processElement(IN in, long j) throws Exception {
        openOperator();
        this.testHarness.processElement(new StreamRecord<>(in, j));
    }

    public void processWatermark(long j) throws Exception {
        openOperator();
        this.testHarness.processWatermark(new Watermark(j));
    }

    public void setProcessingTime(long j) throws Exception {
        openOperator();
        this.timeServiceProvider.setCurrentTime(j);
    }

    public ConcurrentLinkedQueue<Object> getOutput() throws Exception {
        return this.testHarness.getOutput();
    }

    public void close() throws Exception {
        if (this.isOpen) {
            this.testHarness.close();
            this.isOpen = false;
        }
    }

    public void addExpectedWatermark(long j) {
        this.expectedOutputs.add(new Watermark(j));
    }

    public void addExpectedElement(IN in, long j) {
        this.expectedOutputs.add(new StreamRecord(in, j));
    }

    public void compareActualToExpectedOutput(String str) {
        TestHarnessUtil.assertOutputEqualsSorted(str, this.expectedOutputs, this.testHarness.getOutput(), new StreamRecordComparator());
    }

    public StreamTaskState snapshot(long j, long j2) throws Exception {
        return this.testHarness.snapshot(j, j2);
    }

    public void restore(StreamTaskState streamTaskState, long j) throws Exception {
        Preconditions.checkArgument(!this.isOpen, "You are trying to restore() while the operator is still open. Please call close() first.");
        this.testHarness.setup();
        this.testHarness.restore(streamTaskState, j);
        openOperator();
    }

    private void openOperator() throws Exception {
        if (this.isOpen) {
            return;
        }
        this.testHarness.open();
        this.isOpen = true;
    }
}
