package org.apache.beam.runners.flink;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobServerDriver.class */
public class FlinkJobServerDriver implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
    private final ListeningExecutorService executor;

    @VisibleForTesting
    ServerConfiguration configuration;
    private final ServerFactory jobServerFactory;
    private final ServerFactory artifactServerFactory;
    private volatile GrpcFnServer<InMemoryJobService> jobServer;
    private volatile GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobServerDriver$ServerConfiguration.class */
    public static class ServerConfiguration {

        @Option(name = "--job-host", usage = "The job server host name")
        String host = "localhost";

        @Option(name = "--job-port", usage = "The job service port. 0 to use a dynamic port. (Default: 8099)")
        int port = 8099;

        @Option(name = "--artifact-port", usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)")
        int artifactPort = 8098;

        @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
        String artifactStagingPath = Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();

        @Option(name = "--clean-artifacts-per-job", usage = "When true, remove each job's staged artifacts when it completes")
        boolean cleanArtifactsPerJob = false;

        @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
        String flinkMasterUrl = FlinkPipelineOptions.AUTO;

        @Option(name = "--sdk-worker-parallelism", usage = "Default parallelism for SDK worker processes (see portable pipeline options)")
        Long sdkWorkerParallelism = 1L;

        @Option(name = "--flink-conf-dir", usage = "Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.")
        String flinkConfDir = null;

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getFlinkMasterUrl() {
            return this.flinkMasterUrl;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Long getSdkWorkerParallelism() {
            return this.sdkWorkerParallelism;
        }

        @Nullable
        String getFlinkConfDir() {
            return this.flinkConfDir;
        }
    }

    public static void main(String[] strArr) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        fromParams(strArr).run();
    }

    private static void printUsage(CmdLineParser cmdLineParser) {
        System.err.println(String.format("Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
        cmdLineParser.printUsage(System.err);
        System.err.println();
    }

    public static FlinkJobServerDriver fromParams(String[] strArr) {
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        CmdLineParser cmdLineParser = new CmdLineParser(serverConfiguration);
        try {
            cmdLineParser.parseArgument(strArr);
            return fromConfig(serverConfiguration);
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            printUsage(cmdLineParser);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
    }

    public static FlinkJobServerDriver fromConfig(ServerConfiguration serverConfiguration) {
        return create(serverConfiguration, MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build())), ServerFactory.createWithPortSupplier(() -> {
            return Integer.valueOf(serverConfiguration.port);
        }), ServerFactory.createWithPortSupplier(() -> {
            return Integer.valueOf(serverConfiguration.artifactPort);
        }));
    }

    public static FlinkJobServerDriver create(ServerConfiguration serverConfiguration, ListeningExecutorService listeningExecutorService, ServerFactory serverFactory, ServerFactory serverFactory2) {
        return new FlinkJobServerDriver(serverConfiguration, listeningExecutorService, serverFactory, serverFactory2);
    }

    private FlinkJobServerDriver(ServerConfiguration serverConfiguration, ListeningExecutorService listeningExecutorService, ServerFactory serverFactory, ServerFactory serverFactory2) {
        this.configuration = serverConfiguration;
        this.executor = listeningExecutorService;
        this.jobServerFactory = serverFactory;
        this.artifactServerFactory = serverFactory2;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.jobServer = createJobServer();
            this.jobServer.getServer().awaitTermination();
        } catch (InterruptedException e) {
            LOG.warn("Job server interrupted", e);
        } catch (Exception e2) {
            LOG.warn("Exception during job server creation", e2);
        } finally {
            stop();
        }
    }

    public String start() throws IOException {
        this.jobServer = createJobServer();
        return this.jobServer.getApiServiceDescriptor().getUrl();
    }

    public synchronized void stop() {
        if (this.jobServer != null) {
            try {
                this.jobServer.close();
                LOG.info("JobServer stopped on {}", this.jobServer.getApiServiceDescriptor().getUrl());
                this.jobServer = null;
            } catch (Exception e) {
                LOG.error("Error while closing the jobServer.", e);
            }
        }
        if (this.artifactStagingServer != null) {
            try {
                this.artifactStagingServer.close();
                LOG.info("ArtifactStagingServer stopped on {}", this.artifactStagingServer.getApiServiceDescriptor().getUrl());
                this.artifactStagingServer = null;
            } catch (Exception e2) {
                LOG.error("Error while closing the artifactStagingServer.", e2);
            }
        }
    }

    private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
        InMemoryJobService createJobService = createJobService();
        GrpcFnServer<InMemoryJobService> allocatePortAndCreateFor = this.configuration.port == 0 ? GrpcFnServer.allocatePortAndCreateFor(createJobService, this.jobServerFactory) : GrpcFnServer.create(createJobService, Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.configuration.host + ":" + this.configuration.port).build(), this.jobServerFactory);
        LOG.info("JobService started on {}", allocatePortAndCreateFor.getApiServiceDescriptor().getUrl());
        return allocatePortAndCreateFor;
    }

    private InMemoryJobService createJobService() throws IOException {
        this.artifactStagingServer = createArtifactStagingService();
        return InMemoryJobService.create(this.artifactStagingServer.getApiServiceDescriptor(), str -> {
            try {
                return BeamFileSystemArtifactStagingService.generateStagingSessionToken(str, this.configuration.artifactStagingPath);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, str2 -> {
            if (this.configuration.cleanArtifactsPerJob) {
                this.artifactStagingServer.getService().removeArtifacts(str2);
            }
        }, createJobInvoker());
    }

    private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService() throws IOException {
        BeamFileSystemArtifactStagingService beamFileSystemArtifactStagingService = new BeamFileSystemArtifactStagingService();
        GrpcFnServer<BeamFileSystemArtifactStagingService> allocatePortAndCreateFor = this.configuration.artifactPort == 0 ? GrpcFnServer.allocatePortAndCreateFor(beamFileSystemArtifactStagingService, this.artifactServerFactory) : GrpcFnServer.create(beamFileSystemArtifactStagingService, Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.configuration.host + ":" + this.configuration.artifactPort).build(), this.artifactServerFactory);
        LOG.info("ArtifactStagingService started on {}", allocatePortAndCreateFor.getApiServiceDescriptor().getUrl());
        return allocatePortAndCreateFor;
    }

    private JobInvoker createJobInvoker() {
        return FlinkJobInvoker.create(this.executor, this.configuration);
    }
}
