package org.apache.beam.fn.harness;

import java.util.Collections;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/ExternalWorkerService.class */
public class ExternalWorkerService extends BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExternalWorkerService.class);
    private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";
    private final PipelineOptions options;
    private final ServerFactory serverFactory = ServerFactory.createDefault();

    public ExternalWorkerService(PipelineOptions pipelineOptions) {
        this.options = pipelineOptions;
    }

    @Override // org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.AsyncService
    public void startWorker(BeamFnApi.StartWorkerRequest startWorkerRequest, StreamObserver<BeamFnApi.StartWorkerResponse> streamObserver) {
        LOG.info("Starting worker {} pointing at {}.", startWorkerRequest.getWorkerId(), startWorkerRequest.getControlEndpoint().getUrl());
        LOG.debug("Worker request {}.", startWorkerRequest);
        Thread thread = new Thread(() -> {
            try {
                FnHarness.main(startWorkerRequest.getWorkerId(), this.options, Collections.emptySet(), startWorkerRequest.getLoggingEndpoint(), startWorkerRequest.getControlEndpoint(), null);
                LOG.info("Successfully started worker {}.", startWorkerRequest.getWorkerId());
            } catch (Exception e) {
                LOG.error(String.format("Failed to start worker %s.", startWorkerRequest.getWorkerId()), (Throwable) e);
            }
        });
        thread.setName("SDK-worker-" + startWorkerRequest.getWorkerId());
        thread.setDaemon(true);
        thread.start();
        streamObserver.onNext(BeamFnApi.StartWorkerResponse.newBuilder().build());
        streamObserver.onCompleted();
    }

    @Override // org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.AsyncService
    public void stopWorker(BeamFnApi.StopWorkerRequest stopWorkerRequest, StreamObserver<BeamFnApi.StopWorkerResponse> streamObserver) {
        streamObserver.onNext(BeamFnApi.StopWorkerResponse.newBuilder().build());
        streamObserver.onCompleted();
    }

    @Override // org.apache.beam.sdk.fn.server.FnService, java.lang.AutoCloseable
    public void close() {
    }

    public GrpcFnServer<ExternalWorkerService> start() throws Exception {
        String externalServiceAddress = Environments.getExternalServiceAddress((PortablePipelineOptions) this.options.as(PortablePipelineOptions.class));
        GrpcFnServer<ExternalWorkerService> allocatePortAndCreateFor = externalServiceAddress.isEmpty() ? GrpcFnServer.allocatePortAndCreateFor(this, this.serverFactory) : GrpcFnServer.create(this, Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(), this.serverFactory);
        LOG.debug("Listening for worker start requests at {}.", allocatePortAndCreateFor.getApiServiceDescriptor().getUrl());
        return allocatePortAndCreateFor;
    }

    public static void main(String[] strArr) {
        LOG.info("Starting external worker service");
        String str = (String) Preconditions.checkArgumentNotNull(System.getenv(PIPELINE_OPTIONS_ENV_VAR), "No pipeline options provided in environment variables PIPELINE_OPTIONS");
        LOG.info("Pipeline options {}", str);
        try {
            try {
                GrpcFnServer<ExternalWorkerService> start = new ExternalWorkerService(PipelineOptionsTranslation.fromJson(str)).start();
                Throwable th = null;
                try {
                    try {
                        LOG.info("External worker service started at address: {}", start.getApiServiceDescriptor().getUrl());
                        Sleeper.DEFAULT.sleep(Long.MAX_VALUE);
                        if (start != null) {
                            if (0 != 0) {
                                try {
                                    start.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                start.close();
                            }
                        }
                        LOG.info("External worker service stopped.");
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (start != null) {
                        if (th != null) {
                            try {
                                start.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            start.close();
                        }
                    }
                    throw th4;
                }
            } catch (Throwable th6) {
                LOG.info("External worker service stopped.");
                throw th6;
            }
        } catch (Exception e) {
            LOG.error("Error running worker service", (Throwable) e);
            LOG.info("External worker service stopped.");
        }
    }
}
