package org.apache.beam.runners.flink;

import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class */
/**
 * Holds the Flink execution environment (batch or streaming) that a Beam {@link Pipeline} is
 * translated into, and executes the translated job.
 *
 * <p>At most one of the two environments is non-null at any time: {@link #translate(Pipeline)}
 * resets both and then creates the one matching the selected execution mode.
 */
class FlinkPipelineExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);

    /** Options used to configure the environments and translators; never null. */
    private final FlinkPipelineOptions options;

    /** Batch environment; non-null only after a batch-mode translation. */
    private ExecutionEnvironment flinkBatchEnv;

    /** Streaming environment; non-null only after a streaming-mode translation. */
    private StreamExecutionEnvironment flinkStreamEnv;

    /**
     * Creates an execution environment wrapper for the given options.
     *
     * @param flinkPipelineOptions the pipeline options; must not be null
     * @throws NullPointerException if {@code flinkPipelineOptions} is null
     */
    public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions) {
        this.options = Preconditions.checkNotNull(flinkPipelineOptions);
    }

    /**
     * Translates the given pipeline into a Flink job on a freshly created environment.
     *
     * <p>Any environment left over from a previous translation is discarded first. If the
     * pipeline has unbounded output, the options are switched to streaming mode as a side
     * effect. The default transform overrides are applied to the pipeline before translation.
     *
     * @param pipeline the pipeline to translate
     */
    public void translate(Pipeline pipeline) {
        // Start from a clean slate so a failed or repeated translation never leaves both set.
        this.flinkBatchEnv = null;
        this.flinkStreamEnv = null;

        boolean hasUnboundedOutput = PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline);
        if (hasUnboundedOutput) {
            LOG.info("Found unbounded PCollection. Switching to streaming execution.");
            this.options.setStreaming(true);
        }

        // Must run before the environments are created: they read the files to stage.
        prepareFilesToStageForRemoteClusterExecution(this.options);

        FlinkPipelineTranslator translator;
        if (this.options.isStreaming()) {
            this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(this.options, this.options.getFilesToStage());
            if (hasUnboundedOutput && !this.flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
                LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
            }
            translator = new FlinkStreamingPipelineTranslator(this.flinkStreamEnv, this.options);
        } else {
            this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(this.options, this.options.getFilesToStage());
            translator = new FlinkBatchPipelineTranslator(this.flinkBatchEnv, this.options);
        }

        pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(this.options));
        translator.translate(pipeline);
    }

    /**
     * Prepares the files to stage when the Flink master is not one of the special values
     * {@code [auto]}, {@code [collection]} or {@code [local]}.
     *
     * <p>In that case the configured files are passed through
     * {@link PipelineResources#prepareFilesForStaging} and the result is written back to the
     * options. The options' temp location is used as the working directory, falling back to
     * the {@code java.io.tmpdir} system property when no temp location is set.
     *
     * @param flinkPipelineOptions the options whose files to stage are updated in place
     */
    private static void prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions flinkPipelineOptions) {
        if (flinkPipelineOptions.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) {
            return;
        }
        String tempLocation = MoreObjects.firstNonNull(flinkPipelineOptions.getTempLocation(), System.getProperty("java.io.tmpdir"));
        flinkPipelineOptions.setFilesToStage(PipelineResources.prepareFilesForStaging(flinkPipelineOptions.getFilesToStage(), tempLocation));
    }

    /**
     * Executes the previously translated pipeline under the job name taken from the options.
     *
     * @return the result reported by the Flink environment
     * @throws IllegalStateException if {@link #translate(Pipeline)} has not created an environment
     * @throws Exception if the Flink execution fails
     */
    public JobExecutionResult executePipeline() throws Exception {
        String jobName = this.options.getJobName();
        if (this.flinkBatchEnv != null) {
            return this.flinkBatchEnv.execute(jobName);
        }
        if (this.flinkStreamEnv != null) {
            return this.flinkStreamEnv.execute(jobName);
        }
        throw new IllegalStateException("The Pipeline has not yet been translated.");
    }

    /**
     * Translates the pipeline and returns the job graph of the resulting streaming job.
     *
     * <p>Only streaming translations are supported: the job graph is derived from the stream
     * environment's stream graph.
     *
     * @param pipeline the pipeline to translate
     * @return the job graph built from the streaming environment
     * @throws IllegalStateException if the pipeline was translated in batch mode, in which case
     *     no streaming environment exists
     */
    @VisibleForTesting
    public JobGraph getJobGraph(Pipeline pipeline) {
        translate(pipeline);
        if (this.flinkStreamEnv == null) {
            // translate() chose batch mode; fail with a clear message instead of a bare NPE.
            throw new IllegalStateException("A JobGraph is only available for pipelines translated in streaming mode.");
        }
        return this.flinkStreamEnv.getStreamGraph().getJobGraph();
    }

    /** Returns the batch environment, or null if the last translation was not in batch mode. */
    @VisibleForTesting
    ExecutionEnvironment getBatchExecutionEnvironment() {
        return this.flinkBatchEnv;
    }

    /** Returns the streaming environment, or null if the last translation was not in streaming mode. */
    @VisibleForTesting
    StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return this.flinkStreamEnv;
    }
}
