package org.apache.flink.streaming.api.operators.windowing;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/ParallelMerge.class */
/**
 * Two-input flat-map function that buffers the pieces of a window arriving on the first input and
 * emits the merged window once the part counts arriving on the second input show it is complete.
 *
 * <p>The first input carries {@link StreamWindow} pieces, identified by their {@code windowID}.
 * Pieces with the same id are folded into a single buffered window with the configured
 * {@link ReduceFunction}. The second input carries {@code (windowId, numberOfParts)} tuples; once
 * {@link #numberOfDiscretizers} such tuples have been seen for an id, their summed
 * {@code numberOfParts} is the number of pieces that must be received before the window is
 * emitted.
 *
 * <p>NOTE(review): the tuples on the second input are presumably sent by the upstream
 * discretizers, one per discretizer and window - confirm against the producer.
 *
 * @param <OUT> element type of the merged windows
 */
public class ParallelMerge<OUT> extends RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
    private static final long serialVersionUID = 1;

    /**
     * Number of part-count tuples to wait for per window id. Assigned in
     * {@link #open(Configuration)} from the number of parallel subtasks of the runtime context.
     */
    protected Integer numberOfDiscretizers;

    /** Combines the single element of the buffered window with the single element of a new piece. */
    private final ReduceFunction<OUT> reducer;

    /** Window id to the total number of pieces that must arrive before the window is emitted. */
    private final Map<Integer, Integer> availableNumberOfParts = new HashMap<>();

    /** Window id to (window merged so far, number of pieces merged into it). */
    private final Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<>();

    /** Window id to (sum of reported part counts, number of part-count tuples received). */
    private final Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<>();

    /**
     * Creates a merger that combines window pieces with the given function.
     *
     * @param reduceFunction function used by {@link #updateCurrent} to merge two pieces
     */
    public ParallelMerge(ReduceFunction<OUT> reduceFunction) {
        this.reducer = reduceFunction;
    }

    /**
     * Handles one window piece: merges it into the buffered window with the same id and emits the
     * result if the expected number of pieces is already known and has been reached.
     *
     * @param streamWindow the incoming window piece
     * @param collector receives the merged window once it is complete
     * @throws Exception if merging the piece fails
     */
    @Override // org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    public void flatMap1(StreamWindow<OUT> streamWindow, Collector<StreamWindow<OUT>> collector) throws Exception {
        final Integer id = Integer.valueOf(streamWindow.windowID);

        Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
        if (current == null) {
            // First piece for this id: it becomes the buffered window itself.
            current = new Tuple2<>(streamWindow, 1);
        } else {
            updateCurrent(current.f0, streamWindow);
            current.f1 = current.f1 + 1;
        }

        final Integer expectedParts = availableNumberOfParts.get(id);
        if (expectedParts == null || expectedParts > current.f1) {
            // Either the total is not known yet or pieces are still missing: keep buffering.
            receivedWindows.put(id, current);
            return;
        }

        collector.collect(current.f0);
        receivedWindows.remove(id);
        availableNumberOfParts.remove(id);
        checkOld(id);
    }

    /**
     * Verifies that the window preceding the one just emitted is no longer buffered.
     *
     * @param num id of the window that has just been emitted
     * @throws RuntimeException if a window with id {@code num - 1} is still buffered
     */
    private void checkOld(Integer num) {
        final int previousId = num - 1;
        if (receivedWindows.containsKey(previousId)) {
            // Report the id that is actually still pending, not the one that was just emitted.
            throw new RuntimeException("Error in processing logic, window with id " + previousId + " should have already been processed");
        }
    }

    /**
     * Handles one part-count tuple. After {@link #numberOfDiscretizers} tuples have been received
     * for a window id, the buffered window is emitted if it already holds at least the summed
     * number of parts; otherwise that total is remembered so {@link #flatMap1} can emit the window
     * when the missing pieces arrive.
     *
     * @param tuple2 {@code f0} is the window id, {@code f1} the number of parts being reported
     * @param collector receives the merged window if it is already complete
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    public void flatMap2(Tuple2<Integer, Integer> tuple2, Collector<StreamWindow<OUT>> collector) throws Exception {
        final Integer id = tuple2.f0;
        final Integer reportedParts = tuple2.f1;

        Tuple2<Integer, Integer> partInfo = receivedNumberOfParts.get(id);
        if (partInfo == null) {
            partInfo = new Tuple2<>(reportedParts, 1);
            receivedNumberOfParts.put(id, partInfo);
        } else {
            partInfo.f0 = partInfo.f0 + reportedParts;
            partInfo.f1 = partInfo.f1 + 1;
        }

        if (partInfo.f1 < numberOfDiscretizers) {
            // Still waiting for further part-count tuples for this window id.
            return;
        }
        receivedNumberOfParts.remove(id);

        final int totalParts = partInfo.f0;
        final Tuple2<StreamWindow<OUT>, Integer> received = receivedWindows.get(id);
        if (received != null && received.f1 >= totalParts) {
            collector.collect(received.f0);
            receivedWindows.remove(id);
            checkOld(id);
        } else if (totalParts > 0) {
            // Store the summed part count (f0), not the number of reports (f1): flatMap1 compares
            // this value against the number of pieces received, exactly like the check above.
            availableNumberOfParts.put(id, totalParts);
        }
    }

    /**
     * Merges a newly arrived piece into the buffered window by replacing the buffered window's
     * single element with the reduction of that element and the piece's single element.
     *
     * @param streamWindow the buffered window; modified in place
     * @param streamWindow2 the newly arrived piece; only read
     * @throws RuntimeException if either window does not contain exactly one element
     * @throws Exception if the reduce function fails
     */
    protected void updateCurrent(StreamWindow<OUT> streamWindow, StreamWindow<OUT> streamWindow2) throws Exception {
        if (streamWindow.size() != 1 || streamWindow2.size() != 1) {
            throw new RuntimeException("Error in parallel merge logic. Current window should contain only one element.");
        }
        final OUT merged = reducer.reduce(streamWindow.remove(0), streamWindow2.get(0));
        streamWindow.add(merged);
    }

    /**
     * Initialises {@link #numberOfDiscretizers} with the number of parallel subtasks reported by
     * the runtime context.
     *
     * @param configuration not used by this implementation
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) {
        this.numberOfDiscretizers = Integer.valueOf(getRuntimeContext().getNumberOfParallelSubtasks());
    }
}
