package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.class */
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
    private static final long serialVersionUID = 1;

    /** Assigns each incoming element to zero or more windows (see processElement). */
    protected final WindowAssigner<? super IN, W> windowAssigner;

    /** Key selector supplied at construction; in this class it is only stored and exposed via getKeySelector(). */
    protected final KeySelector<IN, K> keySelector;

    /** Trigger whose callbacks are invoked through {@link Context} to decide when a window fires and/or is purged. */
    protected final Trigger<? super IN, ? super W> trigger;

    /** Descriptor of the per-window state that elements are added to and that is handed to the window function. */
    protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor;

    /** Serializer for input elements; assigned in setInputType and required to be non-null by open(). */
    protected TypeSerializer<IN> inputSerializer;

    /** Serializer used to write/read timer keys in snapshotOperatorState/restoreState. */
    protected final TypeSerializer<K> keySerializer;

    /** Serializer for windows; used as the state namespace serializer and for timers in snapshots. */
    protected final TypeSerializer<W> windowSerializer;

    /** Collector wrapping the operator output; created in open(), timestamp set before each window emission. */
    protected transient TimestampedCollector<OUT> timestampedCollector;

    /** Timestamp of the last watermark handled by processWatermark; reset to -1 on construction and deserialization. */
    protected transient long currentWatermark;

    /** Single reusable trigger context; its key and window are re-pointed before every trigger invocation. */
    protected transient WindowOperator<K, IN, ACC, OUT, W>.Context context;

    /** Registered processing-time timers; the set guards against duplicate entries in the queue below. */
    protected transient Set<Timer<K, W>> processingTimeTimers;

    /** Processing-time timers ordered by timestamp; drained in trigger(long). */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;

    /** Registered event-time timers; the set guards against duplicate entries in the queue below. */
    protected transient Set<Timer<K, W>> watermarkTimers;

    /** Event-time timers ordered by timestamp; drained in processWatermark. */
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Context.class */
    /**
     * Trigger context handed to the {@link Trigger} callbacks.
     *
     * <p>A context is a mutable view onto one (key, window) pair. The enclosing operator keeps a
     * single instance and overwrites {@link #key} and {@link #window} before every trigger
     * invocation, so a context must not be retained by a trigger across calls.
     */
    public class Context implements Trigger.TriggerContext {
        /** Key the context currently refers to; reassigned by the operator. */
        protected K key;
        /** Window the context currently refers to; also used as the state namespace. */
        protected W window;

        /**
         * Creates a context for the given key and window.
         *
         * @param k the initial key (may be {@code null} until the operator assigns one)
         * @param w the initial window (may be {@code null} until the operator assigns one)
         */
        public Context(K k, W w) {
            this.key = k;
            this.window = w;
        }

        /** Returns the operator's current watermark timestamp. */
        @Override
        public long getCurrentWatermark() {
            return WindowOperator.this.currentWatermark;
        }

        /**
         * Returns value state scoped to the current window, deriving the type information from
         * the given class.
         *
         * @param str the state name
         * @param cls the class of the state value; must not be {@code null}
         * @param s the default value of the state
         * @throws RuntimeException if no type information can be extracted from {@code cls}
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            Objects.requireNonNull(cls, "The state type class must not be null");
            // Only the type extraction is guarded: a state-backend failure must not be
            // reported as a type-analysis problem.
            final TypeInformation<S> typeInfo;
            try {
                typeInfo = TypeExtractor.getForClass(cls);
            } catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + cls.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
            // The default value is passed through as-is; it must never be cast to TypeInformation.
            return getKeyValueState(str, typeInfo, s);
        }

        /**
         * Returns value state scoped to the current window.
         *
         * @param str the state name; must not be {@code null}
         * @param typeInformation type information used to create the state serializer; must not be {@code null}
         * @param s the default value of the state
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            Objects.requireNonNull(str, "The name of the state must not be null");
            Objects.requireNonNull(typeInformation, "The state type information must not be null");
            ValueStateDescriptor<S> descriptor = new ValueStateDescriptor<>(str, typeInformation.createSerializer(WindowOperator.this.getExecutionConfig()), s);
            return getPartitionedState(descriptor);
        }

        /**
         * Looks up partitioned state using the current window as namespace.
         *
         * @param stateDescriptor descriptor of the requested state
         * @throws RuntimeException wrapping any failure of the underlying state lookup
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                // Must be qualified: the unqualified name would resolve to this one-argument method.
                return WindowOperator.this.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /**
         * Registers a processing-time timer for the current key and window. Registering the same
         * (timestamp, key, window) twice has no additional effect.
         *
         * @param j the processing-time timestamp at which the trigger should be called
         */
        @Override
        public void registerProcessingTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.processingTimeTimers.add(timer)) {
                WindowOperator.this.processingTimeTimersQueue.add(timer);
                WindowOperator.this.getRuntimeContext().registerTimer(j, WindowOperator.this);
            }
        }

        /**
         * Registers an event-time timer for the current key and window.
         *
         * <p>If the timestamp is not after the current watermark, a processing-time callback for
         * "now" is scheduled as well; {@code trigger(long)} re-runs the watermark timer check, so
         * the already-due timer is not left waiting for the next watermark.
         *
         * @param j the event-time timestamp at which the trigger should be called
         */
        @Override
        public void registerEventTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.watermarkTimers.add(timer)) {
                WindowOperator.this.watermarkTimersQueue.add(timer);
            }
            if (j <= WindowOperator.this.currentWatermark) {
                WindowOperator.this.getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
            }
        }

        /**
         * Removes the processing-time timer for the current key and window, if registered.
         *
         * @param j the timestamp of the timer to delete
         */
        @Override
        public void deleteProcessingTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.processingTimeTimers.remove(timer)) {
                WindowOperator.this.processingTimeTimersQueue.remove(timer);
            }
        }

        /**
         * Removes the event-time timer for the current key and window, if registered.
         *
         * @param j the timestamp of the timer to delete
         */
        @Override
        public void deleteEventTimeTimer(long j) {
            Timer<K, W> timer = new Timer<>(j, this.key, this.window);
            if (WindowOperator.this.watermarkTimers.remove(timer)) {
                WindowOperator.this.watermarkTimersQueue.remove(timer);
            }
        }

        /** Forwards an element to the trigger for the current window. */
        public TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            return WindowOperator.this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);
        }

        /** Notifies the trigger that a processing-time timer for the current window fired. */
        public TriggerResult onProcessingTime(long j) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(j, this.window, this);
        }

        /** Notifies the trigger that an event-time timer for the current window fired. */
        public TriggerResult onEventTime(long j) throws Exception {
            return WindowOperator.this.trigger.onEventTime(j, this.window, this);
        }

        /** Asks the trigger to clear whatever it holds for the current window. */
        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window, this);
        }

        @Override
        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Timer.class */
    /**
     * A timer for one (timestamp, key, window) combination.
     *
     * <p>Ordering ({@link #compareTo}) considers the timestamp only, while equality considers
     * all three components, so two timers may compare as 0 without being equal.
     */
    public static class Timer<K, W extends Window> implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        /**
         * @param j the timestamp at which the timer is due
         * @param k the key the timer belongs to
         * @param w the window the timer belongs to
         */
        public Timer(long j, K k, W w) {
            this.timestamp = j;
            this.key = k;
            this.window = w;
        }

        /** Orders timers by ascending timestamp. */
        @Override
        public int compareTo(Timer<K, W> timer) {
            return Long.compare(timestamp, timer.timestamp);
        }

        /** Two timers are equal when they share the exact class, timestamp, key and window. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final Timer<?, ?> other = (Timer<?, ?>) obj;
            if (timestamp != other.timestamp) {
                return false;
            }
            if (!key.equals(other.key)) {
                return false;
            }
            return window.equals(other.window);
        }

        @Override
        public int hashCode() {
            // Fold the 64-bit timestamp into 32 bits, then mix in key and window.
            int hash = (int) (timestamp ^ (timestamp >>> 32));
            hash = 31 * hash + key.hashCode();
            hash = 31 * hash + window.hashCode();
            return hash;
        }

        @Override
        public String toString() {
            return "Timer{timestamp=" + timestamp + ", key=" + key + ", window=" + window + '}';
        }
    }

    /**
     * Creates a new window operator.
     *
     * @param windowAssigner assigns elements to windows; must not be {@code null}
     * @param typeSerializer serializer for windows
     * @param keySelector key selector; must not be {@code null}
     * @param typeSerializer2 serializer for keys; must not be {@code null}
     * @param stateDescriptor descriptor of the per-window state
     * @param internalWindowFunction function applied to the window contents when a window fires
     * @param trigger decides when windows fire and/or are purged; must not be {@code null}
     */
    public WindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> typeSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> typeSerializer2,
            StateDescriptor<? extends MergingState<IN, ACC>, ?> stateDescriptor,
            InternalWindowFunction<ACC, OUT, K, W> internalWindowFunction,
            Trigger<? super IN, ? super W> trigger) {
        super(internalWindowFunction);

        // Transient runtime fields start out in their "nothing seen yet" state.
        this.currentWatermark = -1L;
        this.context = new Context(null, null);

        // Null checks run in the same order as the assignments.
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.windowSerializer = typeSerializer;
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keySerializer = Objects.requireNonNull(typeSerializer2);
        this.windowStateDescriptor = stateDescriptor;
        this.trigger = Objects.requireNonNull(trigger);

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Java-serialization hook. Restores the non-transient fields and then resets the transient
     * {@code currentWatermark} to -1, since a transient long would otherwise come back as 0.
     *
     * @param objectInputStream the stream the operator is being read from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if a class of a serialized field cannot be found
     */
    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        this.currentWatermark = -1L;
    }

    /**
     * Records the serializer for the operator's input type. {@code open()} refuses to start
     * unless this has been called.
     *
     * @param typeInformation type information of the input elements
     * @param executionConfig execution config used to create the serializer
     */
    @Override
    @SuppressWarnings("unchecked") // the wildcard type is the operator's IN type by construction of the job graph — NOTE(review): confirm against caller
    public final void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        this.inputSerializer = (TypeSerializer<IN>) typeInformation.createSerializer(executionConfig);
    }

    /**
     * Prepares the operator for processing: creates the output collector, verifies that the
     * input serializer was set, creates empty timer structures unless they already exist
     * (e.g. after {@code restoreState}), and installs a fresh trigger context.
     *
     * @throws IllegalStateException if the input serializer was never set
     */
    @Override
    public final void open() throws Exception {
        super.open();

        timestampedCollector = new TimestampedCollector<>(output);

        if (inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }

        // Timer structures that are already populated must not be overwritten.
        if (watermarkTimers == null) {
            watermarkTimers = new HashSet<>();
            watermarkTimersQueue = new PriorityQueue<>(100);
        }
        if (processingTimeTimers == null) {
            processingTimeTimers = new HashSet<>();
            processingTimeTimersQueue = new PriorityQueue<>(100);
        }

        context = new Context(null, null);
    }

    /**
     * Closes the operator. Adds nothing of its own; it only delegates to the superclass and
     * seals the method against further overriding.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void close() throws Exception {
        super.close();
    }

    /**
     * Handles one incoming element: for every window the assigner puts it in, the value is
     * added to that window's state and the trigger is consulted.
     *
     * @param streamRecord the incoming record
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final IN value = streamRecord.getValue();
        final Collection<W> windows = windowAssigner.assignWindows(value, streamRecord.getTimestamp());

        @SuppressWarnings("unchecked")
        final K currentKey = (K) getStateBackend().getCurrentKey();

        for (W window : windows) {
            // The window doubles as the namespace of the partitioned state.
            getPartitionedState(window, windowSerializer, windowStateDescriptor).add(value);

            // Point the shared context at this key/window before asking the trigger.
            context.key = currentKey;
            context.window = window;

            TriggerResult result = context.onElement(streamRecord);
            processTriggerResult(result, currentKey, window);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Acts on a trigger decision for the given window.
     *
     * <ul>
     *   <li>neither fire nor purge: nothing happens;</li>
     *   <li>purge only: the window state and the trigger state are cleared;</li>
     *   <li>fire: the window function is applied to the window contents, emitting with the
     *       window's max timestamp; if purge is also set, state is cleared afterwards.</li>
     * </ul>
     *
     * <p>The key and window passed to the window function are taken from the shared context,
     * which callers set up before invoking this method.
     *
     * @param triggerResult the decision returned by the trigger
     * @param k the key the decision applies to
     * @param w the window the decision applies to
     */
    protected void processTriggerResult(TriggerResult triggerResult, K k, W w) throws Exception {
        final boolean fire = triggerResult.isFire();
        final boolean purge = triggerResult.isPurge();

        if (!fire) {
            if (purge) {
                getPartitionedState(w, windowSerializer, windowStateDescriptor).clear();
                context.clear();
            }
            return;
        }

        timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());

        MergingState<IN, ACC> windowState = getPartitionedState(w, windowSerializer, windowStateDescriptor);
        ACC contents = windowState.get();
        userFunction.apply(context.key, context.window, contents, timestampedCollector);

        if (purge) {
            windowState.clear();
            context.clear();
        }
    }

    /**
     * Fires every event-time timer whose timestamp is not after the given watermark, in
     * timestamp order, then forwards the watermark downstream and records it as current.
     *
     * <p>Timers registered while firing are picked up by the same loop if they are already due.
     * {@code currentWatermark} is only updated after all timers fired.
     *
     * @param watermark the watermark to process
     */
    @Override
    public final void processWatermark(Watermark watermark) throws Exception {
        Timer<K, W> due = watermarkTimersQueue.peek();
        while (due != null && due.timestamp <= watermark.getTimestamp()) {
            // Drop the timer from both bookkeeping structures before invoking the trigger.
            watermarkTimers.remove(due);
            watermarkTimersQueue.remove();

            context.key = due.key;
            context.window = due.window;
            setKeyContext(due.key);

            TriggerResult result = context.onEventTime(due.timestamp);
            processTriggerResult(result, context.key, context.window);

            due = watermarkTimersQueue.peek();
        }

        output.emitWatermark(watermark);
        currentWatermark = watermark.getTimestamp();
    }

    /**
     * Processing-time callback. Fires every processing-time timer whose timestamp is not after
     * {@code j}, in timestamp order, and afterwards re-runs the event-time timer check with the
     * current watermark so that event-time timers registered at or behind the watermark fire.
     *
     * @param j the processing time for which this callback was scheduled
     */
    @Override
    public final void trigger(long j) throws Exception {
        Timer<K, W> due = processingTimeTimersQueue.peek();
        while (due != null && due.timestamp <= j) {
            // Drop the timer from both bookkeeping structures before invoking the trigger.
            processingTimeTimers.remove(due);
            processingTimeTimersQueue.remove();

            context.key = due.key;
            context.window = due.window;
            setKeyContext(due.key);

            TriggerResult result = context.onProcessingTime(due.timestamp);
            processTriggerResult(result, context.key, context.window);

            due = processingTimeTimersQueue.peek();
        }

        processWatermark(new Watermark(currentWatermark));
    }

    /**
     * Snapshots the operator, adding the pending timers to the superclass snapshot.
     *
     * <p>Layout of the operator state: the event-time timers followed by the processing-time
     * timers. Each section is an {@code int} count followed by that many entries of
     * (key, window, timestamp).
     *
     * @param j first argument forwarded to the superclass snapshot and the output view factory
     * @param j2 second argument forwarded to the superclass snapshot and the output view factory
     * @return the task state including the serialized timers
     */
    @Override
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(j, j2);
        AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(j, j2);

        writeTimers(this.watermarkTimersQueue, out);
        writeTimers(this.processingTimeTimersQueue, out);

        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    /**
     * Writes one timer section: the number of timers, then (key, window, timestamp) per timer.
     * The count comes from the collection that is iterated, so the section is always
     * self-consistent.
     */
    private void writeTimers(PriorityQueue<Timer<K, W>> timers, AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
        out.writeInt(timers.size());
        for (Timer<K, W> timer : timers) {
            this.keySerializer.serialize(timer.key, out);
            this.windowSerializer.serialize(timer.window, out);
            out.writeLong(timer.timestamp);
        }
    }

    /**
     * Restores the operator from a snapshot, rebuilding the timer sets and queues.
     *
     * <p>Expects the layout written by {@code snapshotOperatorState}: the event-time timers
     * followed by the processing-time timers, each as an {@code int} count followed by that
     * many entries of (key, window, timestamp).
     *
     * @param streamTaskState the snapshot to restore from
     * @param j forwarded unchanged to the superclass restore
     */
    @Override
    public void restoreState(StreamTaskState streamTaskState, long j) throws Exception {
        super.restoreState(streamTaskState, j);
        DataInputView in = (DataInputView) streamTaskState.getOperatorState().getState(getUserCodeClassloader());

        int numEventTimeTimers = in.readInt();
        this.watermarkTimers = new HashSet<>(numEventTimeTimers);
        // PriorityQueue rejects an initial capacity below 1.
        this.watermarkTimersQueue = new PriorityQueue<>(Math.max(numEventTimeTimers, 1));
        for (int i = 0; i < numEventTimeTimers; i++) {
            Timer<K, W> timer = readTimer(in);
            this.watermarkTimers.add(timer);
            this.watermarkTimersQueue.add(timer);
        }

        int numProcessingTimeTimers = in.readInt();
        this.processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
        this.processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
        for (int i = 0; i < numProcessingTimeTimers; i++) {
            Timer<K, W> timer = readTimer(in);
            this.processingTimeTimers.add(timer);
            this.processingTimeTimersQueue.add(timer);
        }
    }

    /**
     * Reads a single timer in the order it was written: key, window, timestamp.
     *
     * <p>The reads are separate statements on purpose. Inlining them as constructor arguments
     * would evaluate them in parameter order (timestamp first) and misread the stream.
     */
    private Timer<K, W> readTimer(DataInputView in) throws IOException {
        K key = this.keySerializer.deserialize(in);
        W window = this.windowSerializer.deserialize(in);
        long timestamp = in.readLong();
        return new Timer<>(timestamp, key, window);
    }

    /** Returns the trigger this operator was constructed with. */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    /** Returns the key selector this operator was constructed with. */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return this.keySelector;
    }

    /** Returns the window assigner this operator was constructed with. */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    /** Returns the descriptor of the per-window state this operator was constructed with. */
    @VisibleForTesting
    public StateDescriptor<? extends MergingState<IN, ACC>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}
