package org.apache.flink.streaming.api.invokable.operator.co;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.class */
/**
 * Grouped variant of {@link CoWindowReduceInvokable}: for each of the two
 * inputs it keeps one {@code StreamWindow} per distinct key value, where the
 * key is the record field found at {@code keyPosition1} (first input) or
 * {@code keyPosition2} (second input).
 *
 * <p>Windows are created lazily the first time a key is seen. A newly created
 * window is seeded with the mini-batch counter value most recently recorded by
 * {@code checkWindowEnd1}/{@code checkWindowEnd2} for that input.
 *
 * @param <IN1> record type of the first input
 * @param <IN2> record type of the second input
 * @param <OUT> output type of the co-reduce function
 */
public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends CoWindowReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1;

    /** Index of the key field in records of the first input. */
    private final int keyPosition1;

    /** Index of the key field in records of the second input. */
    private final int keyPosition2;

    /** Per-key windows of the first input. */
    private final Map<Object, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1>> streamWindows1;

    /** Per-key windows of the second input. */
    private final Map<Object, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2>> streamWindows2;

    /** Mini-batch counter last observed in {@code checkWindowEnd1}; used to seed new first-input windows. */
    private long currentMiniBatchCount1;

    /** Mini-batch counter last observed in {@code checkWindowEnd2}; used to seed new second-input windows. */
    private long currentMiniBatchCount2;

    /**
     * Window used for a single key. It differs from the parent window only in
     * its {@link #batchEnd()} condition.
     *
     * @param <IN> record type held by this window
     */
    protected class GroupedStreamWindow<IN> extends CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN> {
        private static final long serialVersionUID = 1;

        /**
         * Forwards both arguments to the parent window constructor. Within this
         * class it is invoked with the batch size and the slide size of the
         * corresponding input.
         *
         * @param j  first argument of the parent constructor
         * @param j2 second argument of the parent constructor
         */
        public GroupedStreamWindow(long j, long j2) {
            super(j, j2);
        }

        /**
         * @return {@code true} exactly when {@code minibatchCounter} equals
         *         {@code numberOfBatches}
         */
        @Override
        public boolean batchEnd() {
            return this.minibatchCounter == this.numberOfBatches;
        }
    }

    /**
     * Creates the invokable with empty per-key window maps and both mini-batch
     * counters set to zero.
     *
     * @param coReduceFunction forwarded to the parent constructor
     * @param j                forwarded to the parent constructor
     *                         (NOTE(review): presumably the first input's window size - confirm against the parent)
     * @param j2               forwarded to the parent constructor
     *                         (NOTE(review): presumably the second input's window size - confirm)
     * @param j3               forwarded to the parent constructor
     *                         (NOTE(review): presumably the first input's slide interval - confirm)
     * @param j4               forwarded to the parent constructor
     *                         (NOTE(review): presumably the second input's slide interval - confirm)
     * @param i                index of the key field in first-input records
     * @param i2               index of the key field in second-input records
     * @param timeStamp        forwarded to the parent constructor
     * @param timeStamp2       forwarded to the parent constructor
     */
    public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReduceFunction, long j, long j2, long j3, long j4, int i, int i2, TimeStamp<IN1> timeStamp, TimeStamp<IN2> timeStamp2) {
        super(coReduceFunction, j, j2, j3, j4, timeStamp, timeStamp2);
        this.currentMiniBatchCount1 = 0L;
        this.currentMiniBatchCount2 = 0L;
        this.keyPosition1 = i;
        this.keyPosition2 = i2;
        // Explicit type arguments instead of raw HashMap to avoid unchecked conversions.
        this.streamWindows1 =
                new HashMap<Object, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1>>();
        this.streamWindows2 =
                new HashMap<Object, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2>>();
    }

    /**
     * Returns the window belonging to the key of the given first-input record,
     * creating and registering it if this key has not been seen before. The
     * returned window is also stored in {@code window1}.
     *
     * @param streamRecord record whose field at {@code keyPosition1} is the key
     * @return the window for that key
     */
    @Override
    protected CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1> getBatch1(StreamRecord<IN1> streamRecord) {
        Object key = streamRecord.getField(this.keyPosition1);
        CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> window = this.streamWindows1.get(key);
        if (window == null) {
            window = new GroupedStreamWindow<IN1>(this.batchSize1, this.slideSize1);
            // Start the new window at the counter value last recorded for this input.
            window.minibatchCounter = this.currentMiniBatchCount1;
            this.streamWindows1.put(key, window);
        }
        this.window1 = window;
        return window;
    }

    /**
     * Returns the window belonging to the key of the given second-input record,
     * creating and registering it if this key has not been seen before. The
     * returned window is also stored in {@code window2}.
     *
     * @param streamRecord record whose field at {@code keyPosition2} is the key
     * @return the window for that key
     */
    @Override
    protected CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2> getBatch2(StreamRecord<IN2> streamRecord) {
        Object key = streamRecord.getField(this.keyPosition2);
        CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> window = this.streamWindows2.get(key);
        if (window == null) {
            window = new GroupedStreamWindow<IN2>(this.batchSize2, this.slideSize2);
            // Start the new window at the counter value last recorded for this input.
            window.minibatchCounter = this.currentMiniBatchCount2;
            this.streamWindows2.put(key, window);
        }
        this.window2 = window;
        return window;
    }

    /** Calls {@code addToBuffer()} on every first-input window. */
    private void addToAllBuffers1() {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> window : this.streamWindows1.values()) {
            window.addToBuffer();
        }
    }

    /** Calls {@code addToBuffer()} on every second-input window. */
    private void addToAllBuffers2() {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> window : this.streamWindows2.values()) {
            window.addToBuffer();
        }
    }

    /**
     * For every first-input window: lowers its mini-batch counter by
     * {@code batchPerSlide1}, then passes it to {@code reduceBatch1}.
     */
    private void reduceAllWindows1() {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> window : this.streamWindows1.values()) {
            window.minibatchCounter -= this.batchPerSlide1;
            reduceBatch1(window);
        }
    }

    /**
     * For every second-input window: lowers its mini-batch counter by
     * {@code batchPerSlide2}, then passes it to {@code reduceBatch2}.
     */
    private void reduceAllWindows2() {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> window : this.streamWindows2.values()) {
            window.minibatchCounter -= this.batchPerSlide2;
            reduceBatch2(window);
        }
    }

    /**
     * Invokes {@code reduceLastBatch1(window)} for every first-input window.
     *
     * @throws Exception propagated from the per-window reduce call
     */
    @Override
    protected void reduceLastBatch1() throws Exception {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> window : this.streamWindows1.values()) {
            reduceLastBatch1(window);
        }
    }

    /**
     * Invokes {@code reduceLastBatch2(window)} for every second-input window.
     *
     * @throws Exception propagated from the per-window reduce call
     */
    @Override
    protected void reduceLastBatch2() throws Exception {
        for (CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> window : this.streamWindows2.values()) {
            reduceLastBatch2(window);
        }
    }

    /**
     * Stores {@code j} in {@code nextRecordTime1}, then, for as long as
     * {@code miniBatchEnd1()} reports true, adds to the buffers of all
     * first-input windows and, whenever the given window reports
     * {@code batchEnd()}, reduces all first-input windows. Finally records the
     * given window's mini-batch counter so that windows created later start
     * from the same value.
     *
     * @param j            value assigned to {@code nextRecordTime1}
     * @param streamWindow window whose {@code batchEnd()} and counter drive the check
     */
    @Override
    protected synchronized void checkWindowEnd1(long j, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> streamWindow) {
        this.nextRecordTime1 = j;
        while (miniBatchEnd1()) {
            addToAllBuffers1();
            if (streamWindow.batchEnd()) {
                reduceAllWindows1();
            }
        }
        this.currentMiniBatchCount1 = streamWindow.minibatchCounter;
    }

    /**
     * Stores {@code j} in {@code nextRecordTime2}, then, for as long as
     * {@code miniBatchEnd2()} reports true, adds to the buffers of all
     * second-input windows and, whenever the given window reports
     * {@code batchEnd()}, reduces all second-input windows. Finally records the
     * given window's mini-batch counter so that windows created later start
     * from the same value.
     *
     * @param j            value assigned to {@code nextRecordTime2}
     * @param streamWindow window whose {@code batchEnd()} and counter drive the check
     */
    @Override
    protected synchronized void checkWindowEnd2(long j, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> streamWindow) {
        this.nextRecordTime2 = j;
        while (miniBatchEnd2()) {
            addToAllBuffers2();
            if (streamWindow.batchEnd()) {
                reduceAllWindows2();
            }
        }
        this.currentMiniBatchCount2 = streamWindow.minibatchCounter;
    }
}
