package org.apache.flink.streaming.api.invokable.operator.co;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.class */
/**
 * Co-invokable that applies a {@link CoReduceFunction} over count-based sliding batches
 * (windows) of two input streams independently.
 *
 * <p>For each input, elements are pre-reduced into "mini-batches" whose size is the greatest
 * common divisor of the batch size and the slide size. Finished mini-batches are stored in a
 * circular buffer; once a full batch worth of elements has been seen, the buffered mini-batch
 * values are reduced together, mapped with {@code map1}/{@code map2} and emitted through the
 * collector.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type of the output
 */
public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** User function providing the reduce and map steps for both inputs. */
    protected CoReduceFunction<IN1, IN2, OUT> coReducer;

    /** Number of elements the window of input 1 / input 2 advances by. */
    protected long slideSize1;
    protected long slideSize2;

    /** Number of elements in one full batch of input 1 / input 2. */
    protected long batchSize1;
    protected long batchSize2;

    /** gcd(batchSize, slideSize) per input, i.e. the mini-batch size (narrowed to int). */
    protected int granularity1;
    protected int granularity2;

    /** Number of mini-batches per slide (slideSize / gcd) per input. */
    protected long batchPerSlide1;
    protected long batchPerSlide2;

    /** Number of mini-batches per full batch (batchSize / gcd) per input. */
    protected long numberOfBatches1;
    protected long numberOfBatches2;

    /** Batch state of input 1 / input 2; created in {@link #open(Configuration)}. */
    protected StreamBatch<IN1> batch1;
    protected StreamBatch<IN2> batch2;

    /** Batch that the next {@code callUserFunction1}/{@code callUserFunction2} will reduce. */
    protected StreamBatch<IN1> currentBatch1;
    protected StreamBatch<IN2> currentBatch2;

    /**
     * Bookkeeping for one sliding batch: the partially reduced value of the mini-batch in
     * progress, the buffer of finished mini-batch values and the element counters.
     *
     * @param <IN> element type held by this batch
     */
    public class StreamBatch<IN> implements Serializable {
        private static final long serialVersionUID = 1L;

        protected long batchSize;
        protected long slideSize;
        /** Mini-batch size: gcd(batchSize, slideSize). */
        protected long granularity;
        /** Mini-batches per slide: slideSize / granularity. */
        protected long batchPerSlide;
        /** Mini-batches per full batch: batchSize / granularity. */
        protected long numberOfBatches;
        /** Holds one pre-reduced value per finished mini-batch. */
        protected NullableCircularBuffer circularBuffer;
        /** Elements counted towards the current batch; reduced by slideSize when a batch ends. */
        protected long counter = 0;
        /** Mini-batches added to the buffer; reduced by batchPerSlide when a batch ends. */
        protected long minibatchCounter = 0;
        /** Reduced value of the mini-batch in progress, or {@code null} if none is open. */
        protected IN currentValue = null;
        /** Set when a mini-batch is buffered; cleared by the enclosing class after a batch is reduced. */
        boolean changed = false;

        /**
         * Creates the bookkeeping for a batch of {@code batchSize} elements that slides by
         * {@code slideSize} elements.
         *
         * @param batchSize number of elements per batch
         * @param slideSize number of elements the batch advances by
         */
        public StreamBatch(long batchSize, long slideSize) {
            this.batchSize = batchSize;
            this.slideSize = slideSize;
            // The field is a long, so keep the full gcd instead of truncating it to int.
            this.granularity = MathUtils.gcd(batchSize, slideSize);
            this.batchPerSlide = slideSize / this.granularity;
            this.numberOfBatches = batchSize / this.granularity;
            this.circularBuffer = new NullableCircularBuffer((int) this.numberOfBatches);
        }

        /**
         * Appends the value of the mini-batch in progress to the buffer, marks the batch as
         * changed and starts a fresh mini-batch.
         */
        public void addToBuffer() {
            this.circularBuffer.add(this.currentValue);
            this.changed = true;
            this.minibatchCounter++;
            this.currentValue = null;
        }

        /** @return {@code true} if the element counter is a multiple of the mini-batch size */
        protected boolean miniBatchEnd() {
            return this.counter % this.granularity == 0;
        }

        /**
         * Checks whether a full batch has been collected and, if so, rewinds the counters by
         * one slide so that counting continues for the next batch.
         *
         * @return {@code true} if the element counter reached the batch size
         */
        public boolean batchEnd() {
            if (this.counter != this.batchSize) {
                return false;
            }
            this.counter -= this.slideSize;
            this.minibatchCounter -= this.batchPerSlide;
            return true;
        }

        /** @return {@code true} if a partially reduced mini-batch value is pending */
        public boolean miniBatchInProgress() {
            return this.currentValue != null;
        }

        /**
         * @return iterator over the buffered mini-batch values; callers in this class skip
         *         {@code null} entries
         */
        @SuppressWarnings("unchecked") // the buffer is untyped; only IN values are added via addToBuffer()
        public Iterator<IN> getIterator() {
            return this.circularBuffer.iterator();
        }
    }

    /**
     * @param coReduceFunction user function supplying reduce/map for both inputs
     * @param batchSize1 batch size of the first input
     * @param batchSize2 batch size of the second input
     * @param slideSize1 slide size of the first input
     * @param slideSize2 slide size of the second input
     */
    public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReduceFunction, long batchSize1,
            long batchSize2, long slideSize1, long slideSize2) {
        super(coReduceFunction);
        this.coReducer = coReduceFunction;
        this.batchSize1 = batchSize1;
        this.batchSize2 = batchSize2;
        this.slideSize1 = slideSize1;
        this.slideSize2 = slideSize2;

        // Derive the per-slide / per-batch counts from the full long gcd; only the int-typed
        // granularity fields receive the narrowed value.
        long gcd1 = MathUtils.gcd(batchSize1, slideSize1);
        long gcd2 = MathUtils.gcd(batchSize2, slideSize2);
        this.granularity1 = (int) gcd1;
        this.granularity2 = (int) gcd2;
        this.batchPerSlide1 = slideSize1 / gcd1;
        this.batchPerSlide2 = slideSize2 / gcd2;
        this.numberOfBatches1 = batchSize1 / gcd1;
        this.numberOfBatches2 = batchSize2 / gcd2;
    }

    /**
     * Pulls records from both inputs until the record iterator reports 0, dispatching each
     * record to the handler of its input; afterwards flushes the pending batches of both inputs.
     */
    @Override
    public void immutableInvoke() throws Exception {
        while (true) {
            int next = this.recordIterator.next(this.reuse1, this.reuse2);
            if (next == 0) {
                // No more input: emit whatever is left in the unfinished batches.
                reduceLastBatch1();
                reduceLastBatch2();
                return;
            } else if (next == 1) {
                handleStream1();
                resetReuse1();
            } else {
                handleStream2();
                resetReuse2();
            }
        }
    }

    /** Folds the current record of input 1 into the batch selected by {@link #getBatch1}. */
    @Override
    protected void handleStream1() throws Exception {
        reduceToBuffer1(this.reuse1.getObject(), getBatch1(this.reuse1));
    }

    /** Folds the current record of input 2 into the batch selected by {@link #getBatch2}. */
    @Override
    protected void handleStream2() throws Exception {
        reduceToBuffer2(this.reuse2.getObject(), getBatch2(this.reuse2));
    }

    /**
     * Selects the batch a record of input 1 belongs to. This implementation ignores the record
     * and always returns the single batch of input 1.
     */
    protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> streamRecord) {
        return this.batch1;
    }

    /**
     * Selects the batch a record of input 2 belongs to. This implementation ignores the record
     * and always returns the single batch of input 2.
     */
    protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> streamRecord) {
        return this.batch2;
    }

    /** Delegates to {@link #immutableInvoke()} after printing a notice. */
    @Override
    protected void mutableInvoke() throws Exception {
        // NOTE(review): writes to stdout; consider routing through the project's logger.
        System.out.println("Immutable setting is used");
        immutableInvoke();
    }

    /** Makes {@code streamBatch} the current batch of input 1 and runs the user function on it. */
    public void reduce1(StreamBatch<IN1> streamBatch) {
        this.currentBatch1 = streamBatch;
        callUserFunctionAndLogException1();
    }

    /** Makes {@code streamBatch} the current batch of input 2 and runs the user function on it. */
    public void reduce2(StreamBatch<IN2> streamBatch) {
        this.currentBatch2 = streamBatch;
        callUserFunctionAndLogException2();
    }

    /** Flushes the unfinished batch of input 1. */
    protected void reduceLastBatch1() throws Exception {
        reduceLastBatch1(this.batch1);
    }

    /** Flushes the unfinished batch of input 2. */
    protected void reduceLastBatch2() throws Exception {
        reduceLastBatch2(this.batch2);
    }

    /**
     * Reduces all non-null mini-batch values of the current batch of input 1 into a single
     * value and, if there is one, emits its {@code map1} result.
     */
    @Override
    protected void callUserFunction1() throws Exception {
        Iterator<IN1> reducedIterator = this.currentBatch1.getIterator();
        IN1 reduced = null;

        // Seed the reduction with the first non-null buffered value.
        while (reducedIterator.hasNext() && reduced == null) {
            reduced = reducedIterator.next();
        }

        while (reducedIterator.hasNext()) {
            IN1 next = reducedIterator.next();
            if (next != null) {
                reduced = this.coReducer.reduce1(this.serializer1.copy(reduced), this.serializer1.copy(next));
            }
        }

        if (reduced != null) {
            this.collector.collect(this.coReducer.map1(this.serializer1.copy(reduced)));
        }
    }

    /**
     * Reduces all non-null mini-batch values of the current batch of input 2 into a single
     * value and, if there is one, emits its {@code map2} result.
     */
    @Override
    protected void callUserFunction2() throws Exception {
        Iterator<IN2> reducedIterator = this.currentBatch2.getIterator();
        IN2 reduced = null;

        // Seed the reduction with the first non-null buffered value.
        while (reducedIterator.hasNext() && reduced == null) {
            reduced = reducedIterator.next();
        }

        while (reducedIterator.hasNext()) {
            IN2 next = reducedIterator.next();
            if (next != null) {
                reduced = this.coReducer.reduce2(this.serializer2.copy(reduced), this.serializer2.copy(next));
            }
        }

        if (reduced != null) {
            this.collector.collect(this.coReducer.map2(this.serializer2.copy(reduced)));
        }
    }

    /** Opens the superclass and creates fresh batch state for both inputs. */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.batch1 = new StreamBatch<>(this.batchSize1, this.slideSize1);
        this.batch2 = new StreamBatch<>(this.batchSize2, this.slideSize2);
    }

    /**
     * Folds one element of input 1 into the mini-batch in progress of {@code streamBatch};
     * closes the mini-batch when it is complete and reduces the batch when it is full.
     *
     * @param in1 the incoming element
     * @param streamBatch the batch state to update
     */
    public void reduceToBuffer1(IN1 in1, StreamBatch<IN1> streamBatch) throws Exception {
        if (streamBatch.currentValue != null) {
            streamBatch.currentValue = this.coReducer.reduce1(
                    this.serializer1.copy(streamBatch.currentValue), this.serializer1.copy(in1));
        } else {
            // First element of a mini-batch is taken as-is.
            streamBatch.currentValue = in1;
        }

        streamBatch.counter++;

        if (streamBatch.miniBatchEnd()) {
            streamBatch.addToBuffer();
            if (streamBatch.batchEnd()) {
                reduceBatch1(streamBatch);
            }
        }
    }

    /**
     * Folds one element of input 2 into the mini-batch in progress of {@code streamBatch};
     * closes the mini-batch when it is complete and reduces the batch when it is full.
     *
     * @param in2 the incoming element
     * @param streamBatch the batch state to update
     */
    public void reduceToBuffer2(IN2 in2, StreamBatch<IN2> streamBatch) throws Exception {
        if (streamBatch.currentValue != null) {
            streamBatch.currentValue = this.coReducer.reduce2(
                    this.serializer2.copy(streamBatch.currentValue), this.serializer2.copy(in2));
        } else {
            // First element of a mini-batch is taken as-is.
            streamBatch.currentValue = in2;
        }

        streamBatch.counter++;

        if (streamBatch.miniBatchEnd()) {
            streamBatch.addToBuffer();
            if (streamBatch.batchEnd()) {
                reduceBatch2(streamBatch);
            }
        }
    }

    /**
     * Flushes an unfinished batch of input 1: buffers the mini-batch in progress, and if
     * anything changed since the last emitted batch, drops surplus entries from a full buffer
     * and reduces what remains.
     */
    public void reduceLastBatch1(StreamBatch<IN1> streamBatch) throws Exception {
        if (streamBatch.miniBatchInProgress()) {
            streamBatch.addToBuffer();
        }

        if (!streamBatch.changed || streamBatch.minibatchCounter < 0) {
            return;
        }

        if (streamBatch.circularBuffer.isFull()) {
            // Remove up to (numberOfBatches1 - minibatchCounter) entries before the final reduce.
            for (long i = 0; i < this.numberOfBatches1 - streamBatch.minibatchCounter; i++) {
                if (!streamBatch.circularBuffer.isEmpty()) {
                    streamBatch.circularBuffer.remove();
                }
            }
        }

        if (!streamBatch.circularBuffer.isEmpty()) {
            reduce1(streamBatch);
        }
    }

    /**
     * Flushes an unfinished batch of input 2: buffers the mini-batch in progress, and if
     * anything changed since the last emitted batch, drops surplus entries from a full buffer
     * and reduces what remains.
     */
    public void reduceLastBatch2(StreamBatch<IN2> streamBatch) throws Exception {
        if (streamBatch.miniBatchInProgress()) {
            streamBatch.addToBuffer();
        }

        if (!streamBatch.changed || streamBatch.minibatchCounter < 0) {
            return;
        }

        if (streamBatch.circularBuffer.isFull()) {
            // Remove up to (numberOfBatches2 - minibatchCounter) entries before the final reduce.
            for (long i = 0; i < this.numberOfBatches2 - streamBatch.minibatchCounter; i++) {
                if (!streamBatch.circularBuffer.isEmpty()) {
                    streamBatch.circularBuffer.remove();
                }
            }
        }

        if (!streamBatch.circularBuffer.isEmpty()) {
            reduce2(streamBatch);
        }
    }

    /** Reduces a completed batch of input 1 and clears its {@code changed} flag. */
    public void reduceBatch1(StreamBatch<IN1> streamBatch) {
        reduce1(streamBatch);
        streamBatch.changed = false;
    }

    /** Reduces a completed batch of input 2 and clears its {@code changed} flag. */
    public void reduceBatch2(StreamBatch<IN2> streamBatch) {
        reduce2(streamBatch);
        streamBatch.changed = false;
    }
}
