package org.apache.beam.runners.spark.translation;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.translation.SparkCombineFn;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/TransformTranslator.class */
public final class TransformTranslator {
    private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX INFO: Add missing generic type declarations: [V, K] */
    /* renamed from: org.apache.beam.runners.spark.translation.TransformTranslator$9, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/TransformTranslator$9.class */
    public class AnonymousClass9<K, V> extends AbstractIterator<Iterator<WindowedValue<KV<K, V>>>> {
        Tuple2<ByteArray, byte[]> read = null;
        final /* synthetic */ Coder val$keyCoder;
        final /* synthetic */ Iterator val$in;
        final /* synthetic */ Coder val$wvCoder;

        AnonymousClass9(Coder coder, Iterator it, Coder coder2) {
            this.val$keyCoder = coder;
            this.val$in = it;
            this.val$wvCoder = coder2;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public Iterator<WindowedValue<KV<K, V>>> m89computeNext() {
            readNext();
            if (this.read == null) {
                return (Iterator) endOfData();
            }
            byte[] value = ((ByteArray) this.read._1()).getValue();
            byte[] copyOfRange = Arrays.copyOfRange(value, 0, value.length - 8);
            return createIteratorForKey(copyOfRange, CoderHelpers.fromByteArray(copyOfRange, this.val$keyCoder));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void readNext() {
            if (this.read == null && this.val$in.hasNext()) {
                this.read = (Tuple2) this.val$in.next();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void consumed() {
            this.read = null;
        }

        private Iterator<WindowedValue<KV<K, V>>> createIteratorForKey(final byte[] bArr, final K k) {
            return new AbstractIterator<WindowedValue<KV<K, V>>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.9.1
                /* JADX INFO: Access modifiers changed from: protected */
                /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
                public WindowedValue<KV<K, V>> m90computeNext() {
                    AnonymousClass9.this.readNext();
                    if (AnonymousClass9.this.read != null) {
                        byte[] value = ((ByteArray) AnonymousClass9.this.read._1()).getValue();
                        if (Arrays.equals(Arrays.copyOfRange(value, 0, value.length - 8), bArr)) {
                            WindowedValue windowedValue = (WindowedValue) CoderHelpers.fromByteArray((byte[]) AnonymousClass9.this.read._2(), AnonymousClass9.this.val$wvCoder);
                            AnonymousClass9.this.consumed();
                            return WindowedValue.of(KV.of(k, windowedValue.getValue()), windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
                        }
                    }
                    return (WindowedValue) endOfData();
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/TransformTranslator$Translator.class */
    public static class Translator implements SparkPipelineTranslator {
        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public boolean hasTranslation(PTransform<?, ?> pTransform) {
            return TransformTranslator.EVALUATORS.containsKey(PTransformTranslation.urnForTransformOrNull(pTransform));
        }

        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(PTransform<?, ?> pTransform) {
            TransformEvaluator<TransformT> translator = TransformTranslator.getTranslator(pTransform);
            Preconditions.checkState(translator != null, "No TransformEvaluator registered for BOUNDED transform %s", pTransform);
            return translator;
        }

        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(PTransform<?, ?> pTransform) {
            throw new IllegalStateException("TransformTranslator used in a batch pipeline only supports BOUNDED transforms.");
        }
    }

    private TransformTranslator() {
    }

    private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.PCollections<T>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.1
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Flatten.PCollections<T> pCollections, EvaluationContext evaluationContext) {
                JavaRDD union;
                Collection<PCollection<?>> values = evaluationContext.getInputs(pCollections).values();
                if (values.isEmpty()) {
                    union = evaluationContext.getSparkContext().emptyRDD();
                } else {
                    JavaRDD[] javaRDDArr = new JavaRDD[values.size()];
                    int i = 0;
                    Iterator<PCollection<?>> it = values.iterator();
                    while (it.hasNext()) {
                        javaRDDArr[i] = ((BoundedDataset) evaluationContext.borrowDataset((PValue) it.next())).getRDD();
                        i++;
                    }
                    union = evaluationContext.getSparkContext().union(javaRDDArr);
                }
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) pCollections, (Dataset) new BoundedDataset(union));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "sparkContext.union(...)";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.2
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(GroupByKey<K, V> groupByKey, EvaluationContext evaluationContext) {
                JavaRDD rdd = ((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) groupByKey)).getRDD();
                KvCoder coder = evaluationContext.getInput(groupByKey).getCoder();
                WindowingStrategy windowingStrategy = evaluationContext.getInput(groupByKey).getWindowingStrategy();
                WindowFn windowFn = windowingStrategy.getWindowFn();
                Coder keyCoder = coder.getKeyCoder();
                WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
                Partitioner partitioner = TransformTranslator.getPartitioner(evaluationContext);
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) groupByKey, (Dataset) new BoundedDataset(GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy) ? GroupNonMergingWindowsFunctions.groupByKeyAndWindow(rdd, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner) : GroupCombineFunctions.groupByKeyOnly(rdd, keyCoder, of, partitioner).flatMap(new SparkGroupAlsoByWindowViaOutputBufferFn(windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory(), SystemReduceFn.buffering(coder.getValueCoder()), evaluationContext.getSerializableOptions()))));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "groupByKey()";
            }
        };
    }

    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<KV<K, InputT>, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<KV<K, InputT>, InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.3
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Combine.GroupedValues<KV<K, InputT>, InputT, OutputT> groupedValues, EvaluationContext evaluationContext) {
                CombineWithContext.CombineFnWithContext fnWithContext = CombineFnUtil.toFnWithContext(groupedValues.getFn());
                SparkCombineFn keyed = SparkCombineFn.keyed(fnWithContext, evaluationContext.getSerializableOptions(), TranslationUtils.getSideInputs(groupedValues.getSideInputs(), evaluationContext), evaluationContext.getInput(groupedValues).getWindowingStrategy());
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) groupedValues, (Dataset) new BoundedDataset(((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) groupedValues)).getRDD().map(windowedValue -> {
                    return WindowedValue.of(KV.of(((KV) windowedValue.getValue()).getKey(), fnWithContext.apply((Iterable) ((KV) windowedValue.getValue()).getValue(), keyed.ctxtForValue(windowedValue))), windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
                })));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "map(new <fn>())";
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 1450499146:
                        if (implMethodName.equals("lambda$evaluate$1bd0dc1d$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/TransformTranslator$3") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/CombineWithContext$CombineFnWithContext;Lorg/apache/beam/runners/spark/translation/SparkCombineFn;Lorg/apache/beam/sdk/util/WindowedValue;)Lorg/apache/beam/sdk/util/WindowedValue;")) {
                            CombineWithContext.CombineFnWithContext combineFnWithContext = (CombineWithContext.CombineFnWithContext) serializedLambda.getCapturedArg(0);
                            SparkCombineFn sparkCombineFn = (SparkCombineFn) serializedLambda.getCapturedArg(1);
                            return windowedValue -> {
                                return WindowedValue.of(KV.of(((KV) windowedValue.getValue()).getKey(), combineFnWithContext.apply((Iterable) ((KV) windowedValue.getValue()).getValue(), sparkCombineFn.ctxtForValue(windowedValue))), windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>> combineGlobally() {
        return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.4
            /* JADX WARN: Type inference failed for: r1v16, types: [byte[], java.lang.Object[]] */
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Combine.Globally<InputT, OutputT> globally, EvaluationContext evaluationContext) {
                JavaRDD map;
                PCollection input = evaluationContext.getInput(globally);
                Coder coder = evaluationContext.getInput(globally).getCoder();
                Coder coder2 = evaluationContext.getOutput(globally).getCoder();
                WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                CombineWithContext.CombineFnWithContext fnWithContext = CombineFnUtil.toFnWithContext(globally.getFn());
                WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder2, windowingStrategy.getWindowFn().windowCoder());
                boolean isInsertDefault = globally.isInsertDefault();
                SparkCombineFn globally2 = SparkCombineFn.globally(fnWithContext, evaluationContext.getSerializableOptions(), TranslationUtils.getSideInputs(globally.getSideInputs(), evaluationContext), windowingStrategy);
                try {
                    Coder accumulatorCoder = fnWithContext.getAccumulatorCoder(evaluationContext.getPipeline().getCoderRegistry(), coder);
                    JavaRDD rdd = ((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) globally)).getRDD();
                    SparkCombineFn.WindowedAccumulator combineGlobally = GroupCombineFunctions.combineGlobally(rdd, globally2, accumulatorCoder, windowingStrategy);
                    if (combineGlobally.isEmpty()) {
                        JavaSparkContext javaSparkContext = new JavaSparkContext(rdd.context());
                        map = isInsertDefault ? javaSparkContext.parallelize(Lists.newArrayList((Object[]) new byte[]{CoderHelpers.toByteArray(fnWithContext.defaultValue(), coder2)})).map(CoderHelpers.fromByteFunction(coder2)).map(WindowedValue::valueInGlobalWindow) : javaSparkContext.emptyRDD();
                    } else {
                        map = evaluationContext.getSparkContext().parallelize(CoderHelpers.toByteArrays(globally2.extractOutput(combineGlobally), of)).map(CoderHelpers.fromByteFunction(of));
                    }
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) globally, (Dataset) new BoundedDataset(map));
                } catch (CannotProvideCoderException e) {
                    throw new IllegalStateException("Could not determine coder for accumulator", e);
                }
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "aggregate(..., new <fn>(), ...)";
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -1477534967:
                        if (implMethodName.equals("valueInGlobalWindow")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/util/WindowedValue") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lorg/apache/beam/sdk/util/WindowedValue;")) {
                            return WindowedValue::valueInGlobalWindow;
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
        return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.5
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Combine.PerKey<K, InputT, OutputT> perKey, EvaluationContext evaluationContext) {
                PCollection input = evaluationContext.getInput(perKey);
                KvCoder coder = evaluationContext.getInput(perKey).getCoder();
                CombineWithContext.CombineFnWithContext fnWithContext = CombineFnUtil.toFnWithContext(perKey.getFn());
                WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                SparkCombineFn keyed = SparkCombineFn.keyed(fnWithContext, evaluationContext.getSerializableOptions(), TranslationUtils.getSideInputs(perKey.getSideInputs(), evaluationContext), windowingStrategy);
                try {
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) perKey, (Dataset) new BoundedDataset(SparkCompat.extractOutput(GroupCombineFunctions.combinePerKey(((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) perKey)).getRDD(), keyed, coder.getKeyCoder(), coder.getValueCoder(), fnWithContext.getAccumulatorCoder(evaluationContext.getPipeline().getCoderRegistry(), coder.getValueCoder()), windowingStrategy), keyed).map(new TranslationUtils.FromPairFunction()).map(new TranslationUtils.ToKVByWindowInValueFunction())));
                } catch (CannotProvideCoderException e) {
                    throw new IllegalStateException("Could not determine coder for accumulator", e);
                }
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "combineByKey(..., new <fn>(), ...)";
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.6
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(ParDo.MultiOutput<InputT, OutputT> multiOutput, EvaluationContext evaluationContext) {
                String fullName = evaluationContext.getCurrentTransform().getFullName();
                DoFn fn = multiOutput.getFn();
                Preconditions.checkState(!DoFnSignatures.signatureForDoFn(fn).processElement().isSplittable(), "Not expected to directly translate splittable DoFn, should have been overridden: %s", fn);
                JavaRDD rdd = ((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) multiOutput)).getRDD();
                WindowingStrategy windowingStrategy = evaluationContext.getInput(multiOutput).getWindowingStrategy();
                MetricsContainerStepMapAccumulator metricsAccumulator = MetricsAccumulator.getInstance();
                Coder coder = evaluationContext.getInput(multiOutput).getCoder();
                Map<TupleTag<?>, Coder<?>> outputCoders = evaluationContext.getOutputCoders();
                DoFnSignature signature = DoFnSignatures.getSignature(multiOutput.getFn().getClass());
                boolean z = signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
                MultiDoFnFunction multiDoFnFunction = new MultiDoFnFunction(metricsAccumulator, fullName, fn, evaluationContext.getSerializableOptions(), multiOutput.getMainOutputTag(), multiOutput.getAdditionalOutputTags().getAll(), coder, outputCoders, TranslationUtils.getSideInputs(multiOutput.getSideInputs().values(), evaluationContext), windowingStrategy, z, ParDoTranslation.getSchemaInformation(evaluationContext.getCurrentTransform()), ParDoTranslation.getSideInputMapping(evaluationContext.getCurrentTransform()));
                JavaPairRDD statefulParDoTransform = z ? TransformTranslator.statefulParDoTransform(evaluationContext.getInput(multiOutput).getCoder(), windowingStrategy.getWindowFn().windowCoder(), rdd, TransformTranslator.getPartitioner(evaluationContext), multiDoFnFunction, signature.processElement().requiresTimeSortedInput()) : rdd.mapPartitionsToPair(multiDoFnFunction);
                Map<TupleTag<?>, PCollection<?>> outputs = evaluationContext.getOutputs(multiOutput);
                if (outputs.size() > 1) {
                    StorageLevel fromString = StorageLevel.fromString(evaluationContext.storageLevel());
                    if (TranslationUtils.canAvoidRddSerialization(fromString)) {
                        statefulParDoTransform = statefulParDoTransform.persist(fromString);
                    } else {
                        Map<TupleTag<?>, Coder<WindowedValue<?>>> tupleTagCoders = TranslationUtils.getTupleTagCoders(outputs);
                        statefulParDoTransform = statefulParDoTransform.mapToPair(TranslationUtils.getTupleTagEncodeFunction(tupleTagCoders)).persist(fromString).mapToPair(TranslationUtils.getTupleTagDecodeFunction(tupleTagCoders));
                    }
                }
                for (Map.Entry<TupleTag<?>, PCollection<?>> entry : outputs.entrySet()) {
                    evaluationContext.putDataset((PValue) entry.getValue(), new BoundedDataset(statefulParDoTransform.filter(new TranslationUtils.TupleTagFilter(entry.getKey())).values()));
                }
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "mapPartitions(new <fn>())";
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, V, OutputT> JavaPairRDD<TupleTag<?>, WindowedValue<?>> statefulParDoTransform(KvCoder<K, V> kvCoder, Coder<? extends BoundedWindow> coder, JavaRDD<WindowedValue<KV<K, V>>> javaRDD, Partitioner partitioner, MultiDoFnFunction<KV<K, V>, OutputT> multiDoFnFunction, boolean z) {
        Coder keyCoder = kvCoder.getKeyCoder();
        WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), coder);
        return !z ? GroupCombineFunctions.groupByKeyOnly(javaRDD, keyCoder, of, partitioner).map(kv -> {
            Object key = kv.getKey();
            return FluentIterable.from((Iterable) kv.getValue()).transform(windowedValue -> {
                return windowedValue.withValue(KV.of(key, windowedValue.getValue()));
            }).iterator();
        }).flatMapToPair(multiDoFnFunction) : javaRDD.map(new ReifyTimestampsAndWindowsFunction()).mapToPair(TranslationUtils.toPairFunction()).mapToPair(CoderHelpers.toByteFunctionWithTs(keyCoder, of, tuple2 -> {
            return ((WindowedValue) tuple2._2()).getTimestamp();
        })).repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)).mapPartitionsToPair(wrapDoFnFromSortedRDD(multiDoFnFunction, keyCoder, of));
    }

    private static Partitioner keyPrefixPartitionerFrom(final Partitioner partitioner) {
        return new Partitioner() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.7
            public int numPartitions() {
                return partitioner.numPartitions();
            }

            public int getPartition(Object obj) {
                ByteArray byteArray = (ByteArray) obj;
                return partitioner.getPartition(new ByteArray(Arrays.copyOfRange(byteArray.getValue(), 0, byteArray.getValue().length - 8)));
            }
        };
    }

    private static <K, V, OutputT> PairFlatMapFunction<Iterator<Tuple2<ByteArray, byte[]>>, TupleTag<?>, WindowedValue<?>> wrapDoFnFromSortedRDD(MultiDoFnFunction<KV<K, V>, OutputT> multiDoFnFunction, Coder<K> coder, Coder<WindowedValue<V>> coder2) {
        return it -> {
            return flatten(Iterators.transform(splitBySameKey(it, coder, coder2), it -> {
                try {
                    return multiDoFnFunction.call(it);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }));
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static <T> Iterator<T> flatten(final Iterator<Iterator<T>> it) {
        return new AbstractIterator<T>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.8
            Iterator<T> current = null;

            protected T computeNext() {
                while (true) {
                    if (this.current == null) {
                        if (!it.hasNext()) {
                            return (T) endOfData();
                        }
                        this.current = (Iterator) it.next();
                    }
                    if (this.current.hasNext()) {
                        return this.current.next();
                    }
                    this.current = null;
                }
            }
        };
    }

    @VisibleForTesting
    static <K, V> Iterator<Iterator<WindowedValue<KV<K, V>>>> splitBySameKey(Iterator<Tuple2<ByteArray, byte[]>> it, Coder<K> coder, Coder<WindowedValue<V>> coder2) {
        return (Iterator<Iterator<WindowedValue<KV<K, V>>>>) new AnonymousClass9(coder, it, coder2);
    }

    private static TransformEvaluator<Impulse> impulse() {
        return new TransformEvaluator<Impulse>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.10
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Impulse impulse, EvaluationContext evaluationContext) {
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) impulse, new BoundedDataset(Collections.singletonList(new byte[0]), evaluationContext.getSparkContext(), ByteArrayCoder.of()));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "sparkContext.<impulse>()";
            }
        };
    }

    private static <T> TransformEvaluator<SplittableParDo.PrimitiveBoundedRead<T>> readBounded() {
        return new TransformEvaluator<SplittableParDo.PrimitiveBoundedRead<T>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.11
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(SplittableParDo.PrimitiveBoundedRead<T> primitiveBoundedRead, EvaluationContext evaluationContext) {
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) primitiveBoundedRead, (Dataset) new BoundedDataset(new SourceRDD.Bounded(evaluationContext.getSparkContext().sc(), primitiveBoundedRead.getSource(), evaluationContext.getSerializableOptions(), evaluationContext.getCurrentTransform().getFullName()).toJavaRDD()));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "sparkContext.<readFrom(<source>)>()";
            }
        };
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
        return new TransformEvaluator<Window.Assign<T>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.12
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Window.Assign<T> assign, EvaluationContext evaluationContext) {
                JavaRDD<WindowedValue<T>> rdd = ((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) assign)).getRDD();
                if (TranslationUtils.skipAssignWindows(assign, evaluationContext)) {
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) assign, (Dataset) new BoundedDataset(rdd));
                } else {
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) assign, (Dataset) new BoundedDataset(rdd.map(new SparkAssignWindowFn(assign.getWindowFn()))));
                }
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "map(new <windowFn>())";
            }
        };
    }

    private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() {
        return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.13
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(View.CreatePCollectionView<ReadT, WriteT> createPCollectionView, EvaluationContext evaluationContext) {
                Iterable<WindowedValue<?>> windowedValues = evaluationContext.getWindowedValues(evaluationContext.getInput(createPCollectionView));
                PCollectionView<?> view = createPCollectionView.getView();
                evaluationContext.putPView(view, windowedValues, IterableCoder.of(WindowedValue.getFullCoder(view.getCoderInternal(), view.getWindowingStrategyInternal().getWindowFn().windowCoder())));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "<createPCollectionView>";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
        return new TransformEvaluator<Reshuffle<K, V>>() { // from class: org.apache.beam.runners.spark.translation.TransformTranslator.14
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Reshuffle<K, V> reshuffle, EvaluationContext evaluationContext) {
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) reshuffle, (Dataset) new BoundedDataset(GroupCombineFunctions.reshuffle(((BoundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) reshuffle)).getRDD(), WindowedValue.FullWindowedValueCoder.of(evaluationContext.getInput(reshuffle).getCoder(), evaluationContext.getInput(reshuffle).getWindowingStrategy().getWindowFn().windowCoder()))));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "repartition(...)";
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Partitioner getPartitioner(EvaluationContext evaluationContext) {
        if (((SparkPipelineOptions) evaluationContext.getSerializableOptions().get().as(SparkPipelineOptions.class)).getBundleSize().longValue() > 0) {
            return null;
        }
        return new HashPartitioner(evaluationContext.getSparkContext().defaultParallelism().intValue());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static TransformEvaluator<?> getTranslator(PTransform<?, ?> pTransform) {
        String urnForTransformOrNull = PTransformTranslation.urnForTransformOrNull(pTransform);
        if (urnForTransformOrNull == null) {
            return null;
        }
        return EVALUATORS.get(urnForTransformOrNull);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1588732523:
                if (implMethodName.equals("lambda$statefulParDoTransform$aeb2bb81$1")) {
                    z = true;
                    break;
                }
                break;
            case 739458640:
                if (implMethodName.equals("lambda$wrapDoFnFromSortedRDD$1bf6caa2$1")) {
                    z = false;
                    break;
                }
                break;
            case 1255962101:
                if (implMethodName.equals("lambda$statefulParDoTransform$c79d5e86$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/TransformTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/runners/spark/translation/MultiDoFnFunction;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    Coder coder = (Coder) serializedLambda.getCapturedArg(0);
                    Coder coder2 = (Coder) serializedLambda.getCapturedArg(1);
                    MultiDoFnFunction multiDoFnFunction = (MultiDoFnFunction) serializedLambda.getCapturedArg(2);
                    return it -> {
                        return flatten(Iterators.transform(splitBySameKey(it, coder, coder2), it -> {
                            try {
                                return multiDoFnFunction.call(it);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/TransformTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;)Ljava/util/Iterator;")) {
                    return kv -> {
                        Object key = kv.getKey();
                        return FluentIterable.from((Iterable) kv.getValue()).transform(windowedValue -> {
                            return windowedValue.withValue(KV.of(key, windowedValue.getValue()));
                        }).iterator();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/TransformTranslator") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lorg/joda/time/Instant;")) {
                    return tuple2 -> {
                        return ((WindowedValue) tuple2._2()).getTimestamp();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        EVALUATORS.put("beam:transform:impulse:v1", impulse());
        EVALUATORS.put("beam:transform:read:v1", readBounded());
        EVALUATORS.put("beam:transform:pardo:v1", parDo());
        EVALUATORS.put("beam:transform:group_by_key:v1", groupByKey());
        EVALUATORS.put("beam:transform:combine_grouped_values:v1", combineGrouped());
        EVALUATORS.put("beam:transform:combine_globally:v1", combineGlobally());
        EVALUATORS.put("beam:transform:combine_per_key:v1", combinePerKey());
        EVALUATORS.put("beam:transform:flatten:v1", flattenPColl());
        EVALUATORS.put("beam:transform:create_view:v1", createPCollView());
        EVALUATORS.put("beam:transform:window_into:v1", window());
        EVALUATORS.put("beam:transform:reshuffle:v1", reshuffle());
    }
}
