package org.apache.beam.runners.flink;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.FlinkPortableRunnerResult;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkPipelineRunner.class */
public class FlinkPipelineRunner implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
    private final FlinkPipelineOptions pipelineOptions;
    private final String confDir;
    private final List<String> filesToStage;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkPipelineRunner$FlinkPipelineRunnerConfiguration.class */
    public static class FlinkPipelineRunnerConfiguration {

        @Option(name = "--flink-conf-dir", usage = "Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.")
        private String flinkConfDir;

        @Option(name = "--base-job-name", usage = "The job to run. This must correspond to a subdirectory of the jar's BEAM-PIPELINE directory. *Only needs to be specified if the jar contains multiple pipelines.*")
        private String baseJobName;

        private FlinkPipelineRunnerConfiguration() {
            this.flinkConfDir = null;
            this.baseJobName = null;
        }
    }

    public FlinkPipelineRunner(FlinkPipelineOptions flinkPipelineOptions, String str, List<String> list) {
        this.pipelineOptions = flinkPipelineOptions;
        this.confDir = flinkPipelineOptions.getFlinkConfDir() != null ? flinkPipelineOptions.getFlinkConfDir() : str;
        this.filesToStage = list;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator] */
    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
        MetricsEnvironment.setMetricsSupported(false);
        SdkHarnessOptions.getConfiguredLoggerFromOptions(this.pipelineOptions.as(SdkHarnessOptions.class));
        return runPipelineWithTranslator(pipeline, jobInfo, (this.pipelineOptions.isStreaming() || PipelineTranslatorUtils.hasUnboundedPCollections(pipeline)) ? new FlinkStreamingPortablePipelineTranslator() : FlinkBatchPortablePipelineTranslator.createTranslator());
    }

    private <T extends FlinkPortablePipelineTranslator.TranslationContext> PortablePipelineResult runPipelineWithTranslator(RunnerApi.Pipeline pipeline, JobInfo jobInfo, FlinkPortablePipelineTranslator<T> flinkPortablePipelineTranslator) throws Exception {
        LOG.info("Translating pipeline to Flink program.");
        return createPortablePipelineResult(flinkPortablePipelineTranslator.translate(flinkPortablePipelineTranslator.createTranslationContext(jobInfo, this.pipelineOptions, this.confDir, this.filesToStage), flinkPortablePipelineTranslator.prepareForTranslation(pipeline)).execute(this.pipelineOptions.getJobName()), this.pipelineOptions);
    }

    private PortablePipelineResult createPortablePipelineResult(JobExecutionResult jobExecutionResult, PipelineOptions pipelineOptions) {
        if (jobExecutionResult.getClass().getCanonicalName().equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) {
            LOG.info("Pipeline submitted in Detached mode");
            return new FlinkPortableRunnerResult.Detached();
        }
        LOG.info("Execution finished in {} msecs", Long.valueOf(jobExecutionResult.getNetRuntime()));
        Map allAccumulatorResults = jobExecutionResult.getAllAccumulatorResults();
        if (allAccumulatorResults != null && !allAccumulatorResults.isEmpty()) {
            LOG.info("Final accumulator values:");
            for (Map.Entry entry : jobExecutionResult.getAllAccumulatorResults().entrySet()) {
                LOG.info("{} : {}", entry.getKey(), entry.getValue());
            }
        }
        FlinkPortableRunnerResult flinkPortableRunnerResult = new FlinkPortableRunnerResult(allAccumulatorResults, jobExecutionResult.getNetRuntime());
        new MetricsPusher(flinkPortableRunnerResult.getMetricsContainerStepMap(), pipelineOptions.as(MetricsOptions.class), flinkPortableRunnerResult).start();
        return flinkPortableRunnerResult;
    }

    public static void main(String[] strArr) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        FlinkPipelineRunnerConfiguration parseArgs = parseArgs(strArr);
        String defaultJobName = parseArgs.baseJobName == null ? PortablePipelineJarUtils.getDefaultJobName() : parseArgs.baseJobName;
        Preconditions.checkArgument(defaultJobName != null, "No default job name found. Job name must be set using --base-job-name.");
        RunnerApi.Pipeline pipelineFromClasspath = PortablePipelineJarUtils.getPipelineFromClasspath(defaultJobName);
        Struct pipelineOptionsFromClasspath = PortablePipelineJarUtils.getPipelineOptionsFromClasspath(defaultJobName);
        String str = (String) ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
        FlinkPipelineOptions flinkPipelineOptions = (FlinkPipelineOptions) PipelineOptionsTranslation.fromProto(pipelineOptionsFromClasspath).as(FlinkPipelineOptions.class);
        String format = String.format("%s_%s", flinkPipelineOptions.getJobName(), UUID.randomUUID().toString());
        try {
            new FlinkPipelineRunner(flinkPipelineOptions, parseArgs.flinkConfDir, PipelineResources.detectClassPathResourcesToStage(FlinkPipelineRunner.class.getClassLoader(), flinkPipelineOptions)).run(pipelineFromClasspath, JobInfo.create(format, flinkPipelineOptions.getJobName(), str, PipelineOptionsTranslation.toProto(flinkPipelineOptions)));
            LOG.info("Job {} finished successfully.", format);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", format), e);
        }
    }

    private static FlinkPipelineRunnerConfiguration parseArgs(String[] strArr) {
        FlinkPipelineRunnerConfiguration flinkPipelineRunnerConfiguration = new FlinkPipelineRunnerConfiguration();
        CmdLineParser cmdLineParser = new CmdLineParser(flinkPipelineRunnerConfiguration);
        try {
            cmdLineParser.parseArgument(strArr);
            return flinkPipelineRunnerConfiguration;
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            cmdLineParser.printUsage(System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
    }
}
