package org.apache.beam.runners.spark.translation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Lists;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.class */
public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
    private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;

    public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy) {
        super(serializablePipelineOptions, map, windowingStrategy);
        this.combineFn = combineFnWithContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<WindowedValue<AccumT>> zeroValue() {
        return Lists.newArrayList();
    }

    private Iterable<WindowedValue<AccumT>> createAccumulator(WindowedValue<InputT> windowedValue) {
        Iterable sortByWindows = sortByWindows(windowedValue.explodeWindows());
        TimestampCombiner timestampCombiner = this.windowingStrategy.getTimestampCombiner();
        WindowFn windowFn = this.windowingStrategy.getWindowFn();
        Iterator it = sortByWindows.iterator();
        WindowedValue<?> windowedValue2 = (WindowedValue) it.next();
        IntervalWindow intervalWindow = (BoundedWindow) Iterables.getFirst(windowedValue2.getWindows(), null);
        Object addInput = this.combineFn.addInput(this.combineFn.createAccumulator(ctxtForInput(windowedValue2)), windowedValue2.getValue(), ctxtForInput(windowedValue2));
        Instant assign = timestampCombiner.assign(intervalWindow, this.windowingStrategy.getWindowFn().getOutputTime(windowedValue2.getTimestamp(), intervalWindow));
        ArrayList newArrayList = Lists.newArrayList();
        boolean z = !this.windowingStrategy.getWindowFn().isNonMerging();
        while (it.hasNext()) {
            WindowedValue<?> windowedValue3 = (WindowedValue) it.next();
            IntervalWindow intervalWindow2 = (BoundedWindow) Iterables.getOnlyElement(windowedValue3.getWindows());
            boolean z2 = z && isIntersecting(intervalWindow, intervalWindow2);
            if (z2 || intervalWindow2.equals(intervalWindow)) {
                if (z2) {
                    intervalWindow = merge(intervalWindow, intervalWindow2);
                }
                addInput = this.combineFn.addInput(addInput, windowedValue3.getValue(), ctxtForInput(windowedValue3));
                assign = timestampCombiner.merge(intervalWindow, new Instant[]{assign, this.windowingStrategy.getWindowFn().getOutputTime(windowedValue3.getTimestamp(), intervalWindow)});
            } else {
                newArrayList.add(WindowedValue.of(addInput, assign, intervalWindow, PaneInfo.NO_FIRING));
                addInput = this.combineFn.addInput(this.combineFn.createAccumulator(ctxtForInput(windowedValue3)), windowedValue3.getValue(), ctxtForInput(windowedValue3));
                intervalWindow = intervalWindow2;
                assign = timestampCombiner.assign(intervalWindow, windowFn.getOutputTime(windowedValue3.getTimestamp(), intervalWindow));
            }
        }
        newArrayList.add(WindowedValue.of(addInput, assign, intervalWindow, PaneInfo.NO_FIRING));
        return newArrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<WindowedValue<AccumT>> seqOp(Iterable<WindowedValue<AccumT>> iterable, WindowedValue<InputT> windowedValue) {
        return combOp(iterable, createAccumulator(windowedValue));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<WindowedValue<AccumT>> combOp(Iterable<WindowedValue<AccumT>> iterable, Iterable<WindowedValue<AccumT>> iterable2) {
        Iterable concat = Iterables.concat(iterable, iterable2);
        if (!concat.iterator().hasNext()) {
            return Lists.newArrayList();
        }
        Iterable sortByWindows = sortByWindows(concat);
        TimestampCombiner timestampCombiner = this.windowingStrategy.getTimestampCombiner();
        Iterator it = sortByWindows.iterator();
        WindowedValue windowedValue = (WindowedValue) it.next();
        IntervalWindow intervalWindow = (BoundedWindow) Iterables.getFirst(windowedValue.getWindows(), null);
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(windowedValue.getValue());
        ArrayList newArrayList2 = Lists.newArrayList();
        newArrayList2.add(windowedValue.getTimestamp());
        ArrayList newArrayList3 = Lists.newArrayList();
        boolean z = !this.windowingStrategy.getWindowFn().isNonMerging();
        while (it.hasNext()) {
            WindowedValue windowedValue2 = (WindowedValue) it.next();
            IntervalWindow intervalWindow2 = (BoundedWindow) Iterables.getOnlyElement(windowedValue2.getWindows());
            boolean z2 = z && isIntersecting(intervalWindow, intervalWindow2);
            if (z2 || intervalWindow2.equals(intervalWindow)) {
                if (z2) {
                    intervalWindow = merge(intervalWindow, intervalWindow2);
                }
                newArrayList.add(windowedValue2.getValue());
                newArrayList2.add(windowedValue2.getTimestamp());
            } else {
                Instant merge = timestampCombiner.merge(intervalWindow, newArrayList2);
                Iterable unmodifiableIterable = Iterables.unmodifiableIterable(newArrayList);
                WindowedValue<?> of = WindowedValue.of(unmodifiableIterable, merge, intervalWindow, PaneInfo.NO_FIRING);
                newArrayList3.add(of.withValue(this.combineFn.mergeAccumulators(unmodifiableIterable, ctxtForInput(of))));
                newArrayList.clear();
                newArrayList.add(windowedValue2.getValue());
                intervalWindow = intervalWindow2;
                newArrayList2.clear();
                newArrayList2.add(windowedValue2.getTimestamp());
            }
        }
        Instant merge2 = timestampCombiner.merge(intervalWindow, newArrayList2);
        Iterable unmodifiableIterable2 = Iterables.unmodifiableIterable(newArrayList);
        WindowedValue<?> of2 = WindowedValue.of(unmodifiableIterable2, merge2, intervalWindow, PaneInfo.NO_FIRING);
        newArrayList3.add(of2.withValue(this.combineFn.mergeAccumulators(unmodifiableIterable2, ctxtForInput(of2))));
        return newArrayList3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<AccumT>> iterable) {
        return (Iterable) StreamSupport.stream(iterable.spliterator(), false).map(windowedValue -> {
            if (windowedValue == null) {
                return null;
            }
            return windowedValue.withValue(this.combineFn.extractOutput(windowedValue.getValue(), ctxtForInput(windowedValue)));
        }).collect(Collectors.toList());
    }
}
