/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Objects;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.AbstractKeyedTimePanes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

/**
 * Base class for operators that evaluate keyed, aligned windows driven by processing time
 * (the wall clock of the machine, read via {@link System#currentTimeMillis()}).
 *
 * <p>Incoming elements are collected in an {@link AbstractKeyedTimePanes} structure. The pane
 * length is the greatest common divisor of the window size and the window slide, so that both
 * are whole multiples of one pane. The operator registers a timer for the next point in time at
 * which either a window has to be evaluated (every {@code windowSlide} msecs) or the panes have
 * to be advanced (every {@code paneSize} msecs).
 *
 * <p>The evaluation/slide schedule and the pane contents are written into checkpoints and are
 * replayed in {@link #open()} after a restore.
 *
 * @param <KEY> type of the key extracted from each element
 * @param <IN> type of the incoming elements
 * @param <OUT> type of the emitted results
 * @param <STATE> type of the per-key state kept inside the panes
 * @param <F> type of the user function
 */
@Internal
public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function>
        extends AbstractUdfStreamOperator<OUT, F>
        implements OneInputStreamOperator<IN, OUT>, Triggerable {

    private static final long serialVersionUID = 3245500864882459867L;

    /** Lower bound (in msecs) for the window size, the window slide and the derived pane size. */
    private static final long MIN_SLIDE_TIME = 50L;

    // ----- configuration, shipped with the serialized operator -----

    private final Function function;
    private final KeySelector<IN, KEY> keySelector;
    private final TypeSerializer<KEY> keySerializer;
    private final TypeSerializer<STATE> stateTypeSerializer;
    private final long windowSize;
    private final long windowSlide;
    private final long paneSize;
    private final int numPanesPerWindow;

    // ----- runtime state, (re-)created in open() / restoreState() -----

    private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
    private transient TimestampedCollector<OUT> out;
    private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
    private transient long nextEvaluationTime;
    private transient long nextSlideTime;

    /**
     * Creates the operator and derives the pane layout from the window size and slide.
     *
     * @param function the user function, must not be {@code null}
     * @param keySelector extracts the key of each element, must not be {@code null}
     * @param keySerializer serializer for keys, used when checkpointing the panes
     * @param stateTypeSerializer serializer for the per-key pane state, used when checkpointing
     * @param windowLength window size in msecs, at least {@value #MIN_SLIDE_TIME}
     * @param windowSlide window slide in msecs, at least {@value #MIN_SLIDE_TIME} and not larger
     *     than {@code windowLength} (an equal size and slide is accepted)
     * @throws IllegalArgumentException if the size, the slide, or their greatest common divisor
     *     is below {@value #MIN_SLIDE_TIME} msecs, or if the size is smaller than the slide
     * @throws NullPointerException if any of the object arguments is {@code null}
     */
    protected AbstractAlignedProcessingTimeWindowOperator(
            F function,
            KeySelector<IN, KEY> keySelector,
            TypeSerializer<KEY> keySerializer,
            TypeSerializer<STATE> stateTypeSerializer,
            long windowLength,
            long windowSlide) {
        super(function);

        if (windowLength < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
        }
        if (windowSlide < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
        }
        if (windowLength < windowSlide) {
            throw new IllegalArgumentException("The window size must be larger than the window slide");
        }

        // One pane covers the largest time span that evenly divides both size and slide.
        final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
        if (paneSlide < MIN_SLIDE_TIME) {
            throw new IllegalArgumentException(String.format("Cannot compute window of size %d msecs sliding by %d msecs. The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
        }

        this.function = Objects.requireNonNull(function);
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keySerializer = Objects.requireNonNull(keySerializer);
        this.stateTypeSerializer = Objects.requireNonNull(stateTypeSerializer);
        this.windowSize = windowLength;
        this.windowSlide = windowSlide;
        this.paneSize = paneSlide;
        this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
    }

    /**
     * Creates the pane structure that collects elements and evaluates windows.
     *
     * @param keySelector the key selector handed to the constructor
     * @param function the user function handed to the constructor
     * @return a new, empty pane structure
     */
    protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function);

    /**
     * Initializes the collector, computes the first evaluation and slide times from the current
     * wall-clock time, installs either fresh or restored panes, and registers the first timer.
     *
     * <p>If state was restored, the evaluations and pane slides that the restored schedule had
     * planned before the first new trigger time are replayed, until either the schedule catches
     * up or as many slides were performed as panes were restored.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.out = new TimestampedCollector<>(this.output);

        // Round up to the next multiple of the slide / pane size that lies strictly after 'now'.
        final long now = System.currentTimeMillis();
        this.nextEvaluationTime = now + this.windowSlide - now % this.windowSlide;
        this.nextSlideTime = now + this.paneSize - now % this.paneSize;
        final long firstTriggerTime = Math.min(this.nextEvaluationTime, this.nextSlideTime);

        if (this.restoredState == null) {
            this.panes = this.createPanes(this.keySelector, this.function);
        } else {
            this.panes = this.restoredState.panes;

            long nextPastEvaluationTime = this.restoredState.nextEvaluationTime;
            long nextPastSlideTime = this.restoredState.nextSlideTime;
            long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
            int numPanesRestored = this.panes.getNumPanes();

            // Same two steps as trigger(), driven by the restored schedule instead of a timer.
            while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) {
                if (nextPastTriggerTime == nextPastEvaluationTime) {
                    this.computeWindow(nextPastTriggerTime);
                    nextPastEvaluationTime += this.windowSlide;
                }
                if (nextPastTriggerTime == nextPastSlideTime) {
                    this.panes.slidePanes(this.numPanesPerWindow);
                    --numPanesRestored;
                    nextPastSlideTime += this.paneSize;
                }
                nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
            }
        }

        this.registerTimer(firstTriggerTime, this);
    }

    /** Closes the operator and invalidates the evaluation/slide schedule. */
    @Override
    public void close() throws Exception {
        super.close();
        this.stopTriggers();
    }

    /** Disposes the operator, invalidates the schedule and disposes the panes if they exist. */
    @Override
    public void dispose() {
        super.dispose();
        this.stopTriggers();
        if (this.panes != null) {
            this.panes.dispose();
        }
    }

    /**
     * Sets both scheduled times to {@code -1} so that the timestamp comparisons in
     * {@link #trigger(long)} no longer match a previously registered timer.
     */
    private void stopTriggers() {
        this.nextEvaluationTime = -1L;
        this.nextSlideTime = -1L;
    }

    /** Adds the value of the record to the latest pane; the record's timestamp is not used. */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.panes.addElementToLatestPane(element.getValue());
    }

    /** Watermarks are ignored: this operator works purely on processing time. */
    @Override
    public void processWatermark(Watermark mark) {
    }

    /**
     * Timer callback. Evaluates a window if {@code timestamp} is the scheduled evaluation time,
     * slides the panes if it is the scheduled slide time (both may apply to the same timestamp),
     * and registers the timer for the next earliest of the two.
     *
     * @param timestamp the time the timer was registered for
     */
    @Override
    public void trigger(long timestamp) throws Exception {
        if (timestamp == this.nextEvaluationTime) {
            this.computeWindow(timestamp);
            this.nextEvaluationTime += this.windowSlide;
        }
        if (timestamp == this.nextSlideTime) {
            this.panes.slidePanes(this.numPanesPerWindow);
            this.nextSlideTime += this.paneSize;
        }

        final long nextTriggerTime = Math.min(this.nextEvaluationTime, this.nextSlideTime);
        this.registerTimer(nextTriggerTime, this);
    }

    /**
     * Evaluates the window that ends at the given evaluation time and emits the results with
     * {@code timestamp} as their record timestamp.
     *
     * @param timestamp the scheduled evaluation time, i.e. the end of the evaluated window
     */
    private void computeWindow(long timestamp) throws Exception {
        this.out.setAbsoluteTimestamp(timestamp);
        this.panes.truncatePanes(this.numPanesPerWindow);
        // The panes only hold elements that were added before this evaluation fired, so the
        // window being evaluated lies in the past: it starts windowSize msecs before the
        // evaluation time and ends at it. (Previously reported as [timestamp, timestamp + size).)
        this.panes.evaluateWindow(this.out, new TimeWindow(timestamp - this.windowSize, timestamp), this);
    }

    /**
     * Writes the next evaluation time, the next slide time and the pane contents (in that order)
     * into a checkpoint stream and attaches the resulting handle to the task state.
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        final StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

        final AbstractStateBackend.CheckpointStateOutputView out =
                this.getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

        // Must mirror the read order in restoreState().
        out.writeLong(this.nextEvaluationTime);
        out.writeLong(this.nextSlideTime);
        this.panes.writeToOutput(out, this.keySerializer, this.stateTypeSerializer);

        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    /**
     * Reads back what {@link #snapshotOperatorState(long, long)} wrote and parks it in
     * {@code restoredState}; it is applied later by {@link #open()}.
     */
    @Override
    public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
        super.restoreState(taskState, recoveryTimestamp);

        final StateHandle<?> inputState = taskState.getOperatorState();
        final DataInputView in = (DataInputView) inputState.getState(this.getUserCodeClassloader());

        final long nextEvaluationTime = in.readLong();
        final long nextSlideTime = in.readLong();

        final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = this.createPanes(this.keySelector, this.function);
        panes.readFromInput(in, this.keySerializer, this.stateTypeSerializer);

        this.restoredState = new RestoredState<IN, KEY, STATE, OUT>(panes, nextEvaluationTime, nextSlideTime);
    }

    /** @return the window size in msecs */
    public long getWindowSize() {
        return this.windowSize;
    }

    /** @return the window slide in msecs */
    public long getWindowSlide() {
        return this.windowSlide;
    }

    /** @return the pane size in msecs (greatest common divisor of window size and slide) */
    public long getPaneSize() {
        return this.paneSize;
    }

    /** @return the number of panes that make up one window ({@code windowSize / paneSize}) */
    public int getNumPanesPerWindow() {
        return this.numPanesPerWindow;
    }

    /** @return the next scheduled window evaluation time, or {@code -1} once triggers are stopped */
    public long getNextEvaluationTime() {
        return this.nextEvaluationTime;
    }

    /** @return the next scheduled pane slide time, or {@code -1} once triggers are stopped */
    public long getNextSlideTime() {
        return this.nextSlideTime;
    }

    @Override
    public String toString() {
        return "Window (processing time) (length=" + this.windowSize + ", slide=" + this.windowSlide + ')';
    }

    /**
     * Immutable holder for the state read in {@code restoreState}, kept until {@code open()}
     * installs it.
     */
    private static final class RestoredState<IN, KEY, STATE, OUT> {
        final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
        final long nextEvaluationTime;
        final long nextSlideTime;

        RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) {
            this.panes = panes;
            this.nextEvaluationTime = nextEvaluationTime;
            this.nextSlideTime = nextSlideTime;
        }
    }
}

