package org.apache.beam.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.HeaderAccessor;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.class */
public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FnApiControlClientPoolService.class);
    private final ControlClientPool.Sink clientSink;
    private final HeaderAccessor headerAccessor;
    private final Object lock = new Object();
    private final ConcurrentMap<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors = new ConcurrentHashMap();

    @GuardedBy("lock")
    private final Map<String, FnApiControlClient> vendedClients = new HashMap();

    @GuardedBy("lock")
    private boolean closed = false;

    private FnApiControlClientPoolService(ControlClientPool.Sink sink, HeaderAccessor headerAccessor) {
        this.clientSink = sink;
        this.headerAccessor = headerAccessor;
    }

    public static FnApiControlClientPoolService offeringClientsToPool(ControlClientPool.Sink sink, HeaderAccessor headerAccessor) {
        return new FnApiControlClientPoolService(sink, headerAccessor);
    }

    @Override // org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc.AsyncService
    public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
        String sdkWorkerId = this.headerAccessor.getSdkWorkerId();
        LOG.info("Beam Fn Control client connected with id {}", sdkWorkerId);
        FnApiControlClient forRequestObserver = FnApiControlClient.forRequestObserver(sdkWorkerId, streamObserver, this.processBundleDescriptors);
        try {
            synchronized (this.lock) {
                Preconditions.checkState(!this.closed, "%s already closed", FnApiControlClientPoolService.class.getSimpleName());
                this.vendedClients.put(sdkWorkerId, forRequestObserver);
            }
            this.clientSink.put(this.headerAccessor.getSdkWorkerId(), forRequestObserver);
            return forRequestObserver.asResponseObserver();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc.AsyncService
    public void getProcessBundleDescriptor(BeamFnApi.GetProcessBundleDescriptorRequest getProcessBundleDescriptorRequest, StreamObserver<BeamFnApi.ProcessBundleDescriptor> streamObserver) {
        String processBundleDescriptorId = getProcessBundleDescriptorRequest.getProcessBundleDescriptorId();
        LOG.info("getProcessBundleDescriptor request with id {}", processBundleDescriptorId);
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = this.processBundleDescriptors.get(processBundleDescriptorId);
        if (processBundleDescriptor != null) {
            streamObserver.onNext(processBundleDescriptor);
            streamObserver.onCompleted();
        } else {
            String format = String.format("ProcessBundleDescriptor with id %s not found", processBundleDescriptorId);
            streamObserver.onError(new StatusException(Status.NOT_FOUND.withDescription(format)));
            LOG.error(format);
        }
    }

    @Override // org.apache.beam.sdk.fn.server.FnService, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            if (!this.closed) {
                this.closed = true;
                Iterator<FnApiControlClient> it = this.vendedClients.values().iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
            }
        }
    }
}
