package org.apache.beam.runners.reference;

import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/reference/ExternalWorkerService.class */
public class ExternalWorkerService extends BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase.class);
    private final PipelineOptions options;
    private final ServerFactory serverFactory = ServerFactory.createDefault();

    public ExternalWorkerService(PipelineOptions pipelineOptions) {
        this.options = pipelineOptions;
    }

    public void notifyRunnerAvailable(BeamFnApi.NotifyRunnerAvailableRequest notifyRunnerAvailableRequest, StreamObserver<BeamFnApi.NotifyRunnerAvailableResponse> streamObserver) {
        LOG.info("Starting worker {} pointing at {}.", notifyRunnerAvailableRequest.getWorkerId(), notifyRunnerAvailableRequest.getControlEndpoint().getUrl());
        LOG.debug("Worker request {}.", notifyRunnerAvailableRequest);
        Thread thread = new Thread(() -> {
            try {
                FnHarness.main(notifyRunnerAvailableRequest.getWorkerId(), this.options, notifyRunnerAvailableRequest.getLoggingEndpoint(), notifyRunnerAvailableRequest.getControlEndpoint());
                LOG.info("Successfully started worker {}.", notifyRunnerAvailableRequest.getWorkerId());
            } catch (Exception e) {
                LOG.error(String.format("Failed to start worker %s.", notifyRunnerAvailableRequest.getWorkerId()), e);
            }
        });
        thread.setName("SDK-worker-" + notifyRunnerAvailableRequest.getWorkerId());
        thread.setDaemon(true);
        thread.start();
        streamObserver.onNext(BeamFnApi.NotifyRunnerAvailableResponse.newBuilder().build());
        streamObserver.onCompleted();
    }

    public void close() {
    }

    public GrpcFnServer<ExternalWorkerService> start() throws Exception {
        GrpcFnServer<ExternalWorkerService> allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(this, this.serverFactory);
        LOG.debug("Listening for worker start requests at {}.", allocatePortAndCreateFor.getApiServiceDescriptor().getUrl());
        return allocatePortAndCreateFor;
    }
}
