package org.apache.beam.runners.flink;

import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.class */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
    private final FlinkStreamingTranslationContext streamingContext;
    private int depth = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator$StreamTransformTranslator.class */
    public static abstract class StreamTransformTranslator<T extends PTransform> {
        abstract void translateNode(T t, FlinkStreamingTranslationContext flinkStreamingTranslationContext);

        boolean canTranslate(T t, FlinkStreamingTranslationContext flinkStreamingTranslationContext) {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator$StreamingShardedWriteFactory.class */
    public static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT> implements PTransformOverrideFactory<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> {
        FlinkPipelineOptions options;

        /* JADX INFO: Access modifiers changed from: package-private */
        public StreamingShardedWriteFactory(PipelineOptions pipelineOptions) {
            this.options = (FlinkPipelineOptions) pipelineOptions.as(FlinkPipelineOptions.class);
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>> getReplacementTransform(AppliedPTransform<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> appliedPTransform) {
            Integer parallelism = this.options.getParallelism();
            Preconditions.checkArgument(parallelism.intValue() > 0, "Parallelism of a job should be greater than 0. Currently set: %s", new Object[]{parallelism});
            int intValue = parallelism.intValue() * 2;
            try {
                WriteFiles withSideInputs = WriteFiles.to(WriteFilesTranslation.getSink(appliedPTransform)).withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(appliedPTransform));
                if (WriteFilesTranslation.isWindowedWrites(appliedPTransform)) {
                    withSideInputs = withSideInputs.withWindowedWrites();
                }
                return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), withSideInputs.withNumShards(intValue));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, WriteFilesResult<DestinationT> writeFilesResult) {
            return Collections.emptyMap();
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PValue>) map, (WriteFilesResult) pOutput);
        }
    }

    public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment streamExecutionEnvironment, PipelineOptions pipelineOptions) {
        this.streamingContext = new FlinkStreamingTranslationContext(streamExecutionEnvironment, pipelineOptions);
    }

    @Override // org.apache.beam.runners.flink.FlinkPipelineTranslator
    public void translate(Pipeline pipeline) {
        UnconsumedReads.ensureAllReadsConsumed(pipeline);
        super.translate(pipeline);
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        StreamTransformTranslator<?> translator;
        LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
        this.depth++;
        PTransform<?, ?> transform = node.getTransform();
        if (transform == null || (translator = FlinkStreamingTransformTranslators.getTranslator(transform)) == null || !applyCanTranslate(transform, node, translator)) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        applyStreamingTransform(transform, node, translator);
        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
        return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        this.depth--;
        LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
        PTransform<?, ?> transform = node.getTransform();
        StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
        if (translator != null && applyCanTranslate(transform, node, translator)) {
            applyStreamingTransform(transform, node, translator);
        } else {
            String urnForTransform = PTransformTranslation.urnForTransform(transform);
            LOG.info(urnForTransform);
            throw new UnsupportedOperationException("The transform " + urnForTransform + " is currently not supported.");
        }
    }

    public void visitValue(PValue pValue, TransformHierarchy.Node node) {
    }

    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> pTransform, TransformHierarchy.Node node, StreamTransformTranslator<?> streamTransformTranslator) {
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
        streamTransformTranslator.translateNode(pTransform, this.streamingContext);
    }

    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> pTransform, TransformHierarchy.Node node, StreamTransformTranslator<?> streamTransformTranslator) {
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
        return streamTransformTranslator.canTranslate(pTransform, this.streamingContext);
    }
}
