package org.apache.beam.runners.spark.translation;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.class */
public class SparkStreamingPortablePipelineTranslator implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
    private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator$PTransformTranslator.class */
    public interface PTransformTranslator {
        void translate(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext);
    }

    @Override // org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator
    public Set<String> knownUrns() {
        return this.urnToTransformTranslator.keySet();
    }

    public SparkStreamingPortablePipelineTranslator() {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        builder.put("beam:transform:impulse:v1", SparkStreamingPortablePipelineTranslator::translateImpulse);
        builder.put("beam:transform:group_by_key:v1", SparkStreamingPortablePipelineTranslator::translateGroupByKey);
        builder.put("beam:runner:executable_stage:v1", SparkStreamingPortablePipelineTranslator::translateExecutableStage);
        builder.put("beam:transform:flatten:v1", SparkStreamingPortablePipelineTranslator::translateFlatten);
        builder.put("beam:transform:reshuffle:v1", SparkStreamingPortablePipelineTranslator::translateReshuffle);
        this.urnToTransformTranslator = builder.build();
    }

    @Override // org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator
    public void translate(RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        for (PipelineNode.PTransformNode pTransformNode : QueryablePipeline.forTransforms(pipeline.getRootTransformIdsList(), pipeline.getComponents()).getTopologicallyOrderedTransforms()) {
            ((PTransformTranslator) this.urnToTransformTranslator.getOrDefault(pTransformNode.getTransform().getSpec().getUrn(), SparkStreamingPortablePipelineTranslator::urnNotFound)).translate(pTransformNode, pipeline, sparkStreamingTranslationContext);
        }
    }

    private static void urnNotFound(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        throw new IllegalArgumentException(String.format("Transform %s has unknown URN %s", pTransformNode.getId(), pTransformNode.getTransform().getSpec().getUrn()));
    }

    private static void translateImpulse(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        List singletonList = Collections.singletonList(WindowedValue.of(new byte[0], BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
        WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
        JavaRDD map = sparkStreamingTranslationContext.getSparkContext().parallelize(CoderHelpers.toByteArrays(singletonList, of)).map(CoderHelpers.fromByteFunction(of));
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        linkedBlockingQueue.offer(map);
        JavaInputDStream queueStream = sparkStreamingTranslationContext.getStreamingContext().queueStream(linkedBlockingQueue, true);
        UnboundedDataset unboundedDataset = new UnboundedDataset(queueStream, Collections.singletonList(Integer.valueOf(queueStream.inputDStream().id())));
        GlobalWatermarkHolder.add(unboundedDataset.getStreamSources().get(0).intValue(), new GlobalWatermarkHolder.SparkWatermarks(GlobalWindow.INSTANCE.maxTimestamp(), BoundedWindow.TIMESTAMP_MAX_VALUE, sparkStreamingTranslationContext.getFirstTimestamp()));
        sparkStreamingTranslationContext.pushDataset(PipelineTranslatorUtils.getOutputId(pTransformNode), unboundedDataset);
    }

    private static <K, V> void translateGroupByKey(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        RunnerApi.Components components = pipeline.getComponents();
        String inputId = PipelineTranslatorUtils.getInputId(pTransformNode);
        UnboundedDataset unboundedDataset = (UnboundedDataset) sparkStreamingTranslationContext.popDataset(inputId);
        List<Integer> streamSources = unboundedDataset.getStreamSources();
        KvCoder valueCoder = PipelineTranslatorUtils.getWindowedValueCoder(inputId, components).getValueCoder();
        WindowingStrategy windowingStrategy = PipelineTranslatorUtils.getWindowingStrategy(inputId, components);
        sparkStreamingTranslationContext.pushDataset(PipelineTranslatorUtils.getOutputId(pTransformNode), new UnboundedDataset(SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(unboundedDataset.getDStream(), valueCoder.getKeyCoder(), WindowedValue.FullWindowedValueCoder.of(valueCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder()), windowingStrategy, sparkStreamingTranslationContext.getSerializableOptions(), streamSources, pTransformNode.getId()), streamSources));
    }

    private static <InputT, OutputT, SideInputT> void translateExecutableStage(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        try {
            RunnerApi.ExecutableStagePayload parseFrom = RunnerApi.ExecutableStagePayload.parseFrom(pTransformNode.getTransform().getSpec().getPayload());
            String input = parseFrom.getInput();
            UnboundedDataset unboundedDataset = (UnboundedDataset) sparkStreamingTranslationContext.popDataset(input);
            List<Integer> streamSources = unboundedDataset.getStreamSources();
            JavaDStream dStream = unboundedDataset.getDStream();
            Map outputsMap = pTransformNode.getTransform().getOutputsMap();
            BiMap createOutputMap = PipelineTranslatorUtils.createOutputMap(outputsMap.values());
            Coder windowCoder = PipelineTranslatorUtils.getWindowingStrategy(input, pipeline.getComponents()).getWindowFn().windowCoder();
            if (parseFrom.getSideInputsCount() > 0) {
                throw new UnsupportedOperationException("Side inputs to executable stage are currently unsupported.");
            }
            final JavaDStream mapPartitions = dStream.mapPartitions(new SparkExecutableStageFunction(sparkStreamingTranslationContext.getSerializableOptions(), parseFrom, sparkStreamingTranslationContext.jobInfo, createOutputMap, SparkExecutableStageContextFactory.getInstance(), ImmutableMap.copyOf(new HashMap()), MetricsAccumulator.getInstance(), windowCoder));
            String executableStageIntermediateId = PipelineTranslatorUtils.getExecutableStageIntermediateId(pTransformNode);
            sparkStreamingTranslationContext.pushDataset(executableStageIntermediateId, new Dataset() { // from class: org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator.1
                @Override // org.apache.beam.runners.spark.translation.Dataset
                public void cache(String str, Coder<?> coder) {
                    mapPartitions.persist(StorageLevel.fromString(str));
                }

                @Override // org.apache.beam.runners.spark.translation.Dataset
                public void action() {
                    mapPartitions.foreachRDD(TranslationUtils.emptyVoidFunction());
                }

                @Override // org.apache.beam.runners.spark.translation.Dataset
                public void setName(String str) {
                }
            });
            sparkStreamingTranslationContext.popDataset(executableStageIntermediateId);
            for (String str : outputsMap.values()) {
                sparkStreamingTranslationContext.pushDataset(str, new UnboundedDataset(mapPartitions.flatMap(new SparkExecutableStageExtractionFunction(((Integer) createOutputMap.get(str)).intValue())), streamSources));
            }
            if (outputsMap.isEmpty()) {
                sparkStreamingTranslationContext.pushDataset(String.format("EmptyOutputSink_%d", Integer.valueOf(sparkStreamingTranslationContext.nextSinkId())), new UnboundedDataset(mapPartitions.flatMap(rawUnionValue -> {
                    return Collections.emptyIterator();
                }), streamSources));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static <T> void translateFlatten(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        JavaDStream union;
        Map inputsMap = pTransformNode.getTransform().getInputsMap();
        ArrayList arrayList = new ArrayList();
        if (inputsMap.isEmpty()) {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            linkedBlockingQueue.offer(sparkStreamingTranslationContext.getSparkContext().emptyRDD());
            union = sparkStreamingTranslationContext.getStreamingContext().queueStream(linkedBlockingQueue);
        } else {
            ArrayList arrayList2 = new ArrayList();
            Iterator it = inputsMap.values().iterator();
            while (it.hasNext()) {
                Dataset popDataset = sparkStreamingTranslationContext.popDataset((String) it.next());
                if (popDataset instanceof UnboundedDataset) {
                    UnboundedDataset unboundedDataset = (UnboundedDataset) popDataset;
                    arrayList.addAll(unboundedDataset.getStreamSources());
                    arrayList2.add(unboundedDataset.getDStream());
                } else {
                    LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
                    linkedBlockingQueue2.offer(((BoundedDataset) popDataset).getRDD());
                    arrayList2.add(sparkStreamingTranslationContext.getStreamingContext().queueStream(linkedBlockingQueue2));
                }
            }
            union = sparkStreamingTranslationContext.getStreamingContext().union(JavaConverters.asScalaBuffer(arrayList2));
        }
        sparkStreamingTranslationContext.pushDataset(PipelineTranslatorUtils.getOutputId(pTransformNode), new UnboundedDataset(union, arrayList));
    }

    private static <T> void translateReshuffle(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext sparkStreamingTranslationContext) {
        String inputId = PipelineTranslatorUtils.getInputId(pTransformNode);
        UnboundedDataset unboundedDataset = (UnboundedDataset) sparkStreamingTranslationContext.popDataset(inputId);
        List<Integer> streamSources = unboundedDataset.getStreamSources();
        JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
        WindowedValue.WindowedValueCoder windowedValueCoder = PipelineTranslatorUtils.getWindowedValueCoder(inputId, pipeline.getComponents());
        sparkStreamingTranslationContext.pushDataset(PipelineTranslatorUtils.getOutputId(pTransformNode), new UnboundedDataset(dStream.transform(javaRDD -> {
            return GroupCombineFunctions.reshuffle(javaRDD, windowedValueCoder);
        }), streamSources));
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator
    public SparkStreamingTranslationContext createTranslationContext(JavaSparkContext javaSparkContext, SparkPipelineOptions sparkPipelineOptions, JobInfo jobInfo) {
        return new SparkStreamingTranslationContext(javaSparkContext, sparkPipelineOptions, jobInfo);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1092912986:
                if (implMethodName.equals("lambda$translateReshuffle$4a454bc1$1")) {
                    z = true;
                    break;
                }
                break;
            case 325913764:
                if (implMethodName.equals("lambda$translateExecutableStage$e59ce57c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/join/RawUnionValue;)Ljava/util/Iterator;")) {
                    return rawUnionValue -> {
                        return Collections.emptyIterator();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/util/WindowedValue$WindowedValueCoder;Lorg/apache/spark/api/java/JavaRDD;)Lorg/apache/spark/api/java/JavaRDD;")) {
                    WindowedValue.WindowedValueCoder windowedValueCoder = (WindowedValue.WindowedValueCoder) serializedLambda.getCapturedArg(0);
                    return javaRDD -> {
                        return GroupCombineFunctions.reshuffle(javaRDD, windowedValueCoder);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
