package org.apache.beam.runners.spark.structuredstreaming.translation;

import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.class */
public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
    private int depth = 0;
    private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class);
    protected AbstractTranslationContext translationContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$TranslationMode.class */
    public enum TranslationMode {
        BATCH,
        STREAMING
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator$TranslationModeDetector.class */
    private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
        private TranslationMode translationMode;

        TranslationModeDetector(TranslationMode translationMode) {
            this.translationMode = translationMode;
        }

        TranslationModeDetector() {
            this(TranslationMode.BATCH);
        }

        TranslationMode getTranslationMode() {
            return this.translationMode;
        }

        public void visitValue(PValue pValue, TransformHierarchy.Node node) {
            if (this.translationMode.equals(TranslationMode.BATCH) && (pValue instanceof PCollection) && ((PCollection) pValue).isBounded() == PCollection.IsBounded.UNBOUNDED) {
                LOG.info("Found unbounded PCollection {}. Switching to streaming execution.", pValue.getName());
                this.translationMode = TranslationMode.STREAMING;
            }
        }
    }

    public static void replaceTransforms(Pipeline pipeline, StreamingOptions streamingOptions) {
        pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(streamingOptions.isStreaming()));
    }

    public static void detectTranslationMode(Pipeline pipeline, StreamingOptions streamingOptions) {
        TranslationModeDetector translationModeDetector = new TranslationModeDetector();
        pipeline.traverseTopologically(translationModeDetector);
        if (translationModeDetector.getTranslationMode().equals(TranslationMode.STREAMING)) {
            streamingOptions.setStreaming(true);
        }
    }

    private static String genSpaces(int i) {
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < i; i2++) {
            sb.append("|   ");
        }
        return sb.toString();
    }

    protected abstract TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node);

    private <T extends PTransform<?, ?>> void applyTransformTranslator(TransformHierarchy.Node node, TransformTranslator<?> transformTranslator) {
        this.translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
        transformTranslator.translateTransform(node.getTransform(), this.translationContext);
    }

    public void translate(Pipeline pipeline) {
        LOG.debug("starting translation of the pipeline using {}", getClass().getName());
        pipeline.traverseTopologically(this);
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        LOG.debug("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
        this.depth++;
        TransformTranslator<?> transformTranslator = getTransformTranslator(node);
        if (transformTranslator == null) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        applyTransformTranslator(node, transformTranslator);
        LOG.debug("{} translated- {}", genSpaces(this.depth), node.getFullName());
        return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        this.depth--;
        LOG.debug("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.debug("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
        TransformTranslator<?> transformTranslator = getTransformTranslator(node);
        if (transformTranslator == null) {
            throw new UnsupportedOperationException("The transform " + PTransformTranslation.urnForTransform(node.getTransform()) + " is currently not supported.");
        }
        applyTransformTranslator(node, transformTranslator);
    }

    public AbstractTranslationContext getTranslationContext() {
        return this.translationContext;
    }
}
