package org.apache.flink.streaming.api.invokable.operator;

import java.io.Serializable;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.CircularFifoList;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.class */
public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
    private static final long serialVersionUID = 1;
    protected GroupReduceFunction<IN, OUT> reducer;
    protected long slideSize;
    protected long batchSize;
    protected int granularity;
    protected int batchPerSlide;
    protected BatchGroupReduceInvokable<IN, OUT>.StreamBatch batch;
    protected BatchGroupReduceInvokable<IN, OUT>.StreamBatch currentBatch;
    protected long numberOfBatches;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable$StreamBatch.class */
    public class StreamBatch implements Serializable {
        private static final long serialVersionUID = 1;
        protected CircularFifoList<IN> circularList = new CircularFifoList<>();
        private long counter = 0;
        protected long minibatchCounter = 0;

        public StreamBatch() {
        }

        public void addToBuffer(IN in) throws Exception {
            this.circularList.add(in);
            this.counter += serialVersionUID;
            if (miniBatchEnd()) {
                this.circularList.newSlide();
                this.minibatchCounter += serialVersionUID;
                if (batchEnd()) {
                    reduceBatch();
                    this.circularList.shiftWindow(BatchGroupReduceInvokable.this.batchPerSlide);
                }
            }
        }

        protected boolean miniBatchEnd() {
            if (this.counter % BatchGroupReduceInvokable.this.granularity != 0) {
                return false;
            }
            this.counter = 0L;
            return true;
        }

        public boolean batchEnd() {
            if (this.minibatchCounter != BatchGroupReduceInvokable.this.numberOfBatches) {
                return false;
            }
            this.minibatchCounter -= BatchGroupReduceInvokable.this.batchPerSlide;
            return true;
        }

        public void reduceBatch() {
            BatchGroupReduceInvokable.this.reduce(this);
        }

        public void reduceLastBatch() {
            if (miniBatchEnd()) {
                return;
            }
            reduceBatch();
        }

        public String toString() {
            return this.circularList.toString();
        }
    }

    public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> groupReduceFunction, long j, long j2) {
        super(groupReduceFunction);
        this.reducer = groupReduceFunction;
        this.batchSize = j;
        this.slideSize = j2;
        this.granularity = (int) MathUtils.gcd(j, j2);
        this.batchPerSlide = (int) (j2 / this.granularity);
        this.numberOfBatches = j / this.granularity;
        this.batch = new StreamBatch();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void immutableInvoke() throws Exception {
        StreamRecord<IN> streamRecord = (StreamRecord) this.recordIterator.next(this.reuse);
        this.reuse = streamRecord;
        if (streamRecord == null) {
            throw new RuntimeException("DataStream must not be empty");
        }
        while (this.reuse != null) {
            getBatch(this.reuse).addToBuffer(this.reuse.getObject());
            resetReuse();
            this.reuse = (StreamRecord) this.recordIterator.next(this.reuse);
        }
        reduceLastBatch();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void mutableInvoke() throws Exception {
        System.out.println("Immutable setting is used");
        immutableInvoke();
    }

    protected BatchGroupReduceInvokable<IN, OUT>.StreamBatch getBatch(StreamRecord<IN> streamRecord) {
        return this.batch;
    }

    protected void reduce(BatchGroupReduceInvokable<IN, OUT>.StreamBatch streamBatch) {
        this.currentBatch = streamBatch;
        callUserFunctionAndLogException();
    }

    protected void reduceLastBatch() {
        this.batch.reduceLastBatch();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void callUserFunction() throws Exception {
        if (this.currentBatch.circularList.isEmpty()) {
            return;
        }
        this.reducer.reduce(this.currentBatch.circularList.getIterable(), this.collector);
    }
}
