package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.MutablePair;
import org.joda.time.Instant;

@Internal
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators.class */
class Aggregators {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$CombineFnAggregator.class */
    public static abstract class CombineFnAggregator<ValT, AccT, ResT, InT, BuffT, OutT> extends Aggregator<InT, BuffT, OutT> {
        private final Combine.CombineFn<ValT, AccT, ResT> fn;
        private final ScalaInterop.Fun1<InT, ValT> valueFn;
        private final Encoder<BuffT> bufferEnc;
        private final Encoder<OutT> outputEnc;

        public CombineFnAggregator(Combine.CombineFn<ValT, AccT, ResT> combineFn, ScalaInterop.Fun1<InT, ValT> fun1, Encoder<BuffT> encoder, Encoder<OutT> encoder2) {
            this.fn = combineFn;
            this.valueFn = fun1;
            this.bufferEnc = encoder;
            this.outputEnc = encoder2;
        }

        protected final ValT value(InT r4) {
            return (ValT) this.valueFn.apply(r4);
        }

        protected final AccT emptyAcc() {
            return (AccT) this.fn.createAccumulator();
        }

        protected final AccT mergeAccs(AccT acct, AccT acct2) {
            return (AccT) this.fn.mergeAccumulators(ImmutableList.of(acct, acct2));
        }

        protected final AccT addToAcc(AccT acct, ValT valt) {
            return (AccT) this.fn.addInput(acct, valt);
        }

        protected final ResT extract(AccT acct) {
            return (ResT) this.fn.extractOutput(acct);
        }

        public Encoder<BuffT> bufferEncoder() {
            return this.bufferEnc;
        }

