package org.apache.flink.streaming.api.invokable.operator;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.class */
public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
    private static final long serialVersionUID = 1;
    protected ReduceFunction<OUT> reducer;
    protected long slideSize;
    protected long batchSize;
    protected int granularity;
    protected long batchPerSlide;
    protected long numberOfBatches;
    protected BatchReduceInvokable<OUT>.StreamBatch batch;
    protected BatchReduceInvokable<OUT>.StreamBatch currentBatch;
    protected TypeSerializer<OUT> serializer;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable$StreamBatch.class */
    public class StreamBatch implements Serializable {
        private static final long serialVersionUID = 1;
        protected OUT currentValue;
        protected NullableCircularBuffer circularBuffer;
        protected long counter = 0;
        protected long minibatchCounter = 0;
        boolean changed = false;

        public StreamBatch() {
            this.circularBuffer = new NullableCircularBuffer((int) (BatchReduceInvokable.this.batchSize / BatchReduceInvokable.this.granularity));
        }

        public void reduceToBuffer(OUT out) throws Exception {
            if (this.currentValue != null) {
                this.currentValue = (OUT) BatchReduceInvokable.this.reducer.reduce(BatchReduceInvokable.this.serializer.copy(this.currentValue), BatchReduceInvokable.this.serializer.copy(out));
            } else {
                this.currentValue = out;
            }
            this.counter += serialVersionUID;
            if (miniBatchEnd()) {
                addToBuffer();
                if (batchEnd()) {
                    reduceBatch();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void addToBuffer() {
            this.circularBuffer.add(this.currentValue);
            this.changed = true;
            this.minibatchCounter += serialVersionUID;
            this.currentValue = null;
        }

        protected boolean miniBatchEnd() {
            if (this.counter % BatchReduceInvokable.this.granularity != 0) {
                return false;
            }
            this.counter = 0L;
            return true;
        }

        public boolean batchEnd() {
            if (this.minibatchCounter != BatchReduceInvokable.this.numberOfBatches) {
                return false;
            }
            this.minibatchCounter -= BatchReduceInvokable.this.batchPerSlide;
            return true;
        }

        public void reduceLastBatch() throws Exception {
            if (miniBatchInProgress()) {
                addToBuffer();
            }
            if (!this.changed || this.minibatchCounter < 0) {
                return;
            }
            if (this.circularBuffer.isFull()) {
                long j = 0;
                while (true) {
                    long j2 = j;
                    if (j2 >= BatchReduceInvokable.this.numberOfBatches - this.minibatchCounter) {
                        break;
                    }
                    if (!this.circularBuffer.isEmpty()) {
                        this.circularBuffer.remove();
                    }
                    j = j2 + serialVersionUID;
                }
            }
            if (this.circularBuffer.isEmpty()) {
                return;
            }
            BatchReduceInvokable.this.reduce(this);
        }

        public boolean miniBatchInProgress() {
            return this.currentValue != null;
        }

        public void reduceBatch() {
            BatchReduceInvokable.this.reduce(this);
            this.changed = false;
        }

        public Iterator<OUT> getIterator() {
            return this.circularBuffer.iterator();
        }

        public String toString() {
            return this.circularBuffer.toString();
        }
    }

    public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long j, long j2) {
        super(reduceFunction);
        this.reducer = reduceFunction;
        this.batchSize = j;
        this.slideSize = j2;
        this.granularity = (int) MathUtils.gcd(j, j2);
        this.batchPerSlide = j2 / this.granularity;
        this.numberOfBatches = j / this.granularity;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void immutableInvoke() throws Exception {
        StreamRecord<IN> streamRecord = (StreamRecord) this.recordIterator.next(this.reuse);
        this.reuse = streamRecord;
        if (streamRecord == 0) {
            throw new RuntimeException("DataStream must not be empty");
        }
        while (this.reuse != null) {
            getBatch(this.reuse).reduceToBuffer(this.reuse.getObject());
            resetReuse();
            this.reuse = (StreamRecord) this.recordIterator.next(this.reuse);
        }
        reduceLastBatch();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void mutableInvoke() throws Exception {
        System.out.println("Immutable setting is used");
        immutableInvoke();
    }

    protected BatchReduceInvokable<OUT>.StreamBatch getBatch(StreamRecord<OUT> streamRecord) {
        return this.batch;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reduce(BatchReduceInvokable<OUT>.StreamBatch streamBatch) {
        this.currentBatch = streamBatch;
        callUserFunctionAndLogException();
    }

    protected void reduceLastBatch() throws Exception {
        this.batch.reduceLastBatch();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void callUserFunction() throws Exception {
        Object obj;
        Iterator<OUT> iterator = this.currentBatch.getIterator();
        Object obj2 = null;
        while (true) {
            obj = obj2;
            if (!iterator.hasNext() || obj != null) {
                break;
            } else {
                obj2 = iterator.next();
            }
        }
        while (iterator.hasNext()) {
            OUT next = iterator.next();
            if (next != null) {
                obj = this.reducer.reduce(this.serializer.copy(obj), this.serializer.copy(next));
            }
        }
        if (obj != null) {
            this.collector.collect(obj);
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.serializer = this.inSerializer.getObjectSerializer();
        this.batch = new StreamBatch();
    }
}
