package org.apache.beam.runners.fnexecution.control;

import java.io.Closeable;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/FnApiControlClient.class */
public class FnApiControlClient implements Closeable, InstructionRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FnApiControlClient.class);
    private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
    private final String workerId;
    private final ConcurrentMap<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors;
    private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
    private final Set<Consumer<FnApiControlClient>> onCloseListeners = ConcurrentHashMap.newKeySet();
    private AtomicBoolean isClosed = new AtomicBoolean(false);
    private final ConcurrentMap<String, CompletableFuture<BeamFnApi.InstructionResponse>> outstandingRequests = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/FnApiControlClient$ResponseStreamObserver.class */
    private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> {
        private ResponseStreamObserver() {
        }

        @Override // org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver
        public void onNext(BeamFnApi.InstructionResponse instructionResponse) {
            FnApiControlClient.LOG.debug("Received InstructionResponse {}", instructionResponse);
            CompletableFuture completableFuture = (CompletableFuture) FnApiControlClient.this.outstandingRequests.remove(instructionResponse.getInstructionId());
            if (completableFuture == null) {
                FnApiControlClient.LOG.warn("Dropped unknown InstructionResponse {}", instructionResponse);
            } else if (instructionResponse.getError().isEmpty()) {
                completableFuture.complete(instructionResponse);
            } else {
                completableFuture.completeExceptionally(new RuntimeException(String.format("Error received from SDK harness for instruction %s: %s", instructionResponse.getInstructionId(), instructionResponse.getError())));
            }
        }

        @Override // org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver
        public void onCompleted() {
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(new IllegalStateException("SDK harness closed connection"));
        }

        @Override // org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            FnApiControlClient.LOG.error("{} received an error.", FnApiControlClient.class.getSimpleName(), th);
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(th);
        }
    }

    private FnApiControlClient(String str, StreamObserver<BeamFnApi.InstructionRequest> streamObserver, ConcurrentMap<String, BeamFnApi.ProcessBundleDescriptor> concurrentMap) {
        this.workerId = str;
        this.requestReceiver = SynchronizedStreamObserver.wrapping(streamObserver);
        this.processBundleDescriptors = concurrentMap;
    }

    public static FnApiControlClient forRequestObserver(String str, StreamObserver<BeamFnApi.InstructionRequest> streamObserver, ConcurrentMap<String, BeamFnApi.ProcessBundleDescriptor> concurrentMap) {
        return new FnApiControlClient(str, streamObserver, concurrentMap);
    }

    @Override // org.apache.beam.runners.fnexecution.control.InstructionRequestHandler
    public CompletionStage<BeamFnApi.InstructionResponse> handle(BeamFnApi.InstructionRequest instructionRequest) {
        LOG.debug("Sending InstructionRequest {}", instructionRequest);
        CompletableFuture<BeamFnApi.InstructionResponse> completableFuture = new CompletableFuture<>();
        this.outstandingRequests.put(instructionRequest.getInstructionId(), completableFuture);
        this.requestReceiver.onNext(instructionRequest);
        return completableFuture;
    }

    public StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
        return this.responseObserver;
    }

    public BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor(String str) {
        return this.processBundleDescriptors.get(str);
    }

    @Override // org.apache.beam.runners.fnexecution.control.InstructionRequestHandler
    public void registerProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor) {
        this.processBundleDescriptors.put(processBundleDescriptor.getId(), processBundleDescriptor);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
    }

    public String getWorkerId() {
        return this.workerId;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeAndTerminateOutstandingRequests(Throwable th) {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        try {
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(this.outstandingRequests);
            this.outstandingRequests.clear();
            if (concurrentHashMap.isEmpty()) {
                this.requestReceiver.onCompleted();
                Iterator<Consumer<FnApiControlClient>> it = this.onCloseListeners.iterator();
                while (it.hasNext()) {
                    it.next().accept(this);
                }
                return;
            }
            this.requestReceiver.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(th.getMessage())));
            LOG.error("{} closed, clearing outstanding requests {}", FnApiControlClient.class.getSimpleName(), concurrentHashMap);
            Iterator it2 = concurrentHashMap.values().iterator();
            while (it2.hasNext()) {
                ((CompletableFuture) it2.next()).completeExceptionally(th);
            }
        } finally {
            Iterator<Consumer<FnApiControlClient>> it3 = this.onCloseListeners.iterator();
            while (it3.hasNext()) {
                it3.next().accept(this);
            }
        }
    }

    public void onClose(Consumer<FnApiControlClient> consumer) {
        this.onCloseListeners.add(consumer);
    }
}
