package org.apache.beam.runners.spark.translation;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Objects;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.PeekingIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.checkerframework.dataflow.qual.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.class */
public class GroupNonMergingWindowsFunctions {
    private static final Logger LOG = LoggerFactory.getLogger(GroupNonMergingWindowsFunctions.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions$GroupByKeyIterator.class */
    /**
     * Iterator that turns a stream of {@code (encoded key + window, encoded windowed value)} tuples
     * into one {@code WindowedValue<KV<K, Iterable<V>>>} per run of consecutive tuples sharing the
     * same {@link ByteArray} key.
     *
     * <p>Grouping is done purely by comparing each tuple's key with the previously seen one, so equal
     * keys must be adjacent in the input (presumably guaranteed by the sort performed in
     * {@code groupByKeyAndWindow} - verify for any other caller).
     *
     * <p><b>Contract deviation:</b> {@link #hasNext()} keeps returning {@code true} until
     * {@link #next()} has discovered that the underlying iterator is exhausted; that final call to
     * {@link #next()} returns {@code null} instead of throwing. Consumers must therefore drop
     * {@code null} elements.
     *
     * <p>The {@code Iterable} of values handed out with each element is lazy and reads from the same
     * underlying iterator. Calling {@link #next()} skips any values of the current key that have not
     * been read yet.
     *
     * @param <K> type of the decoded key
     * @param <V> type of the decoded values
     * @param <W> type of the window
     */
    public static class GroupByKeyIterator<K, V, W extends BoundedWindow> implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
        private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
        private final Coder<K> keyCoder;
        private final WindowingStrategy<?, W> windowingStrategy;
        private final WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
        // Stays true until next() has observed that `inner` is exhausted.
        private boolean hasNext = true;
        // Key of the group most recently returned by next(); null before the first group.
        private ByteArray currentKey = null;

        /**
         * Single-use, lazy view over the values that belong to one key. Values are decoded on demand
         * from the shared underlying iterator for as long as the next tuple carries the same key.
         */
        public class ValueIterator implements Iterable<V> {
            boolean consumed = false;
            private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
            private final ByteArray currentKey;

            ValueIterator(PeekingIterator<Tuple2<ByteArray, byte[]>> peekingIterator, ByteArray byteArray) {
                this.inner = peekingIterator;
                this.currentKey = byteArray;
            }

            /**
             * Returns the one and only iterator over this key's values.
             *
             * @throws IllegalStateException if called more than once
             */
            @Override // java.lang.Iterable
            public Iterator<V> iterator() {
                if (this.consumed) {
                    throw new IllegalStateException("ValueIterator can't be iterated more than once,otherwise there could be data lost");
                }
                this.consumed = true;
                return new AbstractIterator<V>() {
                    @Override
                    protected V computeNext() {
                        final PeekingIterator<Tuple2<ByteArray, byte[]>> source = ValueIterator.this.inner;
                        // Stop as soon as the input ends or the next tuple belongs to another key.
                        if (source.hasNext() && ValueIterator.this.currentKey.equals(source.peek()._1)) {
                            return GroupByKeyIterator.this.decodeValue(source.next()._2);
                        }
                        return endOfData();
                    }
                };
            }
        }

        GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> it, Coder<K> coder, WindowingStrategy<?, W> windowingStrategy, WindowedValue.FullWindowedValueCoder<KV<K, V>> fullWindowedValueCoder) throws Coder.NonDeterministicException {
            this.inner = Iterators.peekingIterator(it);
            this.keyCoder = coder;
            this.windowingStrategy = windowingStrategy;
            this.windowedValueCoder = fullWindowedValueCoder;
        }

        /**
         * Returns {@code true} until a call to {@link #next()} has found the input exhausted; it does
         * not look at the underlying iterator itself.
         */
        @Override // java.util.Iterator
        @Pure
        public boolean hasNext() {
            return this.hasNext;
        }

        /**
         * Advances to the first tuple whose key differs from the current one and returns the group
         * starting there. The first tuple of the group is only peeked, so it is also the first value
         * produced by the returned {@code Iterable}.
         *
         * @return the next group, or {@code null} when the input is exhausted
         */
        @Override // java.util.Iterator
        public WindowedValue<KV<K, Iterable<V>>> next() {
            while (this.inner.hasNext()) {
                final ByteArray nextKey = this.inner.peek()._1;
                if (nextKey.equals(this.currentKey)) {
                    // Leftover value of the previous group that nobody read - skip it.
                    this.inner.next();
                    continue;
                }
                this.currentKey = nextKey;
                final WindowedValue<KV<K, V>> decodedItem = decodeItem(this.inner.peek());
                return decodedItem.withValue(KV.of(decodedItem.getValue().getKey(), new ValueIterator(this.inner, this.currentKey)));
            }
            this.hasNext = false;
            return null;
        }

