package org.apache.beam.runners.spark.translation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.spark.api.java.function.Function;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn.class */
public class SparkCombineFn<InputT, ValueT, AccumT, OutputT> implements Serializable {
    private final boolean globalCombine;
    private final SerializablePipelineOptions options;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
    final WindowingStrategy<?, BoundedWindow> windowingStrategy;
    private final Function<InputT, ValueT> toValue;
    private final WindowedAccumulator.Type defaultNonMergingCombineStrategy;
    private final CombineWithContext.CombineFnWithContext<ValueT, AccumT, OutputT> combineFn;
    private final Comparator<BoundedWindow> windowComparator;
    private transient SparkCombineContext combineContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$MapBasedWindowedAccumulator.class */
    public static abstract class MapBasedWindowedAccumulator<InputT, ValueT, AccumT, ImplT extends MapBasedWindowedAccumulator<InputT, ValueT, AccumT, ImplT>> implements WindowedAccumulator<InputT, ValueT, AccumT, ImplT> {
        final Function<InputT, ValueT> toValue;
        final Map<BoundedWindow, WindowedValue<AccumT>> map;

        MapBasedWindowedAccumulator(Function<InputT, ValueT> function, Map<BoundedWindow, WindowedValue<AccumT>> map) {
            this.toValue = function;
            this.map = map;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public void add(WindowedValue<InputT> windowedValue, SparkCombineFn<InputT, ValueT, AccumT, ?> sparkCombineFn) throws Exception {
            for (WindowedValue windowedValue2 : windowedValue.explodeWindows()) {
                SparkCombineContext ctxtForValue = sparkCombineFn.ctxtForValue(windowedValue2);
                BoundedWindow window = SparkCombineFn.getWindow(windowedValue2);
                TimestampCombiner timestampCombiner = sparkCombineFn.windowingStrategy.getTimestampCombiner();
                Instant assign = timestampCombiner.assign(window, sparkCombineFn.windowingStrategy.getWindowFn().getOutputTime(windowedValue2.getTimestamp(), window));
                this.map.compute(window, (boundedWindow, windowedValue3) -> {
                    Object value;
                    Instant timestamp;
                    if (windowedValue3 == null) {
                        value = sparkCombineFn.combineFn.createAccumulator(ctxtForValue);
                        timestamp = assign;
                    } else {
                        value = windowedValue3.getValue();
                        timestamp = windowedValue3.getTimestamp();
                    }
                    return WindowedValue.of(sparkCombineFn.combineFn.addInput(value, toValue(windowedValue2), ctxtForValue), timestampCombiner.combine(new Instant[]{assign, timestamp}), window, PaneInfo.NO_FIRING);
                });
            }
            mergeWindows(sparkCombineFn);
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public void merge(ImplT implt, SparkCombineFn<?, ?, AccumT, ?> sparkCombineFn) throws Exception {
            implt.map.forEach((boundedWindow, windowedValue) -> {
                WindowedValue<AccumT> windowedValue = this.map.get(boundedWindow);
                if (windowedValue == null) {
                    this.map.put(boundedWindow, windowedValue);
                } else {
                    this.map.put(boundedWindow, WindowedValue.of(sparkCombineFn.combineFn.mergeAccumulators(Lists.newArrayList(new Object[]{windowedValue.getValue(), windowedValue.getValue()}), sparkCombineFn.ctxtForValue(windowedValue)), sparkCombineFn.windowingStrategy.getTimestampCombiner().combine(new Instant[]{windowedValue.getTimestamp(), windowedValue.getTimestamp()}), boundedWindow, PaneInfo.NO_FIRING));
                }
            });
            mergeWindows(sparkCombineFn);
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public Collection<WindowedValue<AccumT>> extractOutput() {
            return this.map.values();
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public boolean isEmpty() {
            return this.map.isEmpty();
        }

        void mergeWindows(SparkCombineFn<?, ?, AccumT, ?> sparkCombineFn) throws Exception {
        }

        private ValueT toValue(WindowedValue<InputT> windowedValue) {
            try {
                return (ValueT) this.toValue.call(windowedValue.getValue());
            } catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$MergingWindowedAccumulator.class */
    public static class MergingWindowedAccumulator<InputT, ValueT, AccumT> extends MapBasedWindowedAccumulator<InputT, ValueT, AccumT, MergingWindowedAccumulator<InputT, ValueT, AccumT>> {
        static <InputT, ValueT, AccumT> MergingWindowedAccumulator<InputT, ValueT, AccumT> create(Function<InputT, ValueT> function, Comparator<BoundedWindow> comparator) {
            return new MergingWindowedAccumulator<>(function, comparator);
        }

        static <InputT, ValueT, AccumT> MergingWindowedAccumulator<InputT, ValueT, AccumT> from(Function<InputT, ValueT> function, Iterable<WindowedValue<AccumT>> iterable, Comparator<BoundedWindow> comparator) {
            return new MergingWindowedAccumulator<>(function, iterable, comparator);
        }

        private MergingWindowedAccumulator(Function<InputT, ValueT> function, Comparator<BoundedWindow> comparator) {
            super(function, new TreeMap(comparator));
        }

        private MergingWindowedAccumulator(Function<InputT, ValueT> function, Iterable<WindowedValue<AccumT>> iterable, Comparator<BoundedWindow> comparator) {
            super(function, SparkCombineFn.asMap(iterable, new TreeMap(comparator)));
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.MapBasedWindowedAccumulator
        void mergeWindows(SparkCombineFn<?, ?, AccumT, ?> sparkCombineFn) throws Exception {
            SparkCombineContext ctxtForWindows = sparkCombineFn.ctxtForWindows(this.map.keySet());
            WindowFn<Object, BoundedWindow> windowFn = sparkCombineFn.windowingStrategy.getWindowFn();
            windowFn.mergeWindows(asMergeContext(windowFn, (obj, obj2) -> {
                return sparkCombineFn.combineFn.mergeAccumulators(Lists.newArrayList(new Object[]{obj, obj2}), ctxtForWindows);
            }, (collection, kv) -> {
                Instant merge = sparkCombineFn.windowingStrategy.getTimestampCombiner().merge((BoundedWindow) kv.getKey(), (Iterable) collection.stream().map(boundedWindow -> {
                    return this.map.get(boundedWindow).getTimestamp();
                }).collect(Collectors.toList()));
                Map<BoundedWindow, WindowedValue<AccumT>> map = this.map;
                Objects.requireNonNull(map);
                collection.forEach((v1) -> {
                    r1.remove(v1);
                });
                this.map.put((BoundedWindow) kv.getKey(), WindowedValue.of(kv.getValue(), merge, (BoundedWindow) kv.getKey(), PaneInfo.NO_FIRING));
            }, this.map));
        }

        private WindowFn<Object, BoundedWindow>.MergeContext asMergeContext(WindowFn<Object, BoundedWindow> windowFn, BiFunction<AccumT, AccumT, AccumT> biFunction, BiConsumer<Collection<BoundedWindow>, KV<BoundedWindow, AccumT>> biConsumer, Map<BoundedWindow, WindowedValue<AccumT>> map) {
            Objects.requireNonNull(windowFn);
            return new WindowFn<Object, BoundedWindow>.MergeContext(windowFn, map, biFunction, biConsumer) { // from class: org.apache.beam.runners.spark.translation.SparkCombineFn.MergingWindowedAccumulator.1
                final /* synthetic */ Map val$map;
                final /* synthetic */ BiFunction val$mergeFn;
                final /* synthetic */ BiConsumer val$afterMerge;

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(windowFn);
                    this.val$map = map;
                    this.val$mergeFn = biFunction;
                    this.val$afterMerge = biConsumer;
                    Objects.requireNonNull(windowFn);
                }

                public Collection<BoundedWindow> windows() {
                    return this.val$map.keySet();
                }

                public void merge(Collection<BoundedWindow> collection, BoundedWindow boundedWindow) {
                    Object obj = null;
                    Iterator<BoundedWindow> it = collection.iterator();
                    while (it.hasNext()) {
                        WindowedValue windowedValue = (WindowedValue) Objects.requireNonNull((WindowedValue) this.val$map.get(it.next()));
                        obj = obj == null ? windowedValue.getValue() : this.val$mergeFn.apply(obj, windowedValue.getValue());
                    }
                    this.val$afterMerge.accept(collection, KV.of(boundedWindow, obj));
                }
            };
        }

        public String toString() {
            return "MergingWindowedAccumulator(" + this.map + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$NonMergingWindowedAccumulator.class */
    public static class NonMergingWindowedAccumulator<InputT, ValueT, AccumT> extends MapBasedWindowedAccumulator<InputT, ValueT, AccumT, NonMergingWindowedAccumulator<InputT, ValueT, AccumT>> {
        static <InputT, ValueT, AccumT> NonMergingWindowedAccumulator<InputT, ValueT, AccumT> create(Function<InputT, ValueT> function) {
            return new NonMergingWindowedAccumulator<>(function);
        }

        static <InputT, ValueT, AccumT> NonMergingWindowedAccumulator<InputT, ValueT, AccumT> from(Function<InputT, ValueT> function, Iterable<WindowedValue<AccumT>> iterable) {
            return new NonMergingWindowedAccumulator<>(function, iterable);
        }

        private NonMergingWindowedAccumulator(Function<InputT, ValueT> function) {
            super(function, new HashMap());
        }

        private NonMergingWindowedAccumulator(Function<InputT, ValueT> function, Iterable<WindowedValue<AccumT>> iterable) {
            super(function, SparkCombineFn.asMap(iterable, new HashMap()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$SingleWindowWindowedAccumulator.class */
    public static class SingleWindowWindowedAccumulator<InputT, ValueT, AccumT> implements WindowedAccumulator<InputT, ValueT, AccumT, SingleWindowWindowedAccumulator<InputT, ValueT, AccumT>> {
        final Function<InputT, ValueT> toValue;
        AccumT windowAccumulator;
        Instant accTimestamp;
        BoundedWindow accWindow;

        static <InputT, ValueT, AccumT> SingleWindowWindowedAccumulator<InputT, ValueT, AccumT> create(Function<InputT, ValueT> function) {
            return new SingleWindowWindowedAccumulator<>(function);
        }

        static <InputT, ValueT, AccumT> WindowedAccumulator<InputT, ValueT, AccumT, ?> create(Function<InputT, ValueT> function, WindowedValue<AccumT> windowedValue) {
            return new SingleWindowWindowedAccumulator(function, windowedValue);
        }

        SingleWindowWindowedAccumulator(Function<InputT, ValueT> function) {
            this.windowAccumulator = null;
            this.accTimestamp = null;
            this.accWindow = null;
            this.toValue = function;
        }

        SingleWindowWindowedAccumulator(Function<InputT, ValueT> function, WindowedValue<AccumT> windowedValue) {
            this.windowAccumulator = null;
            this.accTimestamp = null;
            this.accWindow = null;
            this.toValue = function;
            this.windowAccumulator = (AccumT) windowedValue.getValue();
            this.accTimestamp = windowedValue.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE) ? null : windowedValue.getTimestamp();
            this.accWindow = SparkCombineFn.getWindow(windowedValue);
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public void add(WindowedValue<InputT> windowedValue, SparkCombineFn<InputT, ValueT, AccumT, ?> sparkCombineFn) throws Exception {
            Object obj;
            Instant instant;
            BoundedWindow window = SparkCombineFn.getWindow(windowedValue);
            SparkCombineContext ctxtForValue = sparkCombineFn.ctxtForValue(windowedValue);
            TimestampCombiner timestampCombiner = sparkCombineFn.windowingStrategy.getTimestampCombiner();
            Instant assign = timestampCombiner.assign(window, sparkCombineFn.windowingStrategy.getWindowFn().getOutputTime(windowedValue.getTimestamp(), window));
            if (this.windowAccumulator == null) {
                obj = ((SparkCombineFn) sparkCombineFn).combineFn.createAccumulator(ctxtForValue);
                instant = assign;
            } else {
                obj = this.windowAccumulator;
                instant = this.accTimestamp;
            }
            AccumT accumt = (AccumT) ((SparkCombineFn) sparkCombineFn).combineFn.addInput(obj, toValue(windowedValue), ctxtForValue);
            Instant combine = timestampCombiner.combine(new Instant[]{assign, instant});
            this.windowAccumulator = accumt;
            this.accTimestamp = combine;
            this.accWindow = window;
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public void merge(SingleWindowWindowedAccumulator<InputT, ValueT, AccumT> singleWindowWindowedAccumulator, SparkCombineFn<?, ?, AccumT, ?> sparkCombineFn) {
            if (this.windowAccumulator == null || singleWindowWindowedAccumulator.windowAccumulator == null) {
                if (this.windowAccumulator == null) {
                    this.windowAccumulator = singleWindowWindowedAccumulator.windowAccumulator;
                    this.accTimestamp = singleWindowWindowedAccumulator.accTimestamp;
                    this.accWindow = singleWindowWindowedAccumulator.accWindow;
                    return;
                }
                return;
            }
            AccumT accumt = (AccumT) ((SparkCombineFn) sparkCombineFn).combineFn.mergeAccumulators(Arrays.asList(this.windowAccumulator, singleWindowWindowedAccumulator.windowAccumulator), sparkCombineFn.ctxtForWindows(Arrays.asList(this.accWindow)));
            Instant combine = sparkCombineFn.windowingStrategy.getTimestampCombiner().combine(new Instant[]{this.accTimestamp, singleWindowWindowedAccumulator.accTimestamp});
            this.windowAccumulator = accumt;
            this.accTimestamp = combine;
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public Collection<WindowedValue<AccumT>> extractOutput() {
            return this.windowAccumulator != null ? Arrays.asList(WindowedValue.of(this.windowAccumulator, this.accTimestamp, this.accWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING)) : Collections.emptyList();
        }

        private ValueT toValue(WindowedValue<InputT> windowedValue) {
            try {
                return (ValueT) this.toValue.call(windowedValue.getValue());
            } catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }

        @Override // org.apache.beam.runners.spark.translation.SparkCombineFn.WindowedAccumulator
        public boolean isEmpty() {
            return this.windowAccumulator == null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$SparkCombineContext.class */
    public static class SparkCombineContext extends CombineWithContext.Context {
        private final PipelineOptions pipelineOptions;
        private final SideInputReader sideInputReader;
        Collection<? extends BoundedWindow> windows = null;

        SparkCombineContext(PipelineOptions pipelineOptions, SideInputReader sideInputReader) {
            this.pipelineOptions = pipelineOptions;
            this.sideInputReader = sideInputReader;
        }

        SparkCombineContext forInput(Collection<? extends BoundedWindow> collection) {
            this.windows = (Collection) Objects.requireNonNull(collection);
            return this;
        }

        public PipelineOptions getPipelineOptions() {
            return this.pipelineOptions;
        }

        public <T> T sideInput(PCollectionView<T> pCollectionView) {
            Preconditions.checkState(this.windows.size() == 1, "sideInput can only be called when the main input element is in exactly one window");
            return (T) this.sideInputReader.get(pCollectionView, this.windows.iterator().next());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$WindowedAccumulator.class */
    public interface WindowedAccumulator<InputT, ValueT, AccumT, ImplT extends WindowedAccumulator<InputT, ValueT, AccumT, ImplT>> {

        /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$WindowedAccumulator$Type.class */
        public enum Type {
            MERGING,
            NON_MERGING,
            EXPLODE_WINDOWS,
            SINGLE_WINDOW;

            boolean isMapBased() {
                return this == NON_MERGING || this == MERGING;
            }
        }

        boolean isEmpty();

        void add(WindowedValue<InputT> windowedValue, SparkCombineFn<InputT, ValueT, AccumT, ?> sparkCombineFn) throws Exception;

        void merge(ImplT implt, SparkCombineFn<?, ?, AccumT, ?> sparkCombineFn) throws Exception;

        Collection<WindowedValue<AccumT>> extractOutput();

        static <InputT, ValueT, AccumT> WindowedAccumulator<InputT, ValueT, AccumT, ?> create(SparkCombineFn<InputT, ValueT, AccumT, ?> sparkCombineFn, Function<InputT, ValueT> function, WindowingStrategy<?, ?> windowingStrategy, Comparator<BoundedWindow> comparator) {
            return create(function, sparkCombineFn.getType(windowingStrategy), comparator);
        }

        static <InputT, ValueT, AccumT> WindowedAccumulator<InputT, ValueT, AccumT, ?> create(Function<InputT, ValueT> function, Type type, Comparator<BoundedWindow> comparator) {
            switch (type) {
                case MERGING:
                    return MergingWindowedAccumulator.create(function, comparator);
                case NON_MERGING:
                    return NonMergingWindowedAccumulator.create(function);
                case SINGLE_WINDOW:
                case EXPLODE_WINDOWS:
                    return SingleWindowWindowedAccumulator.create(function);
                default:
                    throw new IllegalArgumentException("Unknown type: " + type);
            }
        }

        static <InputT, ValueT, AccumT> WindowedAccumulator<InputT, ValueT, AccumT, ?> create(Function<InputT, ValueT> function, Type type, Iterable<WindowedValue<AccumT>> iterable, Comparator<BoundedWindow> comparator) {
            switch (type) {
                case MERGING:
                    return MergingWindowedAccumulator.from(function, iterable, comparator);
                case NON_MERGING:
                    return NonMergingWindowedAccumulator.from(function, iterable);
                case SINGLE_WINDOW:
                case EXPLODE_WINDOWS:
                    Iterator<WindowedValue<AccumT>> it = iterable.iterator();
                    return it.hasNext() ? SingleWindowWindowedAccumulator.create(function, it.next()) : SingleWindowWindowedAccumulator.create(function);
                default:
                    throw new IllegalArgumentException("Unknown type: " + type);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFn$WindowedAccumulatorCoder.class */
    static class WindowedAccumulatorCoder<InputT, ValueT, AccumT> extends Coder<WindowedAccumulator<InputT, ValueT, AccumT, ?>> {
        private final Function<InputT, ValueT> toValue;
        private final IterableCoder<WindowedValue<AccumT>> wrap;
        private final Coder<WindowedValue<AccumT>> accumCoder;
        private final Comparator<BoundedWindow> windowComparator;
        private final WindowedAccumulator.Type type;

        WindowedAccumulatorCoder(Function<InputT, ValueT> function, Coder<BoundedWindow> coder, Comparator<BoundedWindow> comparator, Coder<AccumT> coder2, WindowedAccumulator.Type type) {
            this.toValue = function;
            this.accumCoder = WindowedValue.FullWindowedValueCoder.of(coder2, coder);
            this.windowComparator = comparator;
            this.wrap = IterableCoder.of(this.accumCoder);
            this.type = type;
        }

        public void encode(WindowedAccumulator<InputT, ValueT, AccumT, ?> windowedAccumulator, OutputStream outputStream) throws CoderException, IOException {
            if (this.type.isMapBased()) {
                this.wrap.encode(((MapBasedWindowedAccumulator) windowedAccumulator).map.values(), outputStream);
                return;
            }
            SingleWindowWindowedAccumulator singleWindowWindowedAccumulator = (SingleWindowWindowedAccumulator) windowedAccumulator;
            if (singleWindowWindowedAccumulator.isEmpty()) {
                outputStream.write(0);
            } else {
                outputStream.write(1);
                this.accumCoder.encode(WindowedValue.of(singleWindowWindowedAccumulator.windowAccumulator, singleWindowWindowedAccumulator.accTimestamp, singleWindowWindowedAccumulator.accWindow, PaneInfo.NO_FIRING), outputStream);
            }
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public WindowedAccumulator<InputT, ValueT, AccumT, ?> m83decode(InputStream inputStream) throws CoderException, IOException {
            if (this.type.isMapBased()) {
                return WindowedAccumulator.create(this.toValue, this.type, this.wrap.decode(inputStream), this.windowComparator);
            }
            return inputStream.read() == 0 ? WindowedAccumulator.create(this.toValue, this.type, this.windowComparator) : WindowedAccumulator.create(this.toValue, this.type, Arrays.asList((WindowedValue) this.accumCoder.decode(inputStream)), this.windowComparator);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return this.wrap.getComponents();
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }
    }

    @VisibleForTesting
    static <K, V, AccumT, OutputT> SparkCombineFn<KV<K, V>, V, AccumT, OutputT> keyed(CombineWithContext.CombineFnWithContext<V, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy, WindowedAccumulator.Type type) {
        return new SparkCombineFn<>(false, (v0) -> {
            return v0.getValue();
        }, combineFnWithContext, serializablePipelineOptions, map, windowingStrategy, type);
    }

    public static <K, V, AccumT, OutputT> SparkCombineFn<KV<K, V>, V, AccumT, OutputT> keyed(CombineWithContext.CombineFnWithContext<V, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy) {
        return new SparkCombineFn<>(false, (v0) -> {
            return v0.getValue();
        }, combineFnWithContext, serializablePipelineOptions, map, windowingStrategy);
    }

    public static <InputT, AccumT, OutputT> SparkCombineFn<InputT, InputT, AccumT, OutputT> globally(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy) {
        return new SparkCombineFn<>(true, obj -> {
            return obj;
        }, combineFnWithContext, serializablePipelineOptions, map, windowingStrategy);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Map<BoundedWindow, WindowedValue<T>> asMap(Iterable<WindowedValue<T>> iterable, Map<BoundedWindow, WindowedValue<T>> map) {
        for (WindowedValue<T> windowedValue : iterable) {
            map.put(getWindow(windowedValue), windowedValue);
        }
        return map;
    }

    private static Comparator<BoundedWindow> asWindowComparator(@Nullable TypeDescriptor<?> typeDescriptor) {
        return (typeDescriptor == null || !StreamSupport.stream(typeDescriptor.getInterfaces().spliterator(), false).anyMatch(typeDescriptor2 -> {
            return typeDescriptor2.isSubtypeOf(TypeDescriptor.of(Comparable.class));
        })) ? Comparator.comparing((java.util.function.Function) ((Serializable) (v0) -> {
            return v0.maxTimestamp();
        })) : Comparator.naturalOrder();
    }

    SparkCombineFn(boolean z, Function<InputT, ValueT> function, CombineWithContext.CombineFnWithContext<ValueT, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy) {
        this(z, function, combineFnWithContext, serializablePipelineOptions, map, windowingStrategy, WindowedAccumulator.Type.EXPLODE_WINDOWS);
    }

    @VisibleForTesting
    SparkCombineFn(boolean z, Function<InputT, ValueT> function, CombineWithContext.CombineFnWithContext<ValueT, AccumT, OutputT> combineFnWithContext, SerializablePipelineOptions serializablePipelineOptions, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy, WindowedAccumulator.Type type) {
        this.globalCombine = z;
        this.options = serializablePipelineOptions;
        this.sideInputs = map;
        this.windowingStrategy = windowingStrategy;
        this.toValue = function;
        this.defaultNonMergingCombineStrategy = type;
        this.combineFn = combineFnWithContext;
        this.windowComparator = asWindowComparator(windowingStrategy.getWindowFn().getWindowTypeDescriptor());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedAccumulator<InputT, ValueT, AccumT, ?> createCombiner() {
        return WindowedAccumulator.create((SparkCombineFn) this, (Function) this.toValue, (WindowingStrategy<?, ?>) this.windowingStrategy, this.windowComparator);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedAccumulator<InputT, ValueT, AccumT, ?> createCombiner(WindowedValue<InputT> windowedValue) {
        try {
            WindowedAccumulator<InputT, ValueT, AccumT, ?> create = WindowedAccumulator.create((SparkCombineFn) this, (Function) this.toValue, (WindowingStrategy<?, ?>) this.windowingStrategy, this.windowComparator);
            create.add(windowedValue, this);
            return create;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedAccumulator<InputT, ValueT, AccumT, ?> mergeValue(WindowedAccumulator<InputT, ValueT, AccumT, ?> windowedAccumulator, WindowedValue<InputT> windowedValue) {
        try {
            windowedAccumulator.add(windowedValue, this);
            return windowedAccumulator;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedAccumulator<InputT, ValueT, AccumT, ?> mergeCombiners(WindowedAccumulator windowedAccumulator, WindowedAccumulator windowedAccumulator2) {
        try {
            windowedAccumulator.merge(windowedAccumulator2, this);
            return windowedAccumulator;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<WindowedValue<OutputT>> extractOutput(WindowedAccumulator<?, ?, AccumT, ?> windowedAccumulator) {
        return (Iterable) extractOutputStream(windowedAccumulator).collect(Collectors.toList());
    }

    public Stream<WindowedValue<OutputT>> extractOutputStream(WindowedAccumulator<?, ?, AccumT, ?> windowedAccumulator) {
        return (Stream<WindowedValue<OutputT>>) windowedAccumulator.extractOutput().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(windowedValue -> {
            return windowedValue.withValue(this.combineFn.extractOutput(windowedValue.getValue(), ctxtForValue(windowedValue)));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedAccumulatorCoder<InputT, ValueT, AccumT> accumulatorCoder(Coder<BoundedWindow> coder, Coder<AccumT> coder2, WindowingStrategy<?, ?> windowingStrategy) {
        return new WindowedAccumulatorCoder<>(this.toValue, coder, this.windowComparator, coder2, getType(windowingStrategy));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CombineWithContext.CombineFnWithContext<ValueT, AccumT, OutputT> getCombineFn() {
        return this.combineFn;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean mustBringWindowToKey() {
        return !getType(this.windowingStrategy).isMapBased();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public WindowedAccumulator.Type getType(WindowingStrategy<?, ?> windowingStrategy) {
        return windowingStrategy.getWindowFn().isNonMerging() ? this.globalCombine ? WindowedAccumulator.Type.NON_MERGING : (windowingStrategy.getWindowFn().assignsToOneWindow() && GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) ? WindowedAccumulator.Type.SINGLE_WINDOW : this.defaultNonMergingCombineStrategy : WindowedAccumulator.Type.MERGING;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static BoundedWindow getWindow(WindowedValue<?> windowedValue) {
        return windowedValue.isSingleWindowedValue() ? ((WindowedValue.SingleWindowedValue) windowedValue).getWindow() : (BoundedWindow) Iterables.getOnlyElement(windowedValue.getWindows());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkCombineContext ctxtForValue(WindowedValue<?> windowedValue) {
        return ctxtForWindows(windowedValue.getWindows());
    }

    SparkCombineContext ctxtForWindows(Collection<BoundedWindow> collection) {
        if (this.combineContext == null) {
            this.combineContext = new SparkCombineContext(this.options.get(), new SparkSideInputReader(this.sideInputs));
        }
        return this.combineContext.forInput(collection);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1368165358:
                if (implMethodName.equals("maxTimestamp")) {
                    z = true;
                    break;
                }
                break;
            case 932385478:
                if (implMethodName.equals("lambda$globally$fe370164$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/windowing/BoundedWindow") && serializedLambda.getImplMethodSignature().equals("()Lorg/joda/time/Instant;")) {
                    return (v0) -> {
                        return v0.maxTimestamp();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/SparkCombineFn") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj -> {
                        return obj;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
