/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable,
InputTypeConfigurable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
    private final WindowAssigner<? super IN, W> windowAssigner;
    private final Trigger<? super IN, ? super W> trigger;
    private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;
    private TypeSerializer<IN> inputSerializer;
    private final TypeSerializer<W> windowSerializer;
    private transient Map<Long, Set<Context>> processingTimeTimers;
    private transient Map<Long, Set<Context>> watermarkTimers;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient long currentWatermark = -1L;
    protected transient Map<W, Context> windows;

    public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory, AllWindowFunction<ACC, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger) {
        super(windowFunction);
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.windowSerializer = windowSerializer;
        this.windowBufferFactory = Objects.requireNonNull(windowBufferFactory);
        this.trigger = Objects.requireNonNull(trigger);
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.currentWatermark = -1L;
    }

    public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        this.inputSerializer = type.createSerializer(executionConfig);
    }

    @Override
    public final void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector(this.output);
        if (this.inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashMap<Long, Set<Context>>();
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashMap<Long, Set<Context>>();
        }
        if (this.windows == null) {
            this.windows = new HashMap<W, Context>();
        }
        for (Context context : this.windows.values()) {
            Set<Context> triggers;
            if (context.processingTimeTimer > 0L) {
                triggers = this.processingTimeTimers.get(context.processingTimeTimer);
                if (triggers == null) {
                    this.getRuntimeContext().registerTimer(context.processingTimeTimer, this);
                    triggers = new HashSet<Context>();
                    this.processingTimeTimers.put(context.processingTimeTimer, triggers);
                }
                triggers.add(context);
            }
            if (context.watermarkTimer <= 0L) continue;
            triggers = this.watermarkTimers.get(context.watermarkTimer);
            if (triggers == null) {
                triggers = new HashSet<Context>();
                this.watermarkTimers.put(context.watermarkTimer, triggers);
            }
            triggers.add(context);
        }
    }

    @Override
    public final void dispose() {
        super.dispose();
        this.windows.clear();
    }

    @Override
    public final void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
        for (Window window : elementWindows) {
            Context context = this.windows.get(window);
            if (context == null) {
                WindowBuffer<IN, ACC> windowBuffer = this.windowBufferFactory.create();
                context = new Context(this, window, windowBuffer);
                this.windows.put(window, context);
            }
            context.windowBuffer.storeElement(element);
            TriggerResult triggerResult = context.onElement(element);
            this.processTriggerResult(triggerResult, window);
        }
    }

    protected void emitWindow(Context context) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(((Window)context.window).maxTimestamp());
        if (context.windowBuffer.size() > 0) {
            ((AllWindowFunction)this.userFunction).apply(context.window, context.windowBuffer.getUnpackedElements(), this.timestampedCollector);
        }
    }

    private void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
        if (!triggerResult.isFire() && !triggerResult.isPurge()) {
            return;
        }
        Context context = triggerResult.isPurge() ? this.windows.remove(window) : this.windows.get(window);
        if (context == null) {
            LOG.debug("Window {} already gone.", window);
            return;
        }
        if (triggerResult.isFire()) {
            this.emitWindow(context);
        }
        if (triggerResult.isPurge()) {
            context.clear();
        }
    }

    @Override
    public final void processWatermark(Watermark mark) throws Exception {
        ArrayList<Set<Context>> toTrigger = new ArrayList<Set<Context>>();
        Iterator<Map.Entry<Long, Set<Context>>> it = this.watermarkTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Context>> triggers = it.next();
            if (triggers.getKey() > mark.getTimestamp()) continue;
            toTrigger.add(triggers.getValue());
            it.remove();
        }
        for (Set set : toTrigger) {
            for (Context ctx : set) {
                if (ctx.watermarkTimer > mark.getTimestamp()) continue;
                TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
                this.processTriggerResult(triggerResult, ctx.window);
            }
        }
        this.output.emitWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    @Override
    public final void trigger(long time) throws Exception {
        ArrayList<Set<Context>> toTrigger = new ArrayList<Set<Context>>();
        Iterator<Map.Entry<Long, Set<Context>>> it = this.processingTimeTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Context>> triggers = it.next();
            if (triggers.getKey() > time) continue;
            toTrigger.add(triggers.getValue());
            it.remove();
        }
        for (Set set : toTrigger) {
            for (Context ctx : set) {
                if (ctx.processingTimeTimer > time) continue;
                TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
                this.processTriggerResult(triggerResult, ctx.window);
            }
        }
    }

    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
        AbstractStateBackend.CheckpointStateOutputView out = this.getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
        int numWindows = this.windows.size();
        out.writeInt(numWindows);
        for (Context context : this.windows.values()) {
            context.writeToState(out);
        }
        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    @Override
    public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
        super.restoreState(taskState, recoveryTimestamp);
        ClassLoader userClassloader = this.getUserCodeClassloader();
        StateHandle<?> inputState = taskState.getOperatorState();
        DataInputView in = (DataInputView)inputState.getState(userClassloader);
        int numWindows = in.readInt();
        this.windows = new HashMap<W, Context>(numWindows);
        this.processingTimeTimers = new HashMap<Long, Set<Context>>();
        this.watermarkTimers = new HashMap<Long, Set<Context>>();
        for (int j = 0; j < numWindows; ++j) {
            Context context = new Context(this, in, userClassloader);
            this.windows.put(context.window, context);
        }
    }

    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    @VisibleForTesting
    public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
        return this.windowBufferFactory;
    }

    protected static class Context
    implements Trigger.TriggerContext {
        protected W window;
        protected WindowBuffer<IN, ACC> windowBuffer;
        protected HashMap<String, Serializable> state;
        protected long watermarkTimer;
        protected long processingTimeTimer;
        final /* synthetic */ NonKeyedWindowOperator this$0;

        public Context(W window, WindowBuffer<IN, ACC> windowBuffer) {
            this.this$0 = this$0;
            this.window = window;
            this.windowBuffer = windowBuffer;
            this.state = new HashMap();
            this.watermarkTimer = -1L;
            this.processingTimeTimer = -1L;
        }

        @Override
        public long getCurrentWatermark() {
            return this.this$0.currentWatermark;
        }

        protected Context(NonKeyedWindowOperator this$0, DataInputView in, ClassLoader userClassloader) throws Exception {
            this.this$0 = this$0;
            this.window = (Window)this$0.windowSerializer.deserialize(in);
            this.watermarkTimer = in.readLong();
            this.processingTimeTimer = in.readLong();
            int stateSize = in.readInt();
            byte[] stateData = new byte[stateSize];
            in.read(stateData);
            this.state = (HashMap)InstantiationUtil.deserializeObject((byte[])stateData, (ClassLoader)userClassloader);
            this.windowBuffer = this$0.windowBufferFactory.restoreFromSnapshot(in);
        }

        protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
            this.this$0.windowSerializer.serialize(this.window, (DataOutputView)out);
            out.writeLong(this.watermarkTimer);
            out.writeLong(this.processingTimeTimer);
            byte[] serializedState = InstantiationUtil.serializeObject(this.state);
            out.writeInt(serializedState.length);
            out.write(serializedState, 0, serializedState.length);
            this.windowBuffer.snapshot((DataOutputView)out);
        }

        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
            TypeInformation typeInfo;
            Objects.requireNonNull(stateType, "The state type class must not be null");
            try {
                typeInfo = TypeExtractor.getForClass(stateType);
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + stateType.getName() + "' from the class alone, due to generic type parameters. " + "Please specify the TypeInformation directly.", e);
            }
            return this.getKeyValueState(name, typeInfo, defaultState);
        }

        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
            Objects.requireNonNull(name, "The name of the state must not be null");
            Objects.requireNonNull(stateType, "The state type information must not be null");
            ValueStateDescriptor stateDesc = new ValueStateDescriptor(name, stateType.createSerializer(this.this$0.getExecutionConfig()), defaultState);
            return (ValueState)this.getPartitionedState((StateDescriptor<S, ?>)stateDesc);
        }

        @Override
        public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) {
            if (!(stateDescriptor instanceof ValueStateDescriptor)) {
                throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only support ValueState.");
            }
            final ValueStateDescriptor valueStateDescriptor = (ValueStateDescriptor)stateDescriptor;
            ValueState valueState = new ValueState(){

                public Object value() throws IOException {
                    Object value = Context.this.state.get(stateDescriptor.getName());
                    if (value == null) {
                        value = valueStateDescriptor.getDefaultValue();
                        Context.this.state.put(stateDescriptor.getName(), (Serializable)value);
                    }
                    return value;
                }

                public void update(Object value) throws IOException {
                    if (!(value instanceof Serializable)) {
                        throw new UnsupportedOperationException("Value state of NonKeyedWindowOperator must be serializable.");
                    }
                    Context.this.state.put(stateDescriptor.getName(), (Serializable)value);
                }

                public void clear() {
                    Context.this.state.remove(stateDescriptor.getName());
                }
            };
            return (S)valueState;
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            if (this.processingTimeTimer == time) {
                return;
            }
            HashSet<Context> triggers = (HashSet<Context>)this.this$0.processingTimeTimers.get(time);
            if (triggers == null) {
                this.this$0.getRuntimeContext().registerTimer(time, this.this$0);
                triggers = new HashSet<Context>();
                this.this$0.processingTimeTimers.put(time, triggers);
            }
            this.processingTimeTimer = time;
            triggers.add(this);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            if (this.watermarkTimer == time) {
                return;
            }
            HashSet<Context> triggers = (HashSet<Context>)this.this$0.watermarkTimers.get(time);
            if (triggers == null) {
                triggers = new HashSet<Context>();
                this.this$0.watermarkTimers.put(time, triggers);
            }
            this.watermarkTimer = time;
            triggers.add(this);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            Set triggers = (Set)this.this$0.processingTimeTimers.get(time);
            if (triggers != null) {
                triggers.remove(this);
            }
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            Set triggers = (Set)this.this$0.watermarkTimers.get(time);
            if (triggers != null) {
                triggers.remove(this);
            }
        }

        public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            TriggerResult onElementResult = this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
            if (this.watermarkTimer > 0L && this.watermarkTimer <= this.this$0.currentWatermark) {
                TriggerResult onEventTimeResult = this.onEventTime(this.watermarkTimer);
                return TriggerResult.merge(onElementResult, onEventTimeResult);
            }
            return onElementResult;
        }

        public TriggerResult onProcessingTime(long time) throws Exception {
            if (time == this.processingTimeTimer) {
                this.processingTimeTimer = -1L;
                return this.this$0.trigger.onProcessingTime(time, this.window, this);
            }
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onEventTime(long time) throws Exception {
            if (time == this.watermarkTimer) {
                this.watermarkTimer = -1L;
                TriggerResult firstTriggerResult = this.this$0.trigger.onEventTime(time, this.window, this);
                if (this.watermarkTimer > 0L && this.watermarkTimer <= this.this$0.currentWatermark) {
                    TriggerResult secondTriggerResult = this.onEventTime(this.watermarkTimer);
                    return TriggerResult.merge(firstTriggerResult, secondTriggerResult);
                }
                return firstTriggerResult;
            }
            return TriggerResult.CONTINUE;
        }

        public void clear() throws Exception {
            this.this$0.trigger.clear(this.window, this);
        }
    }
}