        /**
         * Decodes an encoded windowed key-value pair and returns only its value part.
         *
         * @param bArr bytes produced by the windowed value coder
         * @return the decoded value
         */
        public V decodeValue(byte[] bArr) {
            final WindowedValue<KV<K, V>> decoded = CoderHelpers.fromByteArray(bArr, this.windowedValueCoder);
            return decoded.getValue().getValue();
        }

        /**
         * Decodes one tuple into a windowed key-value pair. The key is decoded from the tuple's key
         * bytes with the key coder; the output timestamp is derived from the element timestamp via
         * the windowing strategy's window fn and timestamp combiner; the pane is always
         * {@link PaneInfo#ON_TIME_AND_ONLY_FIRING}.
         */
        private WindowedValue<KV<K, V>> decodeItem(Tuple2<ByteArray, byte[]> tuple2) {
            final K key = CoderHelpers.fromByteArray(tuple2._1.getValue(), this.keyCoder);
            final WindowedValue<KV<K, V>> windowedValue = CoderHelpers.fromByteArray(tuple2._2, this.windowedValueCoder);
            final V value = windowedValue.getValue().getValue();
            // The element is expected to sit in exactly one window; getOnlyElement fails otherwise.
            @SuppressWarnings("unchecked")
            final W window = (W) Iterables.getOnlyElement(windowedValue.getWindows());
            return WindowedValue.of(
                KV.of(key, value),
                this.windowingStrategy.getTimestampCombiner().assign(window, this.windowingStrategy.getWindowFn().getOutputTime(windowedValue.getTimestamp(), window)),
                window,
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the given windowing strategy satisfies all three conditions checked here: its
     * window fn reports itself as non-merging, its timestamp combiner is
     * {@link TimestampCombiner#END_OF_WINDOW}, and its window coder is consistent with equals.
     *
     * @param windowingStrategy strategy to inspect
     * @return {@code true} only when all three conditions hold
     */
    public static boolean isEligibleForGroupByWindow(WindowingStrategy<?, ?> windowingStrategy) {
        if (!windowingStrategy.getWindowFn().isNonMerging()) {
            return false;
        }
        if (windowingStrategy.getTimestampCombiner() != TimestampCombiner.END_OF_WINDOW) {
            return false;
        }
        return windowingStrategy.getWindowFn().windowCoder().consistentWithEquals();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Groups the values of {@code javaRDD} by key and window.
     *
     * <p>Each element is re-keyed by its encoded key and window and its windowed value is encoded to
     * bytes. The pairs are then repartitioned and sorted within partitions, and every partition is
     * walked by a {@link GroupByKeyIterator} that emits one element per run of equal keys.
     *
     * @param javaRDD input elements
     * @param coder coder of the key
     * @param coder2 coder of the value
     * @param windowingStrategy windowing strategy supplying the window coder and output timestamps
     * @param partitioner partitioner to use; when {@code null} a {@link HashPartitioner} sized to
     *     the input's partition count is used
     * @return RDD of grouped values; the values {@code Iterable} is backed by
     *     {@link GroupByKeyIterator.ValueIterator}
     */
    public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyAndWindow(JavaRDD<WindowedValue<KV<K, V>>> javaRDD, Coder<K> coder, Coder<V> coder2, WindowingStrategy<?, W> windowingStrategy, Partitioner partitioner) {
        final Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
        final WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder =
            WindowedValue.FullWindowedValueCoder.of(KvCoder.of(coder, coder2), windowCoder);
        final JavaPairRDD<ByteArray, byte[]> windowInKey =
            bringWindowToKey(javaRDD, coder, windowCoder, windowedValue -> CoderHelpers.toByteArray(windowedValue, windowedValueCoder));
        return windowInKey
            .repartitionAndSortWithinPartitions(getPartitioner(partitioner, javaRDD))
            .mapPartitions(it -> new GroupByKeyIterator<>(it, coder, windowingStrategy, windowedValueCoder))
            // GroupByKeyIterator ends with a null element once its input is exhausted; drop it.
            .filter(Objects::nonNull);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Re-keys every element by its encoded key and window while leaving the windowed value itself
     * untouched. Delegates to the four-argument overload with a pass-through mapping function.
     *
     * @param javaRDD input elements
     * @param coder coder of the key
     * @param coder2 coder of the window
     * @return pair RDD keyed by the concatenated key and window bytes
     */
    public static <K, V, W extends BoundedWindow> JavaPairRDD<ByteArray, WindowedValue<KV<K, V>>> bringWindowToKey(JavaRDD<WindowedValue<KV<K, V>>> javaRDD, Coder<K> coder, Coder<W> coder2) {
        final SerializableFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, V>>> passThrough = element -> element;
        return bringWindowToKey(javaRDD, coder, coder2, passThrough);
    }

    /**
     * Re-keys every element by the concatenation of its encoded key and its encoded window, and maps
     * the single-window element through {@code serializableFunction}.
     *
     * <p>An element that sits in several windows is exploded first, so it yields one output pair per
     * window. A warning is logged when either coder is not consistent with equals.
     *
     * @param javaRDD input elements
     * @param coder coder of the key
     * @param coder2 coder of the window
     * @param serializableFunction mapping applied to each single-window element to produce the
     *     output value
     * @return pair RDD keyed by the concatenated key and window bytes
     * @throws IllegalArgumentException if either coder is not deterministic
     */
    static <K, V, OutputT, W extends BoundedWindow> JavaPairRDD<ByteArray, OutputT> bringWindowToKey(JavaRDD<WindowedValue<KV<K, V>>> javaRDD, Coder<K> coder, Coder<W> coder2, SerializableFunction<WindowedValue<KV<K, V>>, OutputT> serializableFunction) {
        if (!isKeyAndWindowCoderConsistentWithEquals(coder, coder2)) {
            LOG.warn("Either coder {} or {} is not consistent with equals. That might cause issues on some runners.", coder, coder2);
        }
        return javaRDD.flatMapToPair(windowedValue -> {
            // The key is the same for every exploded window, so encode it only once per element.
            final byte[] keyBytes = CoderHelpers.toByteArray(windowedValue.getValue().getKey(), coder);
            return Iterators.transform(windowedValue.explodeWindows().iterator(), exploded -> {
                Objects.requireNonNull(exploded, "Exploded window can not be null.");
                // After explodeWindows() each item is expected to carry exactly one window.
                @SuppressWarnings("unchecked")
                final W window = (W) Iterables.getOnlyElement(exploded.getWindows());
                final byte[] windowBytes = CoderHelpers.toByteArray(window, coder2);
                final WindowedValue<KV<K, V>> singleWindowValue =
                    WindowedValue.of(exploded.getValue(), exploded.getTimestamp(), window, exploded.getPane());
                return new Tuple2<ByteArray, OutputT>(
                    new ByteArray(Bytes.concat(keyBytes, windowBytes)),
                    serializableFunction.apply(singleWindowValue));
            });
        });
    }

    /**
     * Verifies that both coders are deterministic and reports whether both are also consistent with
     * equals.
     *
     * @param coder coder of the key
     * @param coder2 coder of the window
     * @return {@code true} when both coders are consistent with equals
     * @throws IllegalArgumentException if either coder fails its determinism check
     */
    private static boolean isKeyAndWindowCoderConsistentWithEquals(Coder<?> coder, Coder<?> coder2) {
        try {
            coder.verifyDeterministic();
            coder2.verifyDeterministic();
        } catch (Coder.NonDeterministicException e) {
            throw new IllegalArgumentException("Coder for both key " + coder + " and " + coder2 + " must be deterministic", e);
        }
        return coder.consistentWithEquals() && coder2.consistentWithEquals();
    }

    /**
     * Picks the partitioner to use for the shuffle.
     *
     * @param partitioner explicitly requested partitioner, may be {@code null}
     * @param javaRDD RDD whose partition count sizes the fallback partitioner
     * @return {@code partitioner} when it is non-null, otherwise a new {@link HashPartitioner} with
     *     as many partitions as {@code javaRDD}
     */
    private static <K, V> Partitioner getPartitioner(Partitioner partitioner, JavaRDD<WindowedValue<KV<K, V>>> javaRDD) {
        if (partitioner != null) {
            return partitioner;
        }
        return new HashPartitioner(javaRDD.getNumPartitions());
    }

    /**
     * Re-creates one of this class's lambdas from its serialized description.
     *
     * <p>The implementation method name of {@code serializedLambda} selects a candidate. The
     * candidate's method kind, functional interface class/method/signature, implementing class and
     * implementation signature are then checked, and on a full match an equivalent lambda is
     * returned, rebuilt from the captured arguments where there are any.
     *
     * <p>NOTE(review): the method is marked synthetic, so it is presumably compiler-generated and
     * only visible here because the file is decompiler output - confirm before hand-editing.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException if no candidate matches
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): `z` is declared boolean but is assigned integer selector values, and the
        // second switch repeats `case true` labels; this looks like a decompiler rendering of an
        // int selector rather than compilable source - confirm.
        boolean z = -1;
        // Step 1: map the implementation method name (via its hash code, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case -1450204607:
                if (implMethodName.equals("lambda$groupByKeyAndWindow$358b7749$1")) {
                    z = 3;
                    break;
                }
                break;
            case 178643682:
                if (implMethodName.equals("lambda$bringWindowToKey$cce0afdd$1")) {
                    z = false;
                    break;
                }
                break;
            case 200450650:
                if (implMethodName.equals("lambda$groupByKeyAndWindow$c9b6f5c4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1296528558:
                if (implMethodName.equals("lambda$bringWindowToKey$d155732e$1")) {
                    z = 2;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = 4;
                    break;
                }
                break;
        }
        // Step 2: verify the remaining metadata of the selected candidate and rebuild the lambda.
        switch (z) {
            case false:
                // Pass-through SerializableFunction that returns its argument unchanged.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/util/WindowedValue;)Lorg/apache/beam/sdk/util/WindowedValue;")) {
                    return windowedValue -> {
                        return windowedValue;
                    };
                }
                break;
            case true:
                // SerializableFunction that encodes a windowed value with the captured coder.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder;Lorg/apache/beam/sdk/util/WindowedValue;)[B")) {
                    WindowedValue.FullWindowedValueCoder fullWindowedValueCoder = (WindowedValue.FullWindowedValueCoder) serializedLambda.getCapturedArg(0);
                    return windowedValue2 -> {
                        return CoderHelpers.toByteArray(windowedValue2, fullWindowedValueCoder);
                    };
                }
                break;
            case true:
                // PairFlatMapFunction that re-keys an element by its encoded key and window;
                // captures two coders and the mapping function.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/transforms/SerializableFunction;Lorg/apache/beam/sdk/util/WindowedValue;)Ljava/util/Iterator;")) {
                    Coder coder = (Coder) serializedLambda.getCapturedArg(0);
                    Coder coder2 = (Coder) serializedLambda.getCapturedArg(1);
                    SerializableFunction serializableFunction = (SerializableFunction) serializedLambda.getCapturedArg(2);
                    return windowedValue3 -> {
                        byte[] byteArray = CoderHelpers.toByteArray(((KV) windowedValue3.getValue()).getKey(), coder);
                        return Iterators.transform(windowedValue3.explodeWindows().iterator(), windowedValue3 -> {
                            Objects.requireNonNull(windowedValue3, "Exploded window can not be null.");
                            BoundedWindow boundedWindow = (BoundedWindow) Iterables.getOnlyElement(windowedValue3.getWindows());
                            return new Tuple2(new ByteArray(Bytes.concat((byte[][]) new byte[]{byteArray, CoderHelpers.toByteArray(boundedWindow, coder2)})), serializableFunction.apply(WindowedValue.of((KV) windowedValue3.getValue(), windowedValue3.getTimestamp(), boundedWindow, windowedValue3.getPane())));
                        });
                    };
                }
                break;
            case true:
                // FlatMapFunction that wraps a partition iterator in a GroupByKeyIterator; captures
                // the key coder, the windowing strategy and the windowed value coder.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/values/WindowingStrategy;Lorg/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    Coder coder3 = (Coder) serializedLambda.getCapturedArg(0);
                    WindowingStrategy windowingStrategy = (WindowingStrategy) serializedLambda.getCapturedArg(1);
                    WindowedValue.FullWindowedValueCoder fullWindowedValueCoder2 = (WindowedValue.FullWindowedValueCoder) serializedLambda.getCapturedArg(2);
                    return it -> {
                        return new GroupByKeyIterator(it, coder3, windowingStrategy, fullWindowedValueCoder2);
                    };
                }
                break;
            case true:
                // Spark Function backed by java.util.Objects.nonNull.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
        }
        // No candidate matched both the method name and the remaining metadata.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
