package org.apache.beam.runners.jobsubmission;

import java.io.IOException;
import java.nio.file.Paths;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.sdk.expansion.service.ExpansionServer;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for drivers that host a job service endpoint together with an artifact staging
 * endpoint and, optionally, a Java expansion service.
 *
 * <p>The driver can be started in the background via {@link #start()} or run in the foreground
 * via {@link #run()}, which blocks until the job server terminates. {@link #stop()} closes every
 * server this driver has created.
 */
public abstract class JobServerDriver implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(JobServerDriver.class);

    /** Configuration used to create the servers; read each time a server is created. */
    @VisibleForTesting
    public ServerConfiguration configuration;
    private final ServerFactory jobServerFactory;
    private final ServerFactory artifactServerFactory;
    private final JobInvokerFactory jobInvokerFactory;
    // The server references are volatile because they are read by getJobServerUrl() and cleared by
    // stop(), which may run on a different thread than the one that started the servers.
    private volatile GrpcFnServer<InMemoryJobService> jobServer;
    private volatile GrpcFnServer<ArtifactStagingService> artifactStagingServer;
    private volatile ExpansionServer expansionServer;

    /** Factory for the {@link JobInvoker} handed to the job service. */
    public interface JobInvokerFactory {
        /** Creates the invoker passed to {@code InMemoryJobService.create}. */
        JobInvoker create();
    }

    /** Command-line configurable settings (args4j {@code @Option}s) with their defaults. */
    public static class ServerConfiguration {

        @Option(name = "--job-host", usage = "The job server host name")
        private String host = "localhost";

        @Option(name = "--job-port", usage = "The job service port. 0 to use a dynamic port. (Default: 8099)")
        private int port = 8099;

        @Option(name = "--artifact-port", usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)")
        private int artifactPort = 8098;

        @Option(name = "--expansion-port", usage = "The Java expansion service port. 0 to use a dynamic port, negative number to disable expansion service creation (Default: 8097)")
        private int expansionPort = 8097;

        @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files. If artifact staging is needed, this directory must be accessible by the execution engine's workers.")
        private String artifactStagingPath = Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();

        @Option(name = "--clean-artifacts-per-job", usage = "When true, remove each job's staged artifacts when it completes", handler = ExplicitBooleanOptionHandler.class)
        private boolean cleanArtifactsPerJob = true;

        @Option(name = "--history-size", usage = "The maximum number of completed jobs to keep.")
        private int maxInvocationHistory = 10;

        /** Returns the host name used to build the service URLs. */
        public String getHost() {
            return this.host;
        }

        /** Returns the job service port; {@code 0} selects a dynamically allocated port. */
        public int getPort() {
            return this.port;
        }

        /** Returns the artifact service port; {@code 0} selects a dynamically allocated port. */
        public int getArtifactPort() {
            return this.artifactPort;
        }

        /** Returns the expansion service port; a negative value disables the expansion service. */
        public int getExpansionPort() {
            return this.expansionPort;
        }

        /** Returns the location where staged artifacts are stored. */
        public String getArtifactStagingPath() {
            return this.artifactStagingPath;
        }

        /** Returns whether staged artifacts are removed through the per-job cleanup callback. */
        public boolean isCleanArtifactsPerJob() {
            return this.cleanArtifactsPerJob;
        }

        /** Returns the history size passed to the job service on creation. */
        public int getMaxInvocationHistory() {
            return this.maxInvocationHistory;
        }
    }

    /**
     * Starts the artifact staging server and (unless disabled) the expansion server, then builds
     * the job service that uses them.
     *
     * @return the job service, not yet bound to a gRPC server
     * @throws IOException if one of the auxiliary servers cannot be started
     */
    protected InMemoryJobService createJobService() throws IOException {
        GrpcFnServer<ArtifactStagingService> stagingServer = createArtifactStagingService();
        this.artifactStagingServer = stagingServer;
        this.expansionServer = createExpansionService();
        return InMemoryJobService.create(
                stagingServer,
                this::createSessionToken,
                stagingToken -> {
                    if (this.configuration.cleanArtifactsPerJob) {
                        // Use the server captured at creation time: stop() may already have
                        // cleared the field, and re-reading it here would throw an NPE.
                        stagingServer.getService().removeStagedArtifacts(stagingToken);
                    }
                },
                this.jobInvokerFactory.create(),
                this.configuration.getMaxInvocationHistory());
    }

    /**
     * Creates a server factory whose port supplier reads the configured job port.
     *
     * @param serverConfiguration configuration supplying the port
     */
    protected static ServerFactory createJobServerFactory(ServerConfiguration serverConfiguration) {
        return ServerFactory.createWithPortSupplier(() -> serverConfiguration.port);
    }

    /**
     * Creates a server factory whose port supplier reads the configured artifact port.
     *
     * @param serverConfiguration configuration supplying the port
     */
    protected static ServerFactory createArtifactServerFactory(ServerConfiguration serverConfiguration) {
        return ServerFactory.createWithPortSupplier(() -> serverConfiguration.artifactPort);
    }

    /**
     * Stores the collaborators; no server is started until {@link #start()} or {@link #run()}.
     *
     * @param serverConfiguration configuration for hosts, ports and artifact handling
     * @param serverFactory factory used to create the job server
     * @param serverFactory2 factory used to create the artifact staging server
     * @param jobInvokerFactory factory for the invoker handed to the job service
     */
    protected JobServerDriver(ServerConfiguration serverConfiguration, ServerFactory serverFactory, ServerFactory serverFactory2, JobInvokerFactory jobInvokerFactory) {
        this.configuration = serverConfiguration;
        this.jobServerFactory = serverFactory;
        this.artifactServerFactory = serverFactory2;
        this.jobInvokerFactory = jobInvokerFactory;
    }

    /**
     * Returns the URL of the running job server.
     *
     * @return the job server URL, or {@code null} if no job server is currently set
     */
    public String getJobServerUrl() {
        // Read the volatile field once so a concurrent stop() cannot null it between the check
        // and the dereference.
        GrpcFnServer<InMemoryJobService> server = this.jobServer;
        return server == null ? null : server.getApiServiceDescriptor().getUrl();
    }

    /**
     * Starts all servers without blocking.
     *
     * <p>If any server fails to start, the servers that were already created are closed via
     * {@link #stop()} before the failure is rethrown, so a failed start does not leave ports bound.
     *
     * @return the URL of the started job server
     * @throws IOException if a server cannot be started
     */
    public String start() throws IOException {
        GrpcFnServer<InMemoryJobService> server;
        try {
            server = createJobServer();
        } catch (IOException | RuntimeException e) {
            // createJobService() may already have started the artifact staging and expansion
            // servers; release them instead of leaking them.
            stop();
            throw e;
        }
        this.jobServer = server;
        return server.getApiServiceDescriptor().getUrl();
    }

    /**
     * Starts all servers and blocks until the job server terminates or the thread is interrupted.
     * All servers are stopped before this method returns; failures are logged, not thrown.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            GrpcFnServer<InMemoryJobService> server = createJobServer();
            this.jobServer = server;
            LOG.info("Job server now running, terminate with Ctrl+C");
            server.getServer().awaitTermination();
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.warn("Job server interrupted", e);
        } catch (Exception e) {
            LOG.warn("Exception during job server creation", e);
        } finally {
            try {
                stop();
            } finally {
                if (interrupted) {
                    // Restore the interrupt status only after shutdown, so the pending interrupt
                    // cannot abort any blocking wait performed while the servers are closing.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes the job server, the artifact staging server and the expansion server, in that order.
     *
     * <p>Each close is attempted independently; a failure is logged and does not prevent the
     * remaining servers from being closed. A server whose close fails stays referenced.
     */
    public synchronized void stop() {
        GrpcFnServer<InMemoryJobService> job = this.jobServer;
        if (job != null) {
            try {
                job.close();
                LOG.info("JobServer stopped on {}", job.getApiServiceDescriptor().getUrl());
                this.jobServer = null;
            } catch (Exception e) {
                LOG.error("Error while closing the jobServer.", e);
            }
        }
        GrpcFnServer<ArtifactStagingService> staging = this.artifactStagingServer;
        if (staging != null) {
            try {
                staging.close();
                LOG.info("ArtifactStagingServer stopped on {}", staging.getApiServiceDescriptor().getUrl());
                this.artifactStagingServer = null;
            } catch (Exception e) {
                LOG.error("Error while closing the artifactStagingServer.", e);
            }
        }
        ExpansionServer expansion = this.expansionServer;
        if (expansion != null) {
            try {
                expansion.close();
                LOG.info("Expansion stopped on {}:{}", expansion.getHost(), expansion.getPort());
                this.expansionServer = null;
            } catch (Exception e) {
                LOG.error("Error while closing the Expansion Service.", e);
            }
        }
    }

    /**
     * Produces the token handed to the job service's session-token callback. This implementation
     * returns its argument unchanged.
     *
     * @param str the value supplied by the job service (presumably a session id; confirm against
     *     {@code InMemoryJobService})
     * @return {@code str}, unchanged
     */
    protected String createSessionToken(String str) {
        return str;
    }

    /**
     * Creates the job service and binds it to a gRPC server: on a dynamically allocated port when
     * the configured port is {@code 0}, otherwise on {@code host:port}.
     */
    private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
        InMemoryJobService service = createJobService();
        GrpcFnServer<InMemoryJobService> server;
        if (this.configuration.port == 0) {
            server = GrpcFnServer.allocatePortAndCreateFor(service, this.jobServerFactory);
        } else {
            Endpoints.ApiServiceDescriptor descriptor =
                    Endpoints.ApiServiceDescriptor.newBuilder()
                            .setUrl(this.configuration.host + ":" + this.configuration.port)
                            .build();
            server = GrpcFnServer.create(service, descriptor, this.jobServerFactory);
        }
        LOG.info("JobService started on {}", server.getApiServiceDescriptor().getUrl());
        return server;
    }

    /**
     * Creates the artifact staging service for the configured staging path and binds it to a gRPC
     * server: on a dynamically allocated port when the configured artifact port is {@code 0},
     * otherwise on {@code host:artifactPort}.
     */
    private GrpcFnServer<ArtifactStagingService> createArtifactStagingService() throws IOException {
        ArtifactStagingService service =
                new ArtifactStagingService(
                        ArtifactStagingService.beamFilesystemArtifactDestinationProvider(this.configuration.artifactStagingPath));
        GrpcFnServer<ArtifactStagingService> server;
        if (this.configuration.artifactPort == 0) {
            server = GrpcFnServer.allocatePortAndCreateFor(service, this.artifactServerFactory);
        } else {
            Endpoints.ApiServiceDescriptor descriptor =
                    Endpoints.ApiServiceDescriptor.newBuilder()
                            .setUrl(this.configuration.host + ":" + this.configuration.artifactPort)
                            .build();
            server = GrpcFnServer.create(service, descriptor, this.artifactServerFactory);
        }
        LOG.info("ArtifactStagingService started on {}", server.getApiServiceDescriptor().getUrl());
        return server;
    }

    /**
     * Starts the Java expansion service on the configured host and expansion port.
     *
     * @return the started server, or {@code null} when the configured expansion port is negative
     */
    @Nullable
    private ExpansionServer createExpansionService() throws IOException {
        if (this.configuration.expansionPort < 0) {
            return null;
        }
        ExpansionServer server = ExpansionServer.create(new ExpansionService(), this.configuration.host, this.configuration.expansionPort);
        LOG.info("Java ExpansionService started on {}:{}", server.getHost(), server.getPort());
        return server;
    }
}
