package org.apache.beam.repackaged.direct_java.runners.fnexecution.control;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.HeaderAccessor;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/control/FnApiControlClientPoolService.class */
public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
    private final ControlClientPool.Sink clientSink;
    private final HeaderAccessor headerAccessor;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Collection<FnApiControlClient> vendedClients = new ArrayList();

    @GuardedBy("lock")
    private boolean closed = false;

    private FnApiControlClientPoolService(ControlClientPool.Sink sink, HeaderAccessor headerAccessor) {
        this.clientSink = sink;
        this.headerAccessor = headerAccessor;
    }

    public static FnApiControlClientPoolService offeringClientsToPool(ControlClientPool.Sink sink, HeaderAccessor headerAccessor) {
        return new FnApiControlClientPoolService(sink, headerAccessor);
    }

    public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
        String sdkWorkerId = this.headerAccessor.getSdkWorkerId();
        if (Strings.isNullOrEmpty(sdkWorkerId)) {
            LOG.warn("No worker_id header provided in control request");
        }
        LOG.info("Beam Fn Control client connected with id {}", sdkWorkerId);
        FnApiControlClient forRequestObserver = FnApiControlClient.forRequestObserver(sdkWorkerId, streamObserver);
        try {
            synchronized (this.lock) {
                Preconditions.checkState(!this.closed, "%s already closed", FnApiControlClientPoolService.class.getSimpleName());
                this.vendedClients.add(forRequestObserver);
            }
            this.clientSink.put(this.headerAccessor.getSdkWorkerId(), forRequestObserver);
            return forRequestObserver.asResponseObserver();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.beam.repackaged.direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            if (!this.closed) {
                this.closed = true;
                Iterator<FnApiControlClient> it = this.vendedClients.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
            }
        }
    }
}
