/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Test utility that wires a {@link WindowOperator} with a pass-through window function into a
 * {@link OneInputStreamOperatorTestHarness}, so that window assigners and triggers can be
 * exercised by feeding elements, watermarks and processing-time updates and then comparing the
 * emitted output against a list of expected outputs.
 *
 * <p>The wrapped operator is opened lazily by the first call that needs it
 * ({@link #processElement}, {@link #processWatermark}, {@link #setProcessingTime} or
 * {@link #restore}).
 *
 * @param <K> type of the key extracted from each input element
 * @param <IN> type of the input elements (also the output type, since windows are passed through)
 * @param <W> type of the windows produced by the window assigner
 */
public class WindowingTestHarness<K, IN, W extends Window> {
    private final TestTimeServiceProvider timeServiceProvider;
    private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
    private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>();
    private volatile boolean isOpen = false;

    /**
     * Builds the window operator under test and the harness around it.
     *
     * @param executionConfig config used to create the element, key and window serializers
     * @param windowAssigner assigner that maps elements to windows
     * @param keyType type information of the key
     * @param inputType type information of the input elements
     * @param keySelector extracts the key from an input element
     * @param trigger trigger deciding when a window fires
     * @param allowedLateness allowed lateness passed on to the {@link WindowOperator}
     */
    public WindowingTestHarness(ExecutionConfig executionConfig, WindowAssigner<? super IN, W> windowAssigner, TypeInformation<K> keyType, TypeInformation<IN> inputType, KeySelector<IN, K> keySelector, Trigger<? super IN, ? super W> trigger, long allowedLateness) {
        ListStateDescriptor<IN> windowStateDesc =
                new ListStateDescriptor<>("window-contents", inputType.createSerializer(executionConfig));

        WindowOperator<K, IN, Iterable<IN>, IN, W> operator =
                new WindowOperator<K, IN, Iterable<IN>, IN, W>(
                        windowAssigner,
                        windowAssigner.getWindowSerializer(executionConfig),
                        keySelector,
                        keyType.createSerializer(executionConfig),
                        windowStateDesc,
                        new InternalIterableWindowFunction<IN, IN, K, W>(new PassThroughFunction<K, IN, W>()),
                        trigger,
                        allowedLateness);
        operator.setInputType(inputType, executionConfig);

        this.timeServiceProvider = new TestTimeServiceProvider();
        this.testHarness = new OneInputStreamOperatorTestHarness<>(operator, executionConfig, this.timeServiceProvider);
        this.testHarness.configureForKeyedStream(keySelector, keyType);
    }

    /**
     * Feeds one element with the given timestamp to the operator, opening it first if needed.
     *
     * @param element the element to process
     * @param timestamp the timestamp attached to the element
     * @throws Exception if opening the operator or processing the element fails
     */
    public void processElement(IN element, long timestamp) throws Exception {
        this.openOperator();
        this.testHarness.processElement(new StreamRecord<>(element, timestamp));
    }

    /**
     * Feeds a watermark with the given timestamp to the operator, opening it first if needed.
     *
     * @param timestamp the watermark timestamp
     * @throws Exception if opening the operator or processing the watermark fails
     */
    public void processWatermark(long timestamp) throws Exception {
        this.openOperator();
        this.testHarness.processWatermark(new Watermark(timestamp));
    }

    /**
     * Sets the current time of the test time service provider, opening the operator first if
     * needed.
     *
     * @param timestamp the new current processing time
     * @throws Exception if opening the operator or updating the time fails
     */
    public void setProcessingTime(long timestamp) throws Exception {
        this.openOperator();
        this.timeServiceProvider.setCurrentTime(timestamp);
    }

    /**
     * Returns the output queue of the underlying test harness.
     *
     * @return the queue holding what the operator has emitted
     * @throws Exception declared for interface compatibility with existing callers
     */
    public ConcurrentLinkedQueue<Object> getOutput() throws Exception {
        return this.testHarness.getOutput();
    }

    /**
     * Closes the underlying harness if the operator is currently open; otherwise does nothing.
     *
     * @throws Exception if closing the harness fails
     */
    public void close() throws Exception {
        if (this.isOpen) {
            this.testHarness.close();
            this.isOpen = false;
        }
    }

    /**
     * Registers a watermark as an expected output.
     *
     * @param timestamp the timestamp of the expected watermark
     */
    public void addExpectedWatermark(long timestamp) {
        this.expectedOutputs.add(new Watermark(timestamp));
    }

    /**
     * Registers an element as an expected output.
     *
     * @param element the expected element
     * @param timestamp the expected timestamp of the element
     */
    public void addExpectedElement(IN element, long timestamp) {
        this.expectedOutputs.add(new StreamRecord<>(element, timestamp));
    }

    /**
     * Compares the registered expected outputs with the harness output via
     * {@link TestHarnessUtil#assertOutputEqualsSorted}, using {@link StreamRecordComparator}
     * for ordering.
     *
     * <p>NOTE: the comparator casts record values to {@code Tuple2} with a {@code String} first
     * field and an {@code Integer} second field, so this method only works for such elements.
     *
     * @param errorMessage message passed on to the assertion utility
     */
    public void compareActualToExpectedOutput(String errorMessage) {
        TestHarnessUtil.assertOutputEqualsSorted(errorMessage, this.expectedOutputs, this.testHarness.getOutput(), new StreamRecordComparator());
    }

    /**
     * Takes a snapshot of the operator through the underlying harness.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the state returned by the harness
     * @throws Exception if the snapshot fails
     */
    public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
        return this.testHarness.snapshot(checkpointId, timestamp);
    }

    /**
     * Sets up the harness again, restores it from the given snapshot and re-opens the operator.
     * The operator must have been closed via {@link #close()} beforehand.
     *
     * @param snapshot the state to restore from
     * @param recoveryTime the recovery time passed on to the harness
     * @throws IllegalArgumentException if the operator is still open
     * @throws Exception if setup, restore or open fails
     */
    public void restore(StreamTaskState snapshot, long recoveryTime) throws Exception {
        Preconditions.checkArgument(!this.isOpen, "You are trying to restore() while the operator is still open. Please call close() first.");
        this.testHarness.setup();
        this.testHarness.restore(snapshot, recoveryTime);
        this.openOperator();
    }

    /** Opens the underlying harness once; subsequent calls are no-ops until {@link #close()}. */
    private void openOperator() throws Exception {
        if (!this.isOpen) {
            this.testHarness.open();
            this.isOpen = true;
        }
    }

    /**
     * Orders stream records by timestamp, then by the {@code String} in {@code f0}, then by the
     * {@code Integer} in {@code f1} of their {@code Tuple2} value. Any comparison that involves a
     * {@link Watermark} yields {@code 0}.
     *
     * <p>Static because it does not use any state of the enclosing harness.
     */
    private static class StreamRecordComparator implements Comparator<Object> {

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }

            StreamRecord<?> first = (StreamRecord<?>) o1;
            StreamRecord<?> second = (StreamRecord<?>) o2;

            // Long.compare instead of casting the difference to int: the subtraction can
            // overflow and the narrowing cast drops the high bits, which yields a wrong sign
            // (or 0) for timestamps that are far apart.
            int byTimestamp = Long.compare(first.getTimestamp(), second.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }

            Tuple2<?, ?> firstValue = (Tuple2<?, ?>) first.getValue();
            Tuple2<?, ?> secondValue = (Tuple2<?, ?>) second.getValue();

            int byKey = ((String) firstValue.f0).compareTo((String) secondValue.f0);
            if (byKey != 0) {
                return byKey;
            }

            // Integer.compare avoids the overflow of a plain int subtraction.
            return Integer.compare((Integer) firstValue.f1, (Integer) secondValue.f1);
        }
    }

    /**
     * Window function that forwards every element of the window contents unchanged.
     *
     * <p>Static (with its own type parameters) so that an instance does not hold a reference to
     * the enclosing harness.
     */
    private static class PassThroughFunction<K, IN, W extends Window> implements WindowFunction<IN, IN, K, W> {
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(K k, W window, Iterable<IN> input, Collector<IN> out) throws Exception {
            for (IN in : input) {
                out.collect(in);
            }
        }
    }
}

