package org.apache.beam.runners.fnexecution.control;

import io.grpc.ServerServiceDefinition;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Supplier;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.data.FnDataService;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.class */
public class SdkHarnessClientControlService implements FnService {
    private final Supplier<FnDataService> dataService;
    private final Collection<SdkHarnessClient> activeClients = new ConcurrentLinkedQueue();
    private final BlockingQueue<FnApiControlClient> pendingClients = new SynchronousQueue();
    private final FnApiControlClientPoolService clientPoolService = FnApiControlClientPoolService.offeringClientsToPool(this.pendingClients);

    public static SdkHarnessClientControlService create(Supplier<FnDataService> supplier) {
        return new SdkHarnessClientControlService(supplier);
    }

    private SdkHarnessClientControlService(Supplier<FnDataService> supplier) {
        this.dataService = supplier;
    }

    public SdkHarnessClient getClient() {
        try {
            return SdkHarnessClient.usingFnApiClient(this.pendingClients.take(), this.dataService.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for client", e);
        }
    }

    @Override // org.apache.beam.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
        Iterator<SdkHarnessClient> it = this.activeClients.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public ServerServiceDefinition bindService() {
        return this.clientPoolService.bindService();
    }
}
