package org.apache.beam.runners.flink;

import java.util.Map;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class */
/**
 * Holds the Flink execution environment a {@link Pipeline} is translated into and submits it.
 *
 * <p>{@link #translate(Pipeline)} creates either a batch {@link ExecutionEnvironment} or a {@link
 * StreamExecutionEnvironment} (never both) and lets the matching translator populate it. {@link
 * #executePipeline()} then runs whichever environment was created.
 *
 * <p>The environment fields are reassigned on every call to {@link #translate(Pipeline)}; the class
 * performs no synchronization of its own.
 */
class FlinkPipelineExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);

    /** Options the environments and translators are configured from; never {@code null}. */
    private final FlinkPipelineOptions options;

    /** Set by {@link #translate(Pipeline)} when the batch path is chosen, otherwise {@code null}. */
    private ExecutionEnvironment flinkBatchEnv;

    /** Set by {@link #translate(Pipeline)} when the streaming path is chosen, otherwise {@code null}. */
    private StreamExecutionEnvironment flinkStreamEnv;

    /**
     * Creates an environment holder bound to the given options.
     *
     * @param flinkPipelineOptions the runner options; must not be {@code null}
     * @throws NullPointerException if {@code flinkPipelineOptions} is {@code null}
     */
    public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions) {
        this.options = Preconditions.checkNotNull(flinkPipelineOptions);
    }

    /**
     * Translates the pipeline into a Flink job on a freshly created execution environment.
     *
     * <p>Any environment left over from a previous call is discarded first. If the pipeline has
     * unbounded output, the options are switched to streaming. A {@link StreamExecutionEnvironment}
     * is used when the options request streaming or the DataStream API for batch; otherwise a batch
     * {@link ExecutionEnvironment} is used.
     *
     * @param pipeline the pipeline to translate; its transforms are replaced in place by the
     *     runner's default overrides before translation
     */
    public void translate(Pipeline pipeline) {
        this.flinkBatchEnv = null;
        this.flinkStreamEnv = null;

        final boolean hasUnboundedOutput = PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline);
        if (hasUnboundedOutput) {
            LOG.info("Found unbounded PCollection. Switching to streaming execution.");
            options.setStreaming(true);
        }

        // Staging is prepared before either execution environment is created.
        prepareFilesToStageForRemoteClusterExecution(options);

        final FlinkPipelineTranslator translator;
        if (options.isStreaming() || options.getUseDataStreamForBatch()) {
            this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
            if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
                LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
            }
            translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
            if (!options.isStreaming()) {
                // Bounded pipeline running on the DataStream API: run it in batch runtime mode.
                flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
            }
        } else {
            this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
            translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
        }

        // Overrides are computed from the options only after the streaming flag has been settled above.
        pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));
        translator.translate(pipeline);
    }

    /**
     * Prepares the files to stage unless the configured Flink master is one of the literal values
     * {@code [auto]}, {@code [collection]} or {@code [local]}.
     *
     * @param flinkPipelineOptions options providing the Flink master and receiving the staging setup
     */
    private static void prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions flinkPipelineOptions) {
        if (!flinkPipelineOptions.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) {
            PipelineResources.prepareFilesForStaging(flinkPipelineOptions);
        }
    }

    /**
     * Runs the previously translated pipeline.
     *
     * <p>In attached mode the call goes through the environment's {@code execute} and the finished
     * job's result is wrapped; otherwise the job is submitted with {@code executeAsync} and a
     * detached result is returned.
     *
     * @return the result handle for the executed or submitted job
     * @throws IllegalStateException if {@link #translate(Pipeline)} has not created an environment
     * @throws Exception if the underlying Flink environment fails to execute or submit the job
     */
    public PipelineResult executePipeline() throws Exception {
        final String jobName = options.getJobName();

        if (flinkBatchEnv != null) {
            if (options.getAttachedMode()) {
                return createAttachedPipelineResult(flinkBatchEnv.execute(jobName));
            }
            return createDetachedPipelineResult(flinkBatchEnv.executeAsync(jobName), options);
        }

        if (flinkStreamEnv != null) {
            if (options.getAttachedMode()) {
                return createAttachedPipelineResult(flinkStreamEnv.execute(jobName));
            }
            return createDetachedPipelineResult(flinkStreamEnv.executeAsync(jobName), options);
        }

        throw new IllegalStateException("The Pipeline has not yet been translated.");
    }

    /**
     * Wraps the client of an asynchronously submitted job.
     *
     * @param jobClient client handle returned by {@code executeAsync}
     * @param flinkPipelineOptions options supplying the job check interval (in seconds)
     * @return the detached result
     */
    private FlinkDetachedRunnerResult createDetachedPipelineResult(JobClient jobClient, FlinkPipelineOptions flinkPipelineOptions) {
        LOG.info("Pipeline submitted in detached mode");
        return new FlinkDetachedRunnerResult(jobClient, flinkPipelineOptions.getJobCheckIntervalInSecs());
    }

    /**
     * Wraps the result of a finished job, logs its accumulators and starts a {@link MetricsPusher}
     * for it.
     *
     * @param jobExecutionResult result returned by the environment's {@code execute}
     * @return the attached result carrying the accumulators and the net runtime
     */
    private FlinkRunnerResult createAttachedPipelineResult(JobExecutionResult jobExecutionResult) {
        final long netRuntime = jobExecutionResult.getNetRuntime();
        LOG.info("Execution finished in {} msecs", netRuntime);

        final Map<String, Object> accumulators = jobExecutionResult.getAllAccumulatorResults();
        if (accumulators != null && !accumulators.isEmpty()) {
            LOG.info("Final accumulator values:");
            for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
                LOG.info("{} : {}", accumulator.getKey(), accumulator.getValue());
            }
        }

        final FlinkRunnerResult result = new FlinkRunnerResult(accumulators, netRuntime);
        new MetricsPusher(result.getMetricsContainerStepMap(), options.as(MetricsOptions.class), result).start();
        return result;
    }

    /**
     * Translates the pipeline and returns the resulting {@link JobGraph} without executing it.
     *
     * <p>The job name is copied from the pipeline's options onto the stream graph before the job
     * graph is derived from it.
     *
     * @param pipeline the pipeline to translate
     * @return the job graph built from the stream environment
     * @throws IllegalStateException if translation selected the batch environment, in which case
     *     there is no stream graph to build the job graph from
     */
    @VisibleForTesting
    public JobGraph getJobGraph(Pipeline pipeline) {
        translate(pipeline);
        if (flinkStreamEnv == null) {
            // translate() took the batch path; fail with a clear message rather than a bare NPE.
            throw new IllegalStateException(
                "A JobGraph can only be built when the pipeline is translated to a StreamExecutionEnvironment.");
        }
        final StreamGraph streamGraph = flinkStreamEnv.getStreamGraph();
        streamGraph.setJobName(pipeline.getOptions().getJobName());
        return streamGraph.getJobGraph();
    }

    /**
     * Returns the batch environment created by the last {@link #translate(Pipeline)} call.
     *
     * @return the batch environment, or {@code null} if none was created
     */
    @VisibleForTesting
    ExecutionEnvironment getBatchExecutionEnvironment() {
        return flinkBatchEnv;
    }

    /**
     * Returns the stream environment created by the last {@link #translate(Pipeline)} call.
     *
     * @return the stream environment, or {@code null} if none was created
     */
    @VisibleForTesting
    StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return flinkStreamEnv;
    }
}
