package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.class */
public class HashingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow> extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner$MergeContextImpl.class */
    public class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
        private Set<W> windows;
        private Map<W, W> windowToMergeResult;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        MergeContextImpl(WindowFn<Object, W> windowFn, Set<W> set, Map<W, W> map) {
            super(windowFn);
            Objects.requireNonNull(windowFn);
            this.windows = set;
            this.windowToMergeResult = map;
        }

        public Collection<W> windows() {
            return this.windows;
        }

        public void merge(Collection<W> collection, W w) throws Exception {
            Iterator<W> it = collection.iterator();
            while (it.hasNext()) {
                this.windowToMergeResult.put(it.next(), w);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner
    public void combine(AbstractFlinkCombineRunner.FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, WindowingStrategy<Object, W> windowingStrategy, SideInputReader sideInputReader, PipelineOptions pipelineOptions, Iterable<WindowedValue<KV<K, InputT>>> iterable, Collector<WindowedValue<KV<K, OutputT>>> collector) throws Exception {
        TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
        WindowFn windowFn = windowingStrategy.getWindowFn();
        ArrayList arrayList = new ArrayList();
        Iterables.addAll(arrayList, iterable);
        Map<W, W> mergeWindows = mergeWindows(windowingStrategy, collectWindows(arrayList));
        HashMap hashMap = new HashMap();
        Iterator it = arrayList.iterator();
        WindowedValue windowedValue = (WindowedValue) it.next();
        Object key = ((KV) windowedValue.getValue()).getKey();
        while (true) {
            for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
                W w = mergeWindows.get(boundedWindow);
                BoundedWindow boundedWindow2 = w == null ? boundedWindow : w;
                Set singleton = Collections.singleton(boundedWindow2);
                Tuple2 tuple2 = (Tuple2) hashMap.get(boundedWindow2);
                if (tuple2 == null) {
                    hashMap.put(boundedWindow2, new Tuple2(flinkCombiner.firstInput(key, ((KV) windowedValue.getValue()).getValue(), pipelineOptions, sideInputReader, singleton), timestampCombiner.assign(boundedWindow2, windowFn.getOutputTime(windowedValue.getTimestamp(), boundedWindow2))));
                } else {
                    tuple2.f0 = flinkCombiner.addInput(key, tuple2.f0, ((KV) windowedValue.getValue()).getValue(), pipelineOptions, sideInputReader, singleton);
                    tuple2.f1 = timestampCombiner.combine(new Instant[]{(Instant) tuple2.f1, timestampCombiner.assign(boundedWindow2, windowingStrategy.getWindowFn().getOutputTime(windowedValue.getTimestamp(), boundedWindow2))});
                }
            }
            if (!it.hasNext()) {
                break;
            } else {
                windowedValue = (WindowedValue) it.next();
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            collector.collect(WindowedValue.of(KV.of(key, flinkCombiner.extractOutput(key, ((Tuple2) entry.getValue()).f0, pipelineOptions, sideInputReader, Collections.singleton((BoundedWindow) entry.getKey()))), (Instant) ((Tuple2) entry.getValue()).f1, (BoundedWindow) entry.getKey(), PaneInfo.NO_FIRING));
        }
    }

    private Map<W, W> mergeWindows(WindowingStrategy<Object, W> windowingStrategy, Set<W> set) throws Exception {
        WindowFn windowFn = windowingStrategy.getWindowFn();
        if (windowingStrategy.getWindowFn().isNonMerging()) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        windowFn.mergeWindows(new MergeContextImpl(windowFn, set, hashMap));
        return hashMap;
    }

    private Set<W> collectWindows(Iterable<WindowedValue<KV<K, InputT>>> iterable) {
        HashSet hashSet = new HashSet();
        Iterator<WindowedValue<KV<K, InputT>>> it = iterable.iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getWindows().iterator();
            while (it2.hasNext()) {
                hashSet.add((BoundedWindow) it2.next());
            }
        }
        return hashSet;
    }
}
