package org.apache.flink.streaming.api.invokable.operator;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.class */
public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
    private static final long serialVersionUID = 1;
    protected long startTime;
    protected long nextRecordTime;
    protected TimeStamp<OUT> timestamp;
    protected WindowReduceInvokable<OUT>.StreamWindow window;

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable$StreamWindow.class */
    protected class StreamWindow extends BatchReduceInvokable<OUT>.StreamBatch {
        private static final long serialVersionUID = 1;

        public StreamWindow() {
            super(WindowReduceInvokable.this);
        }

        @Override // org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable.StreamBatch
        public void reduceToBuffer(OUT out) throws Exception {
            checkWindowEnd(WindowReduceInvokable.this.timestamp.getTimestamp(out));
            if (this.currentValue != null) {
                this.currentValue = (OUT) WindowReduceInvokable.this.reducer.reduce(WindowReduceInvokable.this.serializer.copy(this.currentValue), WindowReduceInvokable.this.serializer.copy(out));
            } else {
                this.currentValue = out;
            }
        }

        protected synchronized void checkWindowEnd(long j) {
            WindowReduceInvokable.this.nextRecordTime = j;
            while (miniBatchEnd()) {
                addToBuffer();
                if (batchEnd()) {
                    reduceBatch();
                }
            }
        }

        @Override // org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable.StreamBatch
        public void reduceBatch() {
            WindowReduceInvokable.this.reduce(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable.StreamBatch
        public boolean miniBatchEnd() {
            if (WindowReduceInvokable.this.nextRecordTime < WindowReduceInvokable.this.startTime + WindowReduceInvokable.this.granularity) {
                return false;
            }
            WindowReduceInvokable.this.startTime += WindowReduceInvokable.this.granularity;
            return true;
        }

        @Override // org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable.StreamBatch
        public boolean batchEnd() {
            if (this.minibatchCounter != WindowReduceInvokable.this.numberOfBatches) {
                return false;
            }
            this.minibatchCounter -= WindowReduceInvokable.this.batchPerSlide;
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable$TimeCheck.class */
    private class TimeCheck extends Thread {
        private TimeCheck() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Thread.sleep(WindowReduceInvokable.this.slideSize);
                } catch (InterruptedException e) {
                }
                if (!WindowReduceInvokable.this.isRunning) {
                    return;
                } else {
                    WindowReduceInvokable.this.window.checkWindowEnd(System.currentTimeMillis());
                }
            }
        }
    }

    public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long j, long j2, TimeStamp<OUT> timeStamp) {
        super(reduceFunction, j, j2);
        this.timestamp = timeStamp;
        this.startTime = timeStamp.getStartTime();
    }

    @Override // org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable, org.apache.flink.streaming.api.invokable.StreamInvokable
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.window = new StreamWindow();
        this.batch = this.window;
        if (this.timestamp instanceof DefaultTimeStamp) {
            new TimeCheck().start();
        }
    }
}
