package org.apache.flink.streaming.api.invokable.operator.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.state.CircularFifoList;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.class */
/**
 * Two-input invokable that buffers the elements of both inputs in a sliding,
 * timestamp-driven window and hands the buffered contents of both sides to a
 * {@link CoWindowFunction} every time a window is complete.
 *
 * <p>The time axis is cut into mini-batches whose length is the greatest common
 * divisor of the window size and the slide size. A window spans
 * {@code windowSize / gcd} mini-batches; after each evaluation the buffers are
 * shifted by {@code slideSize / gcd} mini-batches.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type emitted by the user function
 */
public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** User function evaluated on the buffered contents of both inputs. */
    protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;

    /** Length of a window, in the same unit as the extracted timestamps. */
    protected long windowSize;

    /** Distance between the starts of two consecutive windows, in timestamp units. */
    protected long slideSize;

    /**
     * Record-level buffer for the first input. Initialised by the constructor but
     * not read anywhere in this class; the active buffers live in {@link #window}.
     */
    protected CircularFifoList<StreamRecord<IN1>> circularList1;

    /**
     * Record-level buffer for the second input. Initialised by the constructor but
     * not read anywhere in this class; the active buffers live in {@link #window}.
     */
    protected CircularFifoList<StreamRecord<IN2>> circularList2;

    /** Extracts the timestamp of elements of the first input. */
    protected TimestampWrapper<IN1> timeStamp1;

    /** Extracts the timestamp of elements of the second input. */
    protected TimestampWrapper<IN2> timeStamp2;

    /** Buffers and mini-batch bookkeeping shared by both inputs. */
    protected StreamWindow window;

    /** Start of the mini-batch that is currently being filled. */
    protected long startTime;

    /** Timestamp of the element most recently passed to the window. */
    protected long nextRecordTime;

    /**
     * Element buffers of the two inputs plus the mini-batch counters that decide
     * when a window is complete.
     *
     * <p>This is an inner (non-static) class: it reads and updates
     * {@code startTime} and {@code nextRecordTime} of the enclosing invokable and
     * triggers the enclosing invokable's user function.
     */
    protected class StreamWindow implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Length of one mini-batch: gcd(windowSize, slideSize), narrowed to int. */
        protected int granularity;

        /** Number of mini-batches the window advances by after each evaluation. */
        protected int batchPerSlide;

        /** Number of mini-batches that make up one full window. */
        protected long numberOfBatches;

        /** Buffered elements of the first input. */
        protected CircularFifoList<IN1> circularList1 = new CircularFifoList<>();

        /** Buffered elements of the second input. */
        protected CircularFifoList<IN2> circularList2 = new CircularFifoList<>();

        /**
         * Mini-batches completed towards the current window. Becomes negative
         * after an evaluation when {@code batchPerSlide > numberOfBatches}.
         */
        protected long minibatchCounter = 0;

        /**
         * Derives the mini-batch layout from the enclosing invokable's window and
         * slide size as they are at construction time.
         */
        public StreamWindow() {
            // NOTE(review): the gcd is narrowed to int; presumably window and slide
            // sizes are small enough for that - confirm against callers.
            this.granularity = (int) MathUtils.gcd(CoWindowInvokable.this.windowSize, CoWindowInvokable.this.slideSize);
            this.batchPerSlide = (int) (CoWindowInvokable.this.slideSize / this.granularity);
            this.numberOfBatches = CoWindowInvokable.this.windowSize / this.granularity;
        }

        /**
         * Advances the window to the timestamp of {@code in1} and then buffers the
         * element on the first side.
         *
         * @param in1 element of the first input
         * @throws Exception declared for callers; propagated from the timestamp extraction
         */
        public void addToBuffer1(IN1 in1) throws Exception {
            checkWindowEnd(CoWindowInvokable.this.timeStamp1.getTimestamp(in1));
            // A negative counter means the element falls between two windows
            // (slide larger than window), so it is dropped.
            if (this.minibatchCounter >= 0) {
                this.circularList1.add(in1);
            }
        }

        /**
         * Advances the window to the timestamp of {@code in2} and then buffers the
         * element on the second side.
         *
         * @param in2 element of the second input
         * @throws Exception declared for callers; propagated from the timestamp extraction
         */
        public void addToBuffer2(IN2 in2) throws Exception {
            checkWindowEnd(CoWindowInvokable.this.timeStamp2.getTimestamp(in2));
            // Same gap handling as for the first input.
            if (this.minibatchCounter >= 0) {
                this.circularList2.add(in2);
            }
        }

        /**
         * Records {@code timeStamp} as the latest seen time and closes every
         * mini-batch that ends at or before it. Whenever closing a mini-batch
         * completes a window, the user function is invoked and both buffers are
         * shifted by {@link #batchPerSlide} mini-batches.
         *
         * @param timeStamp timestamp of the element about to be buffered
         */
        protected synchronized void checkWindowEnd(long timeStamp) {
            CoWindowInvokable.this.nextRecordTime = timeStamp;
            while (miniBatchEnd()) {
                this.circularList1.newSlide();
                this.circularList2.newSlide();
                // Exactly one mini-batch has been closed by this iteration.
                this.minibatchCounter++;
                if (windowEnd()) {
                    CoWindowInvokable.this.callUserFunctionAndLogException();
                    this.circularList1.shiftWindow(this.batchPerSlide);
                    this.circularList2.shiftWindow(this.batchPerSlide);
                }
            }
        }

        /**
         * Tests whether the latest seen timestamp lies at or past the end of the
         * current mini-batch.
         *
         * <p>Not a pure query: when it returns {@code true} it has already moved
         * {@code startTime} forward by one {@link #granularity}.
         *
         * @return {@code true} if a mini-batch was closed by this call
         */
        protected boolean miniBatchEnd() {
            if (CoWindowInvokable.this.nextRecordTime < CoWindowInvokable.this.startTime + this.granularity) {
                return false;
            }
            CoWindowInvokable.this.startTime += this.granularity;
            return true;
        }

        /**
         * Tests whether enough mini-batches have been closed to form a full
         * window. When that is the case the counter is reduced by
         * {@link #batchPerSlide} so that counting restarts for the next window.
         *
         * @return {@code true} if a window is complete
         */
        public boolean windowEnd() {
            if (this.minibatchCounter != this.numberOfBatches) {
                return false;
            }
            this.minibatchCounter -= this.batchPerSlide;
            return true;
        }

        /**
         * Invokes the user function on the current buffer contents unless
         * {@link #miniBatchEnd()} reports (and thereby closes) a finished
         * mini-batch.
         */
        public void reduceLastBatch() {
            if (miniBatchEnd()) {
                return;
            }
            CoWindowInvokable.this.callUserFunctionAndLogException();
        }

        /** @return the iterable exposed by the buffer of the first input */
        public Iterable<IN1> getIterable1() {
            return this.circularList1.getIterable();
        }

        /** @return the iterable exposed by the buffer of the second input */
        public Iterable<IN2> getIterable2() {
            return this.circularList2.getIterable();
        }

        /** @return the string form of the first input's buffer only */
        @Override
        public String toString() {
            return this.circularList1.toString();
        }
    }

    /**
     * Creates the invokable and its window bookkeeping.
     *
     * @param coWindowFunction user function to evaluate on each window
     * @param windowSize length of a window in timestamp units
     * @param slideSize distance between consecutive window starts in timestamp units
     * @param timeStamp1 timestamp extractor for the first input; also supplies the
     *        initial {@code startTime}
     * @param timeStamp2 timestamp extractor for the second input
     */
    public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideSize, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
        super(coWindowFunction);
        this.coWindowFunction = coWindowFunction;
        this.windowSize = windowSize;
        this.slideSize = slideSize;
        this.circularList1 = new CircularFifoList<>();
        this.circularList2 = new CircularFifoList<>();
        this.timeStamp1 = timeStamp1;
        this.timeStamp2 = timeStamp2;
        this.startTime = timeStamp1.getStartTime();
        // Must come after windowSize/slideSize are set: StreamWindow reads them.
        this.window = new StreamWindow();
    }

    /** Buffers the current element of the first input in the window. */
    @Override
    protected void handleStream1() throws Exception {
        this.window.addToBuffer1(this.reuse1.getObject());
    }

    /** Buffers the current element of the second input in the window. */
    @Override
    protected void handleStream2() throws Exception {
        this.window.addToBuffer2(this.reuse2.getObject());
    }

    /**
     * Evaluates the user function on copies of everything currently buffered on
     * both sides. Nothing is called when both buffers are empty.
     *
     * @throws Exception whatever the serializers or the user function throw
     */
    @Override
    protected void callUserFunction() throws Exception {
        if (this.window.circularList1.isEmpty() && this.window.circularList2.isEmpty()) {
            return;
        }

        // The user function receives copies, not the buffered instances themselves.
        List<IN1> first = new ArrayList<IN1>();
        for (IN1 element : this.window.circularList1.getElements()) {
            first.add(this.serializer1.copy(element));
        }
        List<IN2> second = new ArrayList<IN2>();
        for (IN2 element : this.window.circularList2.getElements()) {
            second.add(this.serializer2.copy(element));
        }

        this.coWindowFunction.coWindow(first, second, this.collector);
    }

    /**
     * Evaluates the user function one last time on the partially filled window
     * (unless the current mini-batch has just ended) and then closes the
     * superclass.
     */
    @Override
    public void close() {
        if (!this.window.miniBatchEnd()) {
            callUserFunctionAndLogException();
        }
        super.close();
    }

    /** Intentionally empty: evaluation happens per window in {@link #callUserFunction()}. */
    @Override
    protected void callUserFunction1() throws Exception {
    }

    /** Intentionally empty: evaluation happens per window in {@link #callUserFunction()}. */
    @Override
    protected void callUserFunction2() throws Exception {
    }

    /**
     * Overwrites the stored slide size.
     *
     * <p>NOTE(review): only the field is updated. The mini-batch layout
     * ({@code granularity}, {@code batchPerSlide}, {@code numberOfBatches}) was
     * derived from the slide size when {@link #window} was constructed and is not
     * recomputed here, so within this class the new value does not change how
     * the existing window slides - confirm whether that is intended.
     *
     * @param slideSize new slide size in timestamp units
     */
    public void setSlideSize(long slideSize) {
        this.slideSize = slideSize;
    }
}
