package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.class */
public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
    private static final long serialVersionUID = 1;

    /**
     * Logger of this operator. It is created for this class (not for {@code WindowOperator}) so
     * that log output and per-logger configuration are attributed to the right operator.
     */
    private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class);

    /** Assigns every incoming element to the windows it belongs to. */
    private final WindowAssigner<? super IN, W> windowAssigner;

    /** User trigger consulted on elements and on processing-time / event-time timers. */
    private final Trigger<? super IN, ? super W> trigger;

    /** Creates a fresh buffer per window and restores buffers from a snapshot. */
    private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;

    /** Serializer of the input type; set through {@code setInputType} and required by {@code open()}. */
    private TypeSerializer<IN> inputSerializer;

    /** Serializer used to write and read the window of each context in snapshots. */
    private final TypeSerializer<W> windowSerializer;

    /** Contexts that registered a processing-time timer, grouped by timer timestamp. */
    private transient Map<Long, Set<NonKeyedWindowOperator<IN, ACC, OUT, W>.Context>> processingTimeTimers;

    /** Contexts that registered an event-time timer, grouped by timer timestamp. */
    private transient Map<Long, Set<NonKeyedWindowOperator<IN, ACC, OUT, W>.Context>> watermarkTimers;

    /** Collector handed to the window function; its timestamp is set before each emission. */
    protected transient TimestampedCollector<OUT> timestampedCollector;

    /** Timestamp of the last processed watermark, or -1 if none was processed yet. */
    protected transient long currentWatermark;

    /** The per-window contexts that are currently in flight, keyed by window. */
    protected transient Map<W, NonKeyedWindowOperator<IN, ACC, OUT, W>.Context> windows;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator$Context.class */
    public class Context implements Trigger.TriggerContext {
        // The window this context belongs to.
        protected W window;
        // Buffer holding the elements stored for this window.
        protected WindowBuffer<IN, ACC> windowBuffer;
        // Trigger state of this window, keyed by state name; Java-serialized into snapshots.
        protected HashMap<String, Serializable> state;
        // Timestamp of the registered event-time timer; -1 when none is set.
        protected long watermarkTimer;
        // Timestamp of the registered processing-time timer; -1 when none is set.
        protected long processingTimeTimer;

        /**
         * Creates the context of a new window: empty trigger state and no timers registered.
         *
         * @param w the window this context is responsible for
         * @param windowBuffer the buffer that stores the elements of the window
         */
        public Context(W w, WindowBuffer<IN, ACC> windowBuffer) {
            // -1 is the "no timer registered" marker for both timer kinds.
            this.processingTimeTimer = -1L;
            this.watermarkTimer = -1L;
            this.state = new HashMap<>();
            this.windowBuffer = windowBuffer;
            this.window = w;
        }

        /**
         * Returns the enclosing operator's current watermark.
         *
         * @return the value of the operator's {@code currentWatermark} field
         */
        @Override
        public long getCurrentWatermark() {
            final long operatorWatermark = NonKeyedWindowOperator.this.currentWatermark;
            return operatorWatermark;
        }

        /**
         * Restores a context from a snapshot. The fields are read in the order in which
         * {@code writeToState} writes them: window, event-time timer, processing-time timer,
         * length-prefixed Java-serialized trigger state, window buffer.
         *
         * @param dataInputView the view to read the snapshot from
         * @param classLoader the class loader used to deserialize the trigger state
         * @throws Exception if reading or deserializing fails, including an
         *         {@link java.io.EOFException} when the input ends before the trigger state is complete
         */
        @SuppressWarnings("unchecked")
        protected Context(DataInputView dataInputView, ClassLoader classLoader) throws Exception {
            this.window = (W) NonKeyedWindowOperator.this.windowSerializer.deserialize(dataInputView);
            this.watermarkTimer = dataInputView.readLong();
            this.processingTimeTimer = dataInputView.readLong();

            byte[] stateBytes = new byte[dataInputView.readInt()];
            // readFully blocks until the whole array is filled. A plain read(byte[]) may return
            // after fewer bytes, which would corrupt the trigger state and leave the stream
            // misaligned for the window buffer that follows.
            dataInputView.readFully(stateBytes);
            this.state = (HashMap<String, Serializable>) InstantiationUtil.deserializeObject(stateBytes, classLoader);

            this.windowBuffer = NonKeyedWindowOperator.this.windowBufferFactory.restoreFromSnapshot(dataInputView);
        }

        /**
         * Writes this context to a checkpoint: window, event-time timer, processing-time timer,
         * the Java-serialized trigger state prefixed with its length, and finally the window buffer.
         *
         * @param checkpointStateOutputView the checkpoint output to write to
         * @throws IOException if writing to the output fails
         */
        protected void writeToState(AbstractStateBackend.CheckpointStateOutputView checkpointStateOutputView) throws IOException {
            final AbstractStateBackend.CheckpointStateOutputView out = checkpointStateOutputView;

            windowSerializer.serialize(window, out);
            out.writeLong(watermarkTimer);
            out.writeLong(processingTimeTimer);

            final byte[] triggerState = InstantiationUtil.serializeObject(state);
            out.writeInt(triggerState.length);
            out.write(triggerState, 0, triggerState.length);

            windowBuffer.snapshot(out);
        }

        /**
         * Returns the value state with the given name, deriving the type information from the
         * state class.
         *
         * @param str the name of the state
         * @param cls the class of the state values; must not be null
         * @param s the default value of the state
         * @return the value state registered under the given name
         * @throws NullPointerException if {@code cls} is null
         * @throws RuntimeException if no type information can be extracted from {@code cls}
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            Objects.requireNonNull(cls, "The state type class must not be null");

            // Only the type extraction is guarded, so failures of the delegate below are not
            // reported with the misleading "cannot analyze type" message.
            TypeInformation<S> typeInfo;
            try {
                typeInfo = TypeExtractor.getForClass(cls);
            } catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + cls.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }

            // The default value is passed through as-is; it must not be cast to TypeInformation.
            return getKeyValueState(str, typeInfo, s);
        }

        /**
         * Returns the value state with the given name, using a serializer created from the given
         * type information and the operator's execution config.
         *
         * @param str the name of the state; must not be null
         * @param typeInformation the type information of the state values; must not be null
         * @param s the default value of the state
         * @return the value state registered under the given name
         * @throws NullPointerException if the name or the type information is null
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            Objects.requireNonNull(str, "The name of the state must not be null");
            Objects.requireNonNull(typeInformation, "The state type information must not be null");

            final TypeSerializer<S> serializer = typeInformation.createSerializer(getExecutionConfig());
            final ValueStateDescriptor<S> descriptor = new ValueStateDescriptor<>(str, serializer, s);
            return getPartitionedState(descriptor);
        }

        /**
         * Returns a value state view backed by this context's {@code state} map, using the
         * descriptor's name as the map key.
         *
         * @param stateDescriptor the descriptor of the requested state; only
         *        {@link ValueStateDescriptor} is supported
         * @return a {@link ValueState} that reads from and writes to this context's state map
         * @throws UnsupportedOperationException if the descriptor is not a {@code ValueStateDescriptor}
         */
        @Override
        @SuppressWarnings("unchecked")
        public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) {
            if (!(stateDescriptor instanceof ValueStateDescriptor)) {
                throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only support ValueState.");
            }
            final ValueStateDescriptor<?> valueStateDescriptor = (ValueStateDescriptor<?>) stateDescriptor;

            final ValueState<Object> mapBackedState = new ValueState<Object>() {
                @Override
                public Object value() throws IOException {
                    final Object stored = Context.this.state.get(stateDescriptor.getName());
                    if (stored != null) {
                        return stored;
                    }
                    // Nothing stored yet: hand out the default and remember it in the map.
                    final Object defaultValue = valueStateDescriptor.getDefaultValue();
                    Context.this.state.put(stateDescriptor.getName(), (Serializable) defaultValue);
                    return defaultValue;
                }

                @Override
                public void update(Object obj) throws IOException {
                    if (obj instanceof Serializable) {
                        Context.this.state.put(stateDescriptor.getName(), (Serializable) obj);
                        return;
                    }
                    // The state map is Java-serialized on checkpoints, so anything else is rejected.
                    throw new UnsupportedOperationException("Value state of NonKeyedWindowOperator must be serializable.");
                }

                @Override
                public void clear() {
                    Context.this.state.remove(stateDescriptor.getName());
                }
            };
            return (S) mapBackedState;
        }

        /**
         * Registers a processing-time timer for this context. Does nothing if the context's
         * current processing-time timer already has the given timestamp.
         *
         * @param j the timestamp of the timer
         */
        @Override
        public void registerProcessingTimeTimer(long j) {
            if (processingTimeTimer != j) {
                Set<Context> contextsAtTime = processingTimeTimers.get(j);
                if (contextsAtTime == null) {
                    // First context for this timestamp: ask the runtime to call back the operator.
                    getRuntimeContext().registerTimer(j, NonKeyedWindowOperator.this);
                    contextsAtTime = new HashSet<>();
                    processingTimeTimers.put(j, contextsAtTime);
                }
                processingTimeTimer = j;
                contextsAtTime.add(this);
            }
        }

        /**
         * Registers an event-time timer for this context. Does nothing if the context's current
         * event-time timer already has the given timestamp.
         *
         * @param j the timestamp of the timer
         */
        @Override
        public void registerEventTimeTimer(long j) {
            if (watermarkTimer != j) {
                Set<Context> contextsAtTime = watermarkTimers.get(j);
                if (contextsAtTime == null) {
                    contextsAtTime = new HashSet<>();
                    watermarkTimers.put(j, contextsAtTime);
                }
                watermarkTimer = j;
                contextsAtTime.add(this);
            }
        }

        /**
         * Deletes the processing-time timer with the given timestamp for this context.
         *
         * <p>Besides removing the context from the operator's timer set, the context's own
         * {@code processingTimeTimer} is reset when it refers to the deleted timestamp. Otherwise
         * {@code registerProcessingTimeTimer} would treat a later registration of the same
         * timestamp as already done, and the deleted timer would still be written to snapshots
         * and re-registered by {@code open()} after a restore.
         *
         * @param j the timestamp of the timer to delete
         */
        @Override
        public void deleteProcessingTimeTimer(long j) {
            Set<Context> contextsAtTime = NonKeyedWindowOperator.this.processingTimeTimers.get(j);
            if (contextsAtTime != null) {
                contextsAtTime.remove(this);
            }
            if (this.processingTimeTimer == j) {
                this.processingTimeTimer = -1L;
            }
        }

        /**
         * Deletes the event-time timer with the given timestamp for this context.
         *
         * <p>Besides removing the context from the operator's timer set, the context's own
         * {@code watermarkTimer} is reset when it refers to the deleted timestamp. Otherwise
         * {@code registerEventTimeTimer} would treat a later registration of the same timestamp
         * as already done, {@code onElement} could still fire the deleted timer once the
         * watermark passed it, and the timer would be written to snapshots and re-registered by
         * {@code open()} after a restore.
         *
         * @param j the timestamp of the timer to delete
         */
        @Override
        public void deleteEventTimeTimer(long j) {
            Set<Context> contextsAtTime = NonKeyedWindowOperator.this.watermarkTimers.get(j);
            if (contextsAtTime != null) {
                contextsAtTime.remove(this);
            }
            if (this.watermarkTimer == j) {
                this.watermarkTimer = -1L;
            }
        }

        /**
         * Forwards an element to the user trigger. If afterwards an event-time timer is set
         * (greater than 0) and not later than the operator's current watermark, that timer is
         * fired right away and both results are merged.
         *
         * @param streamRecord the element that was added to the window
         * @return the trigger's decision, possibly merged with the result of the due timer
         * @throws Exception if the user trigger fails
         */
        public TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            final TriggerResult elementResult = NonKeyedWindowOperator.this.trigger.onElement(
                    streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);

            final boolean timerAlreadyDue =
                    this.watermarkTimer > 0 && this.watermarkTimer <= NonKeyedWindowOperator.this.currentWatermark;
            if (!timerAlreadyDue) {
                return elementResult;
            }
            return TriggerResult.merge(elementResult, onEventTime(this.watermarkTimer));
        }

        /**
         * Fires the processing-time timer of this context. Timestamps other than the currently
         * registered one are ignored.
         *
         * @param j the timestamp of the timer that fired
         * @return the user trigger's decision, or {@code CONTINUE} if {@code j} is not the
         *         registered timer
         * @throws Exception if the user trigger fails
         */
        public TriggerResult onProcessingTime(long j) throws Exception {
            if (j == this.processingTimeTimer) {
                // Clear the timer before the callback so the trigger can register a new one.
                this.processingTimeTimer = -1L;
                return NonKeyedWindowOperator.this.trigger.onProcessingTime(j, this.window, this);
            }
            return TriggerResult.CONTINUE;
        }

        /**
         * Fires the event-time timer of this context. Timestamps other than the currently
         * registered one are ignored. If the trigger registers a new timer (greater than 0) that
         * is not later than the operator's current watermark, that timer is fired as well and the
         * results are merged.
         *
         * @param j the timestamp of the timer that fired
         * @return the merged trigger decision, or {@code CONTINUE} if {@code j} is not the
         *         registered timer
         * @throws Exception if the user trigger fails
         */
        public TriggerResult onEventTime(long j) throws Exception {
            if (j == this.watermarkTimer) {
                // Clear the timer before the callback so the trigger can register a new one.
                this.watermarkTimer = -1L;
                final TriggerResult timerResult = NonKeyedWindowOperator.this.trigger.onEventTime(j, this.window, this);

                final boolean nextTimerAlreadyDue =
                        this.watermarkTimer > 0 && this.watermarkTimer <= NonKeyedWindowOperator.this.currentWatermark;
                if (nextTimerAlreadyDue) {
                    return TriggerResult.merge(timerResult, onEventTime(this.watermarkTimer));
                }
                return timerResult;
            }
            return TriggerResult.CONTINUE;
        }

        /**
         * Asks the user trigger to clear whatever it holds for this context's window.
         *
         * @throws Exception if the user trigger fails
         */
        public void clear() throws Exception {
            final Trigger<? super IN, ? super W> userTrigger = NonKeyedWindowOperator.this.trigger;
            userTrigger.clear(this.window, this);
        }
    }

    /**
     * Creates a new non-keyed window operator. The chaining strategy is set to
     * {@code ChainingStrategy.ALWAYS}.
     *
     * @param windowAssigner assigns elements to windows; must not be null
     * @param typeSerializer serializer for the windows
     * @param windowBufferFactory creates the per-window buffers; must not be null
     * @param allWindowFunction the user function applied to a window when it fires
     * @param trigger decides when a window fires or is purged; must not be null
     * @throws NullPointerException if the assigner, the buffer factory or the trigger is null
     */
    public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory, AllWindowFunction<ACC, OUT, W> allWindowFunction, Trigger<? super IN, ? super W> trigger) {
        super(allWindowFunction);

        // No watermark has been seen yet.
        currentWatermark = -1L;

        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.windowSerializer = typeSerializer;
        this.windowBufferFactory = Objects.requireNonNull(windowBufferFactory);
        this.trigger = Objects.requireNonNull(trigger);

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Java deserialization hook: reads the non-transient fields and resets the transient
     * {@code currentWatermark} to -1, the same value the constructor assigns.
     *
     * @param objectInputStream the stream the operator is read from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if a class of a serialized field cannot be found
     */
    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();

        // Transient fields are not restored by defaultReadObject.
        currentWatermark = -1L;
    }

    /**
     * Creates the input serializer from the given input type information.
     *
     * @param typeInformation the type information of the operator's input
     * @param executionConfig the execution config used to create the serializer
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        final TypeSerializer<?> created = typeInformation.createSerializer(executionConfig);
        inputSerializer = (TypeSerializer<IN>) created;
    }

    /**
     * Opens the operator: creates the timestamped collector, verifies that the input serializer
     * was set, creates the timer and window maps if they do not exist yet, and puts the timers
     * of all already present window contexts back into the timer maps.
     *
     * @throws IllegalStateException if the input serializer was not set
     * @throws Exception if opening the super class fails
     */
    @Override
    public final void open() throws Exception {
        super.open();
        timestampedCollector = new TimestampedCollector<>(output);

        if (inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }

        // The maps may already exist, in which case they are kept.
        if (watermarkTimers == null) {
            watermarkTimers = new HashMap<>();
        }
        if (processingTimeTimers == null) {
            processingTimeTimers = new HashMap<>();
        }
        if (windows == null) {
            windows = new HashMap<>();
        }

        // Re-register the timers of the contexts that are already present.
        for (Context existing : windows.values()) {
            final long processingTime = existing.processingTimeTimer;
            if (processingTime > 0) {
                Set<Context> atProcessingTime = processingTimeTimers.get(processingTime);
                if (atProcessingTime == null) {
                    getRuntimeContext().registerTimer(processingTime, this);
                    atProcessingTime = new HashSet<>();
                    processingTimeTimers.put(processingTime, atProcessingTime);
                }
                atProcessingTime.add(existing);
            }

            final long eventTime = existing.watermarkTimer;
            if (eventTime > 0) {
                Set<Context> atEventTime = watermarkTimers.get(eventTime);
                if (atEventTime == null) {
                    atEventTime = new HashSet<>();
                    watermarkTimers.put(eventTime, atEventTime);
                }
                atEventTime.add(existing);
            }
        }
    }

    /**
     * Disposes the operator and drops all window contexts.
     *
     * <p>The {@code windows} map is transient and only created in {@code open()} or
     * {@code restoreState()}, so it is still null when the operator is disposed before either
     * of them ran. That case is tolerated here so that disposal does not fail with a
     * {@link NullPointerException}.
     */
    @Override
    public final void dispose() {
        super.dispose();
        if (this.windows != null) {
            this.windows.clear();
        }
    }

    /**
     * Adds the element to the buffer of every window it is assigned to, creating the window
     * context on first use, and then evaluates the trigger for that window.
     *
     * @param streamRecord the incoming element
     * @throws Exception if the window buffer, the trigger or the window function fails
     */
    @Override
    public final void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final IN value = streamRecord.getValue();
        final long timestamp = streamRecord.getTimestamp();

        for (W assigned : windowAssigner.assignWindows(value, timestamp)) {
            Context windowContext = windows.get(assigned);
            if (windowContext == null) {
                windowContext = new Context(assigned, windowBufferFactory.create());
                windows.put(assigned, windowContext);
            }

            windowContext.windowBuffer.storeElement(streamRecord);
            final TriggerResult decision = windowContext.onElement(streamRecord);
            processTriggerResult(decision, assigned);
        }
    }

    /**
     * Applies the user window function to the contents of the given window. The collector's
     * timestamp is set to the window's maximum timestamp first; the function is only invoked
     * when the window buffer is not empty.
     *
     * @param context the context of the window to emit
     * @throws Exception if the user function fails
     */
    protected void emitWindow(NonKeyedWindowOperator<IN, ACC, OUT, W>.Context context) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());

        if (context.windowBuffer.size() <= 0) {
            return;
        }
        userFunction.apply(context.window, context.windowBuffer.getUnpackedElements(), timestampedCollector);
    }

    /**
     * Acts on a trigger decision for a window: emits the window on FIRE, and removes the window
     * and clears its trigger on PURGE. Results that neither fire nor purge are ignored.
     *
     * @param triggerResult the decision of the trigger
     * @param w the window the decision refers to
     * @throws Exception if emitting the window or clearing the trigger fails
     */
    private void processTriggerResult(TriggerResult triggerResult, W w) throws Exception {
        final boolean fire = triggerResult.isFire();
        final boolean purge = triggerResult.isPurge();
        if (!fire && !purge) {
            return;
        }

        // A purging result takes the window out of the map, a pure fire only looks it up.
        final Context context;
        if (purge) {
            context = windows.remove(w);
        } else {
            context = windows.get(w);
        }

        if (context == null) {
            LOG.debug("Window {} already gone.", w);
            return;
        }

        if (fire) {
            emitWindow(context);
        }
        if (purge) {
            context.clear();
        }
    }

    /**
     * Fires all event-time timers that are due at the given watermark, then forwards the
     * watermark downstream and records it as the current watermark.
     *
     * <p>Due timer sets are first removed from {@code watermarkTimers} and processed afterwards,
     * so timers registered by a trigger callback go into fresh sets and do not disturb the
     * iteration.
     *
     * @param watermark the watermark that arrived
     * @throws Exception if a trigger or the window function fails
     */
    @Override
    public final void processWatermark(Watermark watermark) throws Exception {
        final long watermarkTime = watermark.getTimestamp();

        ArrayList<Set<Context>> dueTimerSets = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<Context>>> entries = this.watermarkTimers.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Long, Set<Context>> entry = entries.next();
            if (entry.getKey().longValue() <= watermarkTime) {
                dueTimerSets.add(entry.getValue());
                entries.remove();
            }
        }

        for (Set<Context> contexts : dueTimerSets) {
            for (Context context : contexts) {
                final long timer = context.watermarkTimer;
                // A context stays in an old set when its timer was re-registered for another
                // time. Skip it if its timer was moved to a later time, and also if it has no
                // timer at all any more (-1); the latter used to call the trigger with time -1.
                if (timer != -1L && timer <= watermarkTime) {
                    processTriggerResult(context.onEventTime(timer), context.window);
                }
            }
        }

        this.output.emitWatermark(watermark);
        this.currentWatermark = watermarkTime;
    }

    /**
     * Processing-time callback: fires all processing-time timers that are due at the given time.
     *
     * <p>Due timer sets are first removed from {@code processingTimeTimers} and processed
     * afterwards, so timers registered by a trigger callback go into fresh sets and do not
     * disturb the iteration.
     *
     * @param j the processing time the callback was invoked for
     * @throws Exception if a trigger or the window function fails
     */
    @Override
    public final void trigger(long j) throws Exception {
        ArrayList<Set<Context>> dueTimerSets = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<Context>>> entries = this.processingTimeTimers.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Long, Set<Context>> entry = entries.next();
            if (entry.getKey().longValue() <= j) {
                dueTimerSets.add(entry.getValue());
                entries.remove();
            }
        }

        for (Set<Context> contexts : dueTimerSets) {
            for (Context context : contexts) {
                final long timer = context.processingTimeTimer;
                // A context stays in an old set when its timer was re-registered for another
                // time. Skip it if its timer was moved to a later time, and also if it has no
                // timer at all any more (-1); the latter used to call the trigger with time -1.
                if (timer != -1L && timer <= j) {
                    processTriggerResult(context.onProcessingTime(timer), context.window);
                }
            }
        }
    }

    /**
     * Snapshots the operator: on top of the super class snapshot, writes the number of window
     * contexts followed by every context, and stores the resulting handle as operator state.
     *
     * @param j first argument, passed on unchanged to the super class and the state backend
     * @param j2 second argument, passed on unchanged to the super class and the state backend
     * @return the task state including the serialized window contexts
     * @throws Exception if writing the snapshot fails
     */
    @Override
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        final StreamTaskState taskState = super.snapshotOperatorState(j, j2);

        final AbstractStateBackend.CheckpointStateOutputView out =
                getStateBackend().createCheckpointStateOutputView(j, j2);
        out.writeInt(windows.size());
        for (Context context : windows.values()) {
            context.writeToState(out);
        }

        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    /**
     * Restores the operator: reads the number of window contexts and then each context from the
     * operator state. The timer maps are created empty here.
     *
     * @param streamTaskState the state to restore from
     * @param j passed on unchanged to the super class
     * @throws Exception if reading the state fails
     */
    @Override
    public void restoreState(StreamTaskState streamTaskState, long j) throws Exception {
        super.restoreState(streamTaskState, j);

        final ClassLoader userClassLoader = getUserCodeClassloader();
        final DataInputView in = (DataInputView) streamTaskState.getOperatorState().getState(userClassLoader);

        final int numWindows = in.readInt();
        windows = new HashMap<>(numWindows);
        processingTimeTimers = new HashMap<>();
        watermarkTimers = new HashMap<>();

        int restoredSoFar = 0;
        while (restoredSoFar < numWindows) {
            final Context restored = new Context(in, userClassLoader);
            windows.put(restored.window, restored);
            restoredSoFar++;
        }
    }

    /**
     * Exposes the trigger for tests.
     *
     * @return the trigger this operator was created with
     */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /**
     * Exposes the window assigner for tests.
     *
     * @return the window assigner this operator was created with
     */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /**
     * Exposes the window buffer factory for tests.
     *
     * @return the window buffer factory this operator was created with
     */
    @VisibleForTesting
    public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
        return windowBufferFactory;
    }
}