        public Encoder<OutT> outputEncoder() {
            return this.outputEnc;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$MergingWindowedAggregator.class */
    /**
     * Windowed aggregator for window functions that require merging of windows.
     *
     * <p>Before falling back to the non-merging behavior of the super class, both {@code reduce}
     * and {@code merge} ask the {@link WindowFn} to merge the windows already present in the buffer
     * with the newly arriving ones. Accumulators of windows that got merged are folded into the
     * accumulator of the merge target and removed from the buffer(s).
     */
    private static class MergingWindowedAggregator<ValT, AccT, ResT, InT> extends NonMergingWindowedAggregator<ValT, AccT, ResT, InT> {
        private final WindowFn<ValT, BoundedWindow> windowFn;

        /**
         * Folds the accumulator of one (to be merged) window into the accumulator gathered so far.
         * Implementations must accept {@code null} as first argument as {@code null} is used as the
         * identity of the reduction in {@code mergeWindows}.
         */
        @FunctionalInterface
        public interface ReduceFn<AccT> extends BiFunction<MutablePair<Instant, AccT>, BoundedWindow, MutablePair<Instant, AccT>> {
        }

        /**
         * @param combineFn combine function used to maintain the accumulators
         * @param valueFn extracts the value to be combined from a windowed input
         * @param windowing windowing strategy providing the (merging) window function
         * @param windowEnc encoder for windows
         * @param accEnc encoder for accumulators
         * @param outputEnc encoder for the windowed results
         */
        @SuppressWarnings("unchecked") // the strategy is wildcard typed, its window fn has to be cast
        public MergingWindowedAggregator(
                Combine.CombineFn<ValT, AccT, ResT> combineFn,
                ScalaInterop.Fun1<WindowedValue<InT>, ValT> valueFn,
                WindowingStrategy<?, ?> windowing,
                Encoder<BoundedWindow> windowEnc,
                Encoder<AccT> accEnc,
                Encoder<WindowedValue<ResT>> outputEnc) {
            super(combineFn, valueFn, windowing, windowEnc, accEnc, outputEnc);
            this.windowFn = (WindowFn<ValT, BoundedWindow>) windowing.getWindowFn();
        }

        /**
         * Adds {@code value} to the accumulators of {@code windows}, merging windows first if the
         * buffer already contains windows.
         *
         * @return the (mutated) buffer {@code buff}
         */
        @Override
        protected Map<BoundedWindow, MutablePair<Instant, AccT>> reduce(
                Map<BoundedWindow, MutablePair<Instant, AccT>> buff,
                Collection<BoundedWindow> windows,
                ValT value,
                Instant timestamp) {
            if (buff.isEmpty()) {
                // Nothing in the buffer to merge with, plain non-merging behavior is sufficient.
                return super.reduce(buff, windows, value, timestamp);
            }
            // For every window that is merged into a target: if the buffer holds an accumulator for
            // it, remove and merge it, otherwise it is a new window and the input value is added.
            // NOTE(review): the decompiled lambda used the same name for the target window and the
            // merged window; the inner (shadowing) window is used here - confirm against upstream.
            Function<BoundedWindow, ReduceFn<AccT>> reduceFn =
                    target ->
                            (acc, window) -> {
                                MutablePair<Instant, AccT> existing = buff.remove(window);
                                return existing != null
                                        ? mergeAccs(window, acc, existing)
                                        : addToAcc(window, acc, value, timestamp);
                            };
            Set<BoundedWindow> unmerged = mergeWindows(buff, ImmutableSet.copyOf(windows), reduceFn);
            // Windows untouched by the merge are handled the non-merging way.
            return unmerged.isEmpty() ? buff : super.reduce(buff, unmerged, value, timestamp);
        }

        /**
         * Merges buffer {@code b2} into {@code b1}, merging windows of both buffers first.
         *
         * <p>Accumulators of merged windows are removed from both buffers. Afterwards only the
         * windows of {@code b2} that did not take part in a merge are retained and merged the
         * non-merging way.
         */
        @Override
        public Map<BoundedWindow, MutablePair<Instant, AccT>> merge(
                Map<BoundedWindow, MutablePair<Instant, AccT>> b1,
                Map<BoundedWindow, MutablePair<Instant, AccT>> b2) {
            // A merged window may have an accumulator in either buffer (or both); mergeAccs
            // tolerates null for absent ones.
            Function<BoundedWindow, ReduceFn<AccT>> reduceFn =
                    target ->
                            (acc, window) ->
                                    mergeAccs(window, mergeAccs(window, acc, b1.remove(window)), b2.remove(window));
            Set<BoundedWindow> unmerged = mergeWindows(b1, b2.keySet(), reduceFn);
            if (unmerged.isEmpty()) {
                return b1;
            }
            b2.keySet().retainAll(unmerged);
            return super.merge(b1, b2);
        }

        /**
         * Lets the window function merge the windows of {@code buff} and {@code newWindows}. For
         * each merge the accumulators are reduced using the function supplied by {@code reduceFn}
         * for the merge target, a non-null result is stored in {@code buff} under the target.
         *
         * @return copy of {@code newWindows} without the windows that took part in a merge
         * @throws RuntimeException wrapping any exception raised while merging windows
         */
        private Set<BoundedWindow> mergeWindows(
                Map<BoundedWindow, MutablePair<Instant, AccT>> buff,
                Set<BoundedWindow> newWindows,
                Function<BoundedWindow, ReduceFn<AccT>> reduceFn) {
            try {
                Set<BoundedWindow> newUnmerged = new HashSet<>(newWindows);
                windowFn.mergeWindows(
                        windowFn.new MergeContext() {
                            @Override
                            public Collection<BoundedWindow> windows() {
                                return Sets.union(buff.keySet(), newWindows);
                            }

                            @Override
                            public void merge(Collection<BoundedWindow> toBeMerged, BoundedWindow target) {
                                // null acts as identity, the reduce function has to cope with it
                                MutablePair<Instant, AccT> merged =
                                        toBeMerged.stream()
                                                .reduce(
                                                        null,
                                                        reduceFn.apply(target),
                                                        MergingWindowedAggregator.this.combiner(target));
                                if (merged != null) {
                                    buff.put(target, merged);
                                }
                                newUnmerged.removeAll(toBeMerged);
                            }
                        });
                return newUnmerged;
            } catch (Exception e) {
                throw new RuntimeException("Unable to merge accumulators windows", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$NonMergingWindowedAggregator.class */
    /**
     * Windowed aggregator that keeps one accumulator (and timestamp) per window in a {@link Map}
     * and never merges windows.
     */
    public static class NonMergingWindowedAggregator<ValT, AccT, ResT, InT> extends WindowedAggregator<ValT, AccT, ResT, InT, BoundedWindow, Map<BoundedWindow, MutablePair<Instant, AccT>>> {
        /**
         * @param combineFn combine function used to maintain the accumulators
         * @param valueFn extracts the value to be combined from a windowed input
         * @param windowing windowing strategy, passed on to the super class
         * @param windowEnc encoder for windows
         * @param accEnc encoder for accumulators
         * @param outputEnc encoder for the windowed results
         */
        @SuppressWarnings({"unchecked", "rawtypes"}) // no class literal exists for a parameterized Map
        public NonMergingWindowedAggregator(
                Combine.CombineFn<ValT, AccT, ResT> combineFn,
                ScalaInterop.Fun1<WindowedValue<InT>, ValT> valueFn,
                WindowingStrategy<?, ?> windowing,
                Encoder<BoundedWindow> windowEnc,
                Encoder<AccT> accEnc,
                Encoder<WindowedValue<ResT>> outputEnc) {
            super(combineFn, valueFn, windowing, windowEnc, accEnc, outputEnc, (Class) Map.class);
        }

        /** Returns a new, empty and mutable buffer. */
        @Override
        public Map<BoundedWindow, MutablePair<Instant, AccT>> zero() {
            return new HashMap<>();
        }

        /**
         * Adds the value of {@code input} to the accumulator of each window of {@code input}.
         *
         * @return the (mutated) buffer {@code buff}
         */
        @Override
        @SuppressWarnings("unchecked") // windows are only read, widening the element type is safe
        public final Map<BoundedWindow, MutablePair<Instant, AccT>> reduce(
                Map<BoundedWindow, MutablePair<Instant, AccT>> buff, WindowedValue<InT> input) {
            Collection<BoundedWindow> windows = (Collection<BoundedWindow>) input.getWindows();
            return reduce(buff, windows, value(input), input.getTimestamp());
        }

        /**
         * Adds {@code value} with {@code timestamp} to the accumulator of every window in
         * {@code windows}, creating accumulators for windows not yet present in {@code buff}.
         *
         * @return the (mutated) buffer {@code buff}
         */
        protected Map<BoundedWindow, MutablePair<Instant, AccT>> reduce(
                Map<BoundedWindow, MutablePair<Instant, AccT>> buff,
                Collection<BoundedWindow> windows,
                ValT value,
                Instant timestamp) {
            for (BoundedWindow window : windows) {
                buff.compute(window, (w, acc) -> addToAcc(w, acc, value, timestamp));
            }
            return buff;
        }

        /**
         * Merges two buffers window by window. An empty buffer is skipped entirely; otherwise the
         * smaller buffer is merged into the larger one, which is mutated and returned.
         */
        @Override
        public Map<BoundedWindow, MutablePair<Instant, AccT>> merge(
                Map<BoundedWindow, MutablePair<Instant, AccT>> b1,
                Map<BoundedWindow, MutablePair<Instant, AccT>> b2) {
            if (b1.isEmpty()) {
                return b2;
            }
            if (b2.isEmpty()) {
                return b1;
            }
            if (b2.size() > b1.size()) {
                // swap so that the smaller map is iterated
                return merge(b2, b1);
            }
            b2.forEach((window, acc) -> b1.merge(window, acc, combiner(window)));
            return b1;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$SessionsAggregator.class */
    /**
     * Windowed aggregator specialized for {@link Sessions}. Accumulators are kept in a
     * {@link TreeMap} ordered by the natural ordering of {@link IntervalWindow}, intersecting
     * windows are merged by walking neighbouring entries of that map.
     */
    private static class SessionsAggregator<ValT, AccT, ResT, InT> extends WindowedAggregator<ValT, AccT, ResT, InT, IntervalWindow, TreeMap<IntervalWindow, MutablePair<Instant, AccT>>> {
        /**
         * @param combineFn combine function used to maintain the accumulators
         * @param valueFn extracts the value to be combined from a windowed input
         * @param windowing windowing strategy, its window function must be exactly {@link Sessions}
         * @param windowEnc encoder for windows
         * @param accEnc encoder for accumulators
         * @param outputEnc encoder for the windowed results
         * @throws IllegalArgumentException if the window function is not of class {@link Sessions}
         */
        @SuppressWarnings({"unchecked", "rawtypes"}) // no class literal exists for a parameterized TreeMap
        SessionsAggregator(
                Combine.CombineFn<ValT, AccT, ResT> combineFn,
                ScalaInterop.Fun1<WindowedValue<InT>, ValT> valueFn,
                WindowingStrategy<?, ?> windowing,
                Encoder<IntervalWindow> windowEnc,
                Encoder<AccT> accEnc,
                Encoder<WindowedValue<ResT>> outputEnc) {
            super(combineFn, valueFn, windowing, windowEnc, accEnc, outputEnc, (Class) TreeMap.class);
            Preconditions.checkArgument(windowing.getWindowFn().getClass().equals(Sessions.class));
        }

        /** Returns a new, empty and mutable buffer using the natural ordering of windows. */
        @Override
        public final TreeMap<IntervalWindow, MutablePair<Instant, AccT>> zero() {
            return new TreeMap<>();
        }

        /**
         * Adds the value of {@code input} to the buffer. Each window of {@code input} is extended
         * to span the buffered windows it intersects with; their accumulators are merged, the
         * merged entries removed and the input value is added to the resulting accumulator.
         *
         * @return the (mutated) buffer {@code buff}
         */
        @Override
        @SuppressWarnings("unchecked") // presumably safe as Sessions only assigns IntervalWindows
        public TreeMap<IntervalWindow, MutablePair<Instant, AccT>> reduce(
                TreeMap<IntervalWindow, MutablePair<Instant, AccT>> buff, WindowedValue<InT> input) {
            for (IntervalWindow window : (Collection<IntervalWindow>) input.getWindows()) {
                MutablePair<Instant, AccT> acc = null;
                // first / last buffered window that got merged into the (extended) window
                IntervalWindow first = null;
                IntervalWindow last = null;

                // start with the closest window less than or equal to the new one, if any
                Map.Entry<IntervalWindow, MutablePair<Instant, AccT>> lower = buff.floorEntry(window);
                if (lower != null && window.intersects(lower.getKey())) {
                    acc = lower.getValue();
                    window = window.span(lower.getKey());
                    first = lower.getKey();
                    last = first;
                }

                // then continue with greater windows in order for as long as they intersect
                for (Map.Entry<IntervalWindow, MutablePair<Instant, AccT>> entry : buff.tailMap(window, false).entrySet()) {
                    IntervalWindow entryWindow = entry.getKey();
                    MutablePair<Instant, AccT> entryAcc = entry.getValue();
                    if (!window.intersects(entryWindow)) {
                        break;
                    }
                    window = window.span(entryWindow);
                    acc = acc == null ? entryAcc : mergeAccs(window, acc, entryAcc);
                    if (first == null) {
                        first = entryWindow;
                    }
                    last = entryWindow;
                }

                if (first != null && last != null) {
                    // drop everything that was merged into acc
                    buff.navigableKeySet().subSet(first, true, last, true).clear();
                }
                buff.put(window, addToAcc(window, acc, value(input), input.getTimestamp()));
            }
            return buff;
        }

        /**
         * Merges two buffers into a new buffer by visiting the entries of both in window order and
         * collapsing each run of consecutive intersecting windows into one spanning window with a
         * merged accumulator. If one of the buffers is empty, the other one is returned as is.
         */
        @Override
        public TreeMap<IntervalWindow, MutablePair<Instant, AccT>> merge(
                TreeMap<IntervalWindow, MutablePair<Instant, AccT>> b1,
                TreeMap<IntervalWindow, MutablePair<Instant, AccT>> b2) {
            if (b1.isEmpty()) {
                return b2;
            }
            if (b2.isEmpty()) {
                return b1;
            }
            TreeMap<IntervalWindow, MutablePair<Instant, AccT>> result = zero();
            PeekingIterator<Map.Entry<IntervalWindow, MutablePair<Instant, AccT>>> it1 =
                    Iterators.peekingIterator(b1.entrySet().iterator());
            PeekingIterator<Map.Entry<IntervalWindow, MutablePair<Instant, AccT>>> it2 =
                    Iterators.peekingIterator(b2.entrySet().iterator());

            MutablePair<Instant, AccT> acc = null;
            IntervalWindow window = null;
            while (it1.hasNext() || it2.hasNext()) {
                // advance the iterator with the smaller window ahead (it1 wins ties)
                Map.Entry<IntervalWindow, MutablePair<Instant, AccT>> next;
                if (it1.hasNext() && it2.hasNext()) {
                    next = it1.peek().getKey().compareTo(it2.peek().getKey()) <= 0 ? it1.next() : it2.next();
                } else {
                    next = it1.hasNext() ? it1.next() : it2.next();
                }

                if (window != null && window.intersects(next.getKey())) {
                    // extend the current window and merge accumulators
                    window = window.span(next.getKey());
                    acc = mergeAccs(window, acc, next.getValue());
                } else {
                    // current window is complete, store it and start over with the next entry
                    if (window != null && acc != null) {
                        result.put(window, acc);
                    }
                    acc = next.getValue();
                    window = next.getKey();
                }
            }
            if (window != null && acc != null) {
                result.put(window, acc);
            }
            return result;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$ValueAggregator.class */
    private static class ValueAggregator<ValT, AccT, ResT, InT> extends CombineFnAggregator<ValT, AccT, ResT, InT, AccT, ResT> {
        public ValueAggregator(Combine.CombineFn<ValT, AccT, ResT> combineFn, ScalaInterop.Fun1<InT, ValT> fun1, Encoder<AccT> encoder, Encoder<ResT> encoder2) {
            super(combineFn, fun1, encoder, encoder2);
        }

        public AccT zero() {
            return emptyAcc();
        }

        public AccT reduce(AccT acct, InT r7) {
            return addToAcc(acct, value(r7));
        }

        public AccT merge(AccT acct, AccT acct2) {
            return mergeAccs(acct, acct2);
        }

        public ResT finish(AccT acct) {
            return extract(acct);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/Aggregators$WindowedAggregator.class */
    public static abstract class WindowedAggregator<ValT, AccT, ResT, InT, W extends BoundedWindow, MapT extends Map<W, MutablePair<Instant, AccT>>> extends CombineFnAggregator<ValT, AccT, ResT, WindowedValue<InT>, MapT, Collection<WindowedValue<ResT>>> {
        private final TimestampCombiner tsCombiner;

        public WindowedAggregator(Combine.CombineFn<ValT, AccT, ResT> combineFn, ScalaInterop.Fun1<WindowedValue<InT>, ValT> fun1, WindowingStrategy<?, ?> windowingStrategy, Encoder<W> encoder, Encoder<AccT> encoder2, Encoder<WindowedValue<ResT>> encoder3, Class<MapT> cls) {
            super(combineFn, fun1, EncoderHelpers.mapEncoder(encoder, EncoderHelpers.mutablePairEncoder(EncoderHelpers.encoderOf(Instant.class), encoder2), cls), EncoderHelpers.collectionEncoder(encoder3));
            this.tsCombiner = windowingStrategy.getTimestampCombiner();
        }

        protected final Instant resolveTimestamp(BoundedWindow boundedWindow, Instant instant, Instant instant2) {
            return this.tsCombiner.merge(boundedWindow, new Instant[]{instant, instant2});
        }

        protected final MutablePair<Instant, AccT> initAcc(ValT valt, Instant instant) {
            return new MutablePair<>(instant, addToAcc(emptyAcc(), valt));
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected final <T extends MutablePair<Instant, AccT>> T mergeAccs(W w, T t, T t2) {
            return (t == null || t2 == null) ? t == null ? t2 : t : (T) t.update(resolveTimestamp(w, (Instant) ((MutablePair) t)._1, (Instant) ((MutablePair) t2)._1), mergeAccs(((MutablePair) t)._2, ((MutablePair) t2)._2));
        }

        protected BinaryOperator<MutablePair<Instant, AccT>> combiner(W w) {
            return (mutablePair, mutablePair2) -> {
                return mergeAccs(w, mutablePair, mutablePair2);
            };
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected final MutablePair<Instant, AccT> addToAcc(W w, MutablePair<Instant, AccT> mutablePair, ValT valt, Instant instant) {
            return mutablePair == null ? initAcc(valt, instant) : mutablePair.update(resolveTimestamp(w, (Instant) mutablePair._1, instant), addToAcc(mutablePair._2, valt));
        }

        public final Collection<WindowedValue<ResT>> finish(MapT mapt) {
            return Collections2.transform(mapt.entrySet(), this::windowedValue);
        }

        /* JADX WARN: Multi-variable type inference failed */
        private WindowedValue<ResT> windowedValue(Map.Entry<W, MutablePair<Instant, AccT>> entry) {
            return WindowedValue.of(extract(entry.getValue()._2), (Instant) entry.getValue()._1, entry.getKey(), PaneInfo.NO_FIRING);
        }
    }

    /** Package-private no-op constructor; this class only declares static factories and nested types. */
    Aggregators() {}

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a non-windowed aggregator that combines the values extracted by {@code fun1} using
     * {@code combineFn}.
     *
     * @param combineFn combine function to apply
     * @param fun1 extracts the value to be combined from an input
     * @param encoder encoder for accumulators
     * @param encoder2 encoder for results
     */
    public static <ValT, AccT, ResT, InT> Aggregator<InT, ?, ResT> value(Combine.CombineFn<ValT, AccT, ResT> combineFn, ScalaInterop.Fun1<InT, ValT> fun1, Encoder<AccT> encoder, Encoder<ResT> encoder2) {
        ValueAggregator<ValT, AccT, ResT, InT> aggregator =
                new ValueAggregator<ValT, AccT, ResT, InT>(combineFn, fun1, encoder, encoder2);
        return aggregator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a windowed aggregator matching the windowing strategy: a non-merging one if the
     * strategy needs no merge, a dedicated sessions aggregator if the window function is exactly
     * {@link Sessions}, and a generic merging aggregator otherwise.
     *
     * @param combineFn combine function to apply
     * @param fun1 extracts the value to be combined from a windowed input
     * @param windowingStrategy windowing strategy deciding on the aggregator used
     * @param encoder encoder for windows
     * @param encoder2 encoder for accumulators
     * @param encoder3 encoder for a single windowed result
     */
    @SuppressWarnings("unchecked") // window encoder is reinterpreted for the sessions aggregator
    public static <ValT, AccT, ResT, InT> Aggregator<WindowedValue<InT>, ?, Collection<WindowedValue<ResT>>> windowedValue(Combine.CombineFn<ValT, AccT, ResT> combineFn, ScalaInterop.Fun1<WindowedValue<InT>, ValT> fun1, WindowingStrategy<?, ?> windowingStrategy, Encoder<BoundedWindow> encoder, Encoder<AccT> encoder2, Encoder<WindowedValue<ResT>> encoder3) {
        if (!windowingStrategy.needsMerge()) {
            return new NonMergingWindowedAggregator<ValT, AccT, ResT, InT>(
                    combineFn, fun1, windowingStrategy, encoder, encoder2, encoder3);
        }
        if (windowingStrategy.getWindowFn().getClass().equals(Sessions.class)) {
            // Same encoder instance as before, only its static type changes (erased at runtime).
            Encoder<IntervalWindow> sessionWindowEnc = (Encoder<IntervalWindow>) (Encoder<?>) encoder;
            return new SessionsAggregator<ValT, AccT, ResT, InT>(
                    combineFn, fun1, windowingStrategy, sessionWindowEnc, encoder2, encoder3);
        }
        return new MergingWindowedAggregator<ValT, AccT, ResT, InT>(
                combineFn, fun1, windowingStrategy, encoder, encoder2, encoder3);
    }
}
