package org.apache.flink.streaming.api.invokable.operator.co;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.class */
public class CoGroupedBatchReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1;
    int keyPosition1;
    int keyPosition2;
    Map<Object, CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1>> streamBatches1;
    Map<Object, CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2>> streamBatches2;

    public CoGroupedBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReduceFunction, long j, long j2, long j3, long j4, int i, int i2) {
        super(coReduceFunction, j, j2, j3, j4);
        this.keyPosition1 = i;
        this.keyPosition2 = i2;
        this.streamBatches1 = new HashMap();
        this.streamBatches2 = new HashMap();
    }

    @Override // org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable
    protected void reduceLastBatch1() throws Exception {
        Iterator<CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1>> it = this.streamBatches1.values().iterator();
        while (it.hasNext()) {
            reduceLastBatch1(it.next());
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable
    protected void reduceLastBatch2() throws Exception {
        Iterator<CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2>> it = this.streamBatches2.values().iterator();
        while (it.hasNext()) {
            reduceLastBatch2(it.next());
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable
    protected CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1> getBatch1(StreamRecord<IN1> streamRecord) {
        Object field = streamRecord.getField(this.keyPosition1);
        CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1> streamBatch = this.streamBatches1.get(field);
        if (streamBatch == null) {
            streamBatch = new CoBatchReduceInvokable.StreamBatch<>(this.batchSize1, this.slideSize1);
            this.streamBatches1.put(field, streamBatch);
        }
        return streamBatch;
    }

    @Override // org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable
    protected CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2> getBatch2(StreamRecord<IN2> streamRecord) {
        Object field = streamRecord.getField(this.keyPosition2);
        CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2> streamBatch = this.streamBatches2.get(field);
        if (streamBatch == null) {
            streamBatch = new CoBatchReduceInvokable.StreamBatch<>(this.batchSize2, this.slideSize2);
            this.streamBatches2.put(field, streamBatch);
        }
        return streamBatch;
    }
}
