package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/**
 * A combine runner that buffers all input elements in memory, sorts them by the maximum
 * timestamp of their window and then combines the elements window by window.
 *
 * <p>Each input value is first exploded so that it belongs to exactly one window. If the
 * {@link WindowFn} is merging, overlapping windows are merged in a pre-processing pass before any
 * value is handed to the combiner. One output value is emitted per distinct window that is
 * encountered while walking the sorted list.
 *
 * <p>All elements passed to a single {@link #combine} call are combined under the key of the
 * first element in sorted order.
 */
public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow> extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {

    /**
     * Combines the given elements per window and emits one result per window.
     *
     * @param flinkCombiner the combiner used to create, extend and extract accumulators
     * @param windowingStrategy supplies the {@link TimestampCombiner} and {@link WindowFn}
     * @param sideInputReader passed through unchanged to the combiner
     * @param pipelineOptions passed through unchanged to the combiner
     * @param iterable the input elements; they are fully buffered in memory for sorting
     * @param collector receives one combined value per window, with {@link PaneInfo#NO_FIRING}
     * @throws Exception if the combiner or the window function throws
     */
    @Override
    public void combine(AbstractFlinkCombineRunner.FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, WindowingStrategy<Object, W> windowingStrategy, SideInputReader sideInputReader, PipelineOptions pipelineOptions, Iterable<WindowedValue<KV<K, InputT>>> iterable, Collector<WindowedValue<KV<K, OutputT>>> collector) throws Exception {
        TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
        WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();

        // Buffer every element, one entry per (value, window) pair, so the list can be sorted.
        List<WindowedValue<KV<K, InputT>>> sortedInput = new ArrayList<>();
        for (WindowedValue<KV<K, InputT>> element : iterable) {
            for (WindowedValue<KV<K, InputT>> exploded : element.explodeWindows()) {
                sortedInput.add(exploded);
            }
        }

        // Without any element there is neither a key nor a window to emit a result for.
        if (sortedInput.isEmpty()) {
            return;
        }

        sortedInput.sort(Comparator.comparing(value -> Iterables.getOnlyElement(value.getWindows()).maxTimestamp()));

        if (!windowFn.isNonMerging()) {
            // Windows must be merged up front: the combiner is handed the window of each element,
            // so the final (merged) window has to be known before the first value is accumulated.
            mergeWindow(sortedInput);
        }

        Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();

        // Seed the accumulator with the first element; its key is used for every output.
        WindowedValue<KV<K, InputT>> currentValue = iterator.next();
        K key = currentValue.getValue().getKey();
        @SuppressWarnings("unchecked")
        W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
        AccumT accumulator = flinkCombiner.firstInput(key, currentValue.getValue().getValue(), pipelineOptions, sideInputReader, currentValue.getWindows());

        // Running output timestamp of the current window, as computed by the TimestampCombiner.
        Instant windowTimestamp = timestampCombiner.assign(currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));

        while (iterator.hasNext()) {
            WindowedValue<KV<K, InputT>> nextValue = iterator.next();
            @SuppressWarnings("unchecked")
            W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());

            if (currentWindow.equals(nextWindow)) {
                // Same window: keep accumulating and fold the element's timestamp in.
                accumulator = flinkCombiner.addInput(key, accumulator, nextValue.getValue().getValue(), pipelineOptions, sideInputReader, currentValue.getWindows());
                windowTimestamp = timestampCombiner.combine(windowTimestamp, timestampCombiner.assign(currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
            } else {
                // Window changed: emit the finished window and start a new accumulator.
                collector.collect(WindowedValue.of(KV.of(key, flinkCombiner.extractOutput(key, accumulator, pipelineOptions, sideInputReader, currentValue.getWindows())), windowTimestamp, currentWindow, PaneInfo.NO_FIRING));

                currentWindow = nextWindow;
                currentValue = nextValue;
                accumulator = flinkCombiner.firstInput(key, nextValue.getValue().getValue(), pipelineOptions, sideInputReader, currentValue.getWindows());
                windowTimestamp = timestampCombiner.assign(currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
            }
        }

        // Emit the result of the last window.
        collector.collect(WindowedValue.of(KV.of(key, flinkCombiner.extractOutput(key, accumulator, pipelineOptions, sideInputReader, currentValue.getWindows())), windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
    }

    /**
     * Merges intersecting windows in place.
     *
     * <p>Walks the list once, growing a span over consecutive elements whose windows intersect
     * the span built so far. When an element's window does not intersect, every element of the
     * finished run is replaced by a copy carrying the spanning window, and a new run starts.
     *
     * <p>The list must be non-empty and every element must be in exactly one window. Each window
     * is cast to {@link IntervalWindow}, so any other window type fails with a
     * {@link ClassCastException}. The caller sorts the list by window max timestamp before
     * calling this method; only neighbouring elements are considered for merging.
     *
     * @param list the sorted elements whose windows are rewritten in place
     */
    private void mergeWindow(List<WindowedValue<KV<K, InputT>>> list) {
        int runStart = 0;
        IntervalWindow mergedWindow = (IntervalWindow) Iterables.getOnlyElement(list.get(0).getWindows());

        for (int index = 1; index < list.size(); index++) {
            IntervalWindow candidate = (IntervalWindow) Iterables.getOnlyElement(list.get(index).getWindows());
            if (mergedWindow.intersects(candidate)) {
                mergedWindow = mergedWindow.span(candidate);
            } else {
                // The run [runStart, index - 1] is complete; stamp it with the spanning window.
                assignWindow(list, runStart, index - 1, mergedWindow);
                runStart = index;
                mergedWindow = candidate;
            }
        }

        // A trailing run of a single element already carries its own window, so only rewrite
        // when the last run holds at least two elements.
        if (runStart < list.size() - 1) {
            assignWindow(list, runStart, list.size() - 1, mergedWindow);
        }
    }

    /** Replaces the elements in {@code [from, to]} (inclusive) with copies placed in {@code window}. */
    private void assignWindow(List<WindowedValue<KV<K, InputT>>> list, int from, int to, IntervalWindow window) {
        for (int index = to; index >= from; index--) {
            WindowedValue<KV<K, InputT>> element = list.get(index);
            list.set(index, WindowedValue.of(element.getValue(), element.getTimestamp(), window, element.getPane()));
        }
    }
}
