package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.class */
public class WindowOperator<K, IN, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
    private final WindowAssigner<? super IN, W> windowAssigner;
    private final KeySelector<IN, K> keySelector;
    private final Trigger<? super IN, ? super W> trigger;
    private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
    private boolean setProcessingTime;
    private TypeSerializer<IN> inputSerializer;
    private final TypeSerializer<K> keySerializer;
    private final TypeSerializer<W> windowSerializer;
    private transient Map<Long, Set<WindowOperator<K, IN, OUT, W>.Context>> processingTimeTimers;
    private transient Map<Long, Set<WindowOperator<K, IN, OUT, W>.Context>> watermarkTimers;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient long currentWatermark;
    protected transient Map<K, Map<W, WindowOperator<K, IN, OUT, W>.Context>> windows;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Context.class */
    public class Context implements Trigger.TriggerContext {
        protected K key;
        protected W window;
        protected WindowBuffer<IN> windowBuffer;
        protected HashMap<String, Serializable> state;
        protected long watermarkTimer;
        protected long processingTimeTimer;

        public Context(K k, W w, WindowBuffer<IN> windowBuffer) {
            this.key = k;
            this.window = w;
            this.windowBuffer = windowBuffer;
            this.state = new HashMap<>();
            this.watermarkTimer = -1L;
            this.processingTimeTimer = -1L;
        }

        protected Context(DataInputView dataInputView, ClassLoader classLoader) throws Exception {
            this.key = (K) WindowOperator.this.keySerializer.deserialize(dataInputView);
            this.window = (W) WindowOperator.this.windowSerializer.deserialize(dataInputView);
            this.watermarkTimer = dataInputView.readLong();
            this.processingTimeTimer = dataInputView.readLong();
            byte[] bArr = new byte[dataInputView.readInt()];
            dataInputView.read(bArr);
            this.state = (HashMap) InstantiationUtil.deserializeObject(bArr, classLoader);
            this.windowBuffer = WindowOperator.this.windowBufferFactory.create();
            int readInt = dataInputView.readInt();
            MultiplexingStreamRecordSerializer multiplexingStreamRecordSerializer = new MultiplexingStreamRecordSerializer(WindowOperator.this.inputSerializer);
            for (int i = 0; i < readInt; i++) {
                this.windowBuffer.storeElement(multiplexingStreamRecordSerializer.m282deserialize(dataInputView).asRecord());
            }
        }

        protected void writeToState(StateBackend.CheckpointStateOutputView checkpointStateOutputView) throws IOException {
            WindowOperator.this.keySerializer.serialize(this.key, checkpointStateOutputView);
            WindowOperator.this.windowSerializer.serialize(this.window, checkpointStateOutputView);
            checkpointStateOutputView.writeLong(this.watermarkTimer);
            checkpointStateOutputView.writeLong(this.processingTimeTimer);
            byte[] serializeObject = InstantiationUtil.serializeObject(this.state);
            checkpointStateOutputView.writeInt(serializeObject.length);
            checkpointStateOutputView.write(serializeObject, 0, serializeObject.length);
            MultiplexingStreamRecordSerializer multiplexingStreamRecordSerializer = new MultiplexingStreamRecordSerializer(WindowOperator.this.inputSerializer);
            checkpointStateOutputView.writeInt(this.windowBuffer.size());
            Iterator<StreamRecord<IN>> it = this.windowBuffer.getElements().iterator();
            while (it.hasNext()) {
                multiplexingStreamRecordSerializer.serialize((StreamElement) it.next(), (DataOutputView) checkpointStateOutputView);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> OperatorState<S> getKeyValueState(final String str, final S s) {
            return (OperatorState<S>) new OperatorState<S>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.Context.1
                /* JADX WARN: Incorrect return type in method signature: ()TS; */
                /* renamed from: value, reason: merged with bridge method [inline-methods] */
                public Serializable m281value() throws IOException {
                    Serializable serializable = Context.this.state.get(str);
                    if (serializable == null) {
                        Context.this.state.put(str, s);
                        serializable = s;
                    }
                    return serializable;
                }

                /* JADX WARN: Incorrect types in method signature: (TS;)V */
                public void update(Serializable serializable) throws IOException {
                    Context.this.state.put(str, serializable);
                }
            };
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            if (this.processingTimeTimer == j) {
                return;
            }
            Set set = (Set) WindowOperator.this.processingTimeTimers.get(Long.valueOf(j));
            if (set == null) {
                WindowOperator.this.getRuntimeContext().registerTimer(j, WindowOperator.this);
                set = new HashSet();
                WindowOperator.this.processingTimeTimers.put(Long.valueOf(j), set);
            }
            this.processingTimeTimer = j;
            set.add(this);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            if (this.watermarkTimer == j) {
                return;
            }
            Set set = (Set) WindowOperator.this.watermarkTimers.get(Long.valueOf(j));
            if (set == null) {
                set = new HashSet();
                WindowOperator.this.watermarkTimers.put(Long.valueOf(j), set);
            }
            this.watermarkTimer = j;
            set.add(this);
        }

        public Trigger.TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            Trigger.TriggerResult onElement = WindowOperator.this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);
            return (this.watermarkTimer <= 0 || this.watermarkTimer > WindowOperator.this.currentWatermark) ? onElement : Trigger.TriggerResult.merge(onElement, onEventTime(this.watermarkTimer));
        }

        public Trigger.TriggerResult onProcessingTime(long j) throws Exception {
            if (j != this.processingTimeTimer) {
                return Trigger.TriggerResult.CONTINUE;
            }
            this.processingTimeTimer = -1L;
            return WindowOperator.this.trigger.onProcessingTime(j, this.window, this);
        }

        public Trigger.TriggerResult onEventTime(long j) throws Exception {
            if (j != this.watermarkTimer) {
                return Trigger.TriggerResult.CONTINUE;
            }
            this.watermarkTimer = -1L;
            Trigger.TriggerResult onEventTime = WindowOperator.this.trigger.onEventTime(j, this.window, this);
            return (this.watermarkTimer <= 0 || this.watermarkTimer > WindowOperator.this.currentWatermark) ? onEventTime : Trigger.TriggerResult.merge(onEventTime, onEventTime(this.watermarkTimer));
        }
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory, WindowFunction<IN, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger) {
        super(windowFunction);
        this.setProcessingTime = false;
        this.currentWatermark = -1L;
        this.windowAssigner = (WindowAssigner) Objects.requireNonNull(windowAssigner);
        this.windowSerializer = typeSerializer;
        this.keySelector = (KeySelector) Objects.requireNonNull(keySelector);
        this.keySerializer = (TypeSerializer) Objects.requireNonNull(typeSerializer2);
        this.windowBufferFactory = (WindowBufferFactory) Objects.requireNonNull(windowBufferFactory);
        this.trigger = (Trigger) Objects.requireNonNull(trigger);
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        this.currentWatermark = -1L;
    }

    public final void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        this.inputSerializer = typeInformation.createSerializer(executionConfig);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        if (this.inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }
        this.windowBufferFactory.setRuntimeContext(getRuntimeContext());
        this.windowBufferFactory.open(getUserFunctionParameters());
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashMap();
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashMap();
        }
        if (this.windows == null) {
            this.windows = new HashMap();
        }
        Iterator<Map.Entry<K, Map<W, WindowOperator<K, IN, OUT, W>.Context>>> it = this.windows.entrySet().iterator();
        while (it.hasNext()) {
            for (WindowOperator<K, IN, OUT, W>.Context context : it.next().getValue().values()) {
                if (context.processingTimeTimer > 0) {
                    Set<WindowOperator<K, IN, OUT, W>.Context> set = this.processingTimeTimers.get(Long.valueOf(context.processingTimeTimer));
                    if (set == null) {
                        getRuntimeContext().registerTimer(context.processingTimeTimer, this);
                        set = new HashSet();
                        this.processingTimeTimers.put(Long.valueOf(context.processingTimeTimer), set);
                    }
                    set.add(context);
                }
                if (context.watermarkTimer > 0) {
                    Set<WindowOperator<K, IN, OUT, W>.Context> set2 = this.watermarkTimers.get(Long.valueOf(context.watermarkTimer));
                    if (set2 == null) {
                        set2 = new HashSet();
                        this.watermarkTimers.put(Long.valueOf(context.watermarkTimer), set2);
                    }
                    set2.add(context);
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public final void close() throws Exception {
        super.close();
        Iterator<Map.Entry<K, Map<W, WindowOperator<K, IN, OUT, W>.Context>>> it = this.windows.entrySet().iterator();
        while (it.hasNext()) {
            Iterator<WindowOperator<K, IN, OUT, W>.Context> it2 = it.next().getValue().values().iterator();
            while (it2.hasNext()) {
                emitWindow(it2.next());
            }
        }
        this.windows.clear();
        this.windowBufferFactory.close();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public final void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.setProcessingTime) {
            streamRecord.replace(streamRecord.getValue(), System.currentTimeMillis());
        }
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp());
        Object key = this.keySelector.getKey(streamRecord.getValue());
        Map<W, WindowOperator<K, IN, OUT, W>.Context> map = this.windows.get(key);
        if (map == null) {
            map = new HashMap();
            this.windows.put(key, map);
        }
        for (W w : assignWindows) {
            WindowOperator<K, IN, OUT, W>.Context context = map.get(w);
            if (context == null) {
                context = new Context(key, w, this.windowBufferFactory.create());
                map.put(w, context);
            }
            context.windowBuffer.storeElement(streamRecord);
            processTriggerResult(context.onElement(streamRecord), key, w);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v1, types: [W extends org.apache.flink.streaming.api.windowing.windows.Window, org.apache.flink.streaming.api.windowing.windows.Window] */
    protected void emitWindow(WindowOperator<K, IN, OUT, W>.Context context) throws Exception {
        this.timestampedCollector.setTimestamp(context.window.maxTimestamp());
        if (context.windowBuffer.size() > 0) {
            setKeyContextElement(context.windowBuffer.getElements().iterator().next());
            ((WindowFunction) this.userFunction).apply(context.key, context.window, context.windowBuffer.getUnpackedElements(), this.timestampedCollector);
        }
    }

    private void processTriggerResult(Trigger.TriggerResult triggerResult, K k, W w) throws Exception {
        WindowOperator<K, IN, OUT, W>.Context context;
        if (triggerResult.isFire() || triggerResult.isPurge()) {
            Map<W, WindowOperator<K, IN, OUT, W>.Context> map = this.windows.get(k);
            if (map == null) {
                LOG.debug("Window {} for key {} already gone.", w, k);
                return;
            }
            if (triggerResult.isPurge()) {
                context = map.remove(w);
                if (map.isEmpty()) {
                    this.windows.remove(k);
                }
            } else {
                context = map.get(w);
            }
            if (context == null) {
                LOG.debug("Window {} for key {} already gone.", w, k);
            } else if (triggerResult.isFire()) {
                emitWindow(context);
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public final void processWatermark(Watermark watermark) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Long, Set<WindowOperator<K, IN, OUT, W>.Context>>> it = this.watermarkTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<WindowOperator<K, IN, OUT, W>.Context>> next = it.next();
            if (next.getKey().longValue() <= watermark.getTimestamp()) {
                arrayList.add(next.getValue());
                it.remove();
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            for (Context context : (Set) it2.next()) {
                if (context.watermarkTimer <= watermark.getTimestamp()) {
                    processTriggerResult(context.onEventTime(context.watermarkTimer), context.key, context.window);
                }
            }
        }
        this.output.emitWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    @Override // org.apache.flink.streaming.runtime.operators.Triggerable
    public final void trigger(long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Long, Set<WindowOperator<K, IN, OUT, W>.Context>>> it = this.processingTimeTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<WindowOperator<K, IN, OUT, W>.Context>> next = it.next();
            if (next.getKey().longValue() <= j) {
                arrayList.add(next.getValue());
                it.remove();
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            for (Context context : (Set) it2.next()) {
                if (context.processingTimeTimer <= j) {
                    processTriggerResult(context.onProcessingTime(context.processingTimeTimer), context.key, context.window);
                }
            }
        }
    }

    public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean z) {
        this.setProcessingTime = z;
        return this;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
        StateBackend.CheckpointStateOutputView createCheckpointStateOutputView = getStateBackend().createCheckpointStateOutputView(j, j2);
        createCheckpointStateOutputView.writeInt(this.windows.size());
        for (Map.Entry<K, Map<W, WindowOperator<K, IN, OUT, W>.Context>> entry : this.windows.entrySet()) {
            createCheckpointStateOutputView.writeInt(entry.getValue().size());
            Iterator<WindowOperator<K, IN, OUT, W>.Context> it = entry.getValue().values().iterator();
            while (it.hasNext()) {
                it.next().writeToState(createCheckpointStateOutputView);
            }
        }
        snapshotOperatorState.setOperatorState(createCheckpointStateOutputView.closeAndGetHandle());
        return snapshotOperatorState;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);
        ClassLoader userCodeClassloader = getUserCodeClassloader();
        DataInputView dataInputView = (DataInputView) streamTaskState.getOperatorState().getState(userCodeClassloader);
        int readInt = dataInputView.readInt();
        this.windows = new HashMap(readInt);
        this.processingTimeTimers = new HashMap();
        this.watermarkTimers = new HashMap();
        for (int i = 0; i < readInt; i++) {
            int readInt2 = dataInputView.readInt();
            for (int i2 = 0; i2 < readInt2; i2++) {
                WindowOperator<K, IN, OUT, W>.Context context = new Context(dataInputView, userCodeClassloader);
                Map<W, WindowOperator<K, IN, OUT, W>.Context> map = this.windows.get(context.key);
                if (map == null) {
                    map = new HashMap(readInt2);
                    this.windows.put(context.key, map);
                }
                map.put(context.window, context);
            }
        }
    }

    @VisibleForTesting
    public boolean isSetProcessingTime() {
        return this.setProcessingTime;
    }

    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return this.keySelector;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    @VisibleForTesting
    public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
        return this.windowBufferFactory;
    }
}
