package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.class */
public class BeamFnStateGrpcClientCache {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamFnStateGrpcClientCache.class);
    private final Map<Endpoints.ApiServiceDescriptor, BeamFnStateClient> cache = new HashMap();
    private final ManagedChannelFactory channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final IdGenerator idGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache$GrpcStateClient.class */
    public class GrpcStateClient implements BeamFnStateClient {
        private final Object lock;
        private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
        private final Map<String, CompletableFuture<BeamFnApi.StateResponse>> outstandingRequests;
        private final StreamObserver<BeamFnApi.StateRequest> outboundObserver;
        private final ManagedChannel channel;
        private RuntimeException closed;
        private boolean errorDuringConstruction;

        /* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache$GrpcStateClient$InboundObserver.class */
        private class InboundObserver implements StreamObserver<BeamFnApi.StateResponse> {
            private InboundObserver() {
            }

            @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
            public void onNext(BeamFnApi.StateResponse stateResponse) {
                CompletableFuture completableFuture;
                BeamFnStateGrpcClientCache.LOG.debug("Received StateResponse {}", stateResponse);
                synchronized (GrpcStateClient.this.lock) {
                    completableFuture = (CompletableFuture) GrpcStateClient.this.outstandingRequests.remove(stateResponse.getId());
                }
                if (completableFuture == null) {
                    BeamFnStateGrpcClientCache.LOG.warn("Dropped unknown StateResponse {}", stateResponse);
                } else if (stateResponse.getError().isEmpty()) {
                    completableFuture.complete(stateResponse);
                } else {
                    completableFuture.completeExceptionally(new IllegalStateException(stateResponse.getError()));
                }
            }

            @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                GrpcStateClient.this.closeAndCleanUp(th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException(th));
            }

            @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
            public void onCompleted() {
                GrpcStateClient.this.closeAndCleanUp(new RuntimeException("Server hanged up."));
            }
        }

        private GrpcStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
            this.lock = new Object();
            this.apiServiceDescriptor = apiServiceDescriptor;
            this.outstandingRequests = new HashMap();
            this.channel = BeamFnStateGrpcClientCache.this.channelFactory.forDescriptor(apiServiceDescriptor);
            this.errorDuringConstruction = false;
            OutboundObserverFactory outboundObserverFactory = BeamFnStateGrpcClientCache.this.outboundObserverFactory;
            BeamFnStateGrpc.BeamFnStateStub newStub = BeamFnStateGrpc.newStub(this.channel);
            Objects.requireNonNull(newStub);
            this.outboundObserver = outboundObserverFactory.outboundObserverFor(newStub::state, new InboundObserver());
            synchronized (this.lock) {
                if (this.errorDuringConstruction) {
                    this.outboundObserver.onCompleted();
                }
            }
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder builder) {
            builder.setId(BeamFnStateGrpcClientCache.this.idGenerator.getId());
            BeamFnApi.StateRequest build = builder.build();
            CompletableFuture<BeamFnApi.StateResponse> completableFuture = new CompletableFuture<>();
            synchronized (this.lock) {
                if (this.closed != null) {
                    completableFuture.completeExceptionally(this.closed);
                    return completableFuture;
                }
                this.outstandingRequests.put(build.getId(), completableFuture);
                BeamFnStateGrpcClientCache.LOG.debug("Sending StateRequest {}", build);
                this.outboundObserver.onNext(build);
                return completableFuture;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void closeAndCleanUp(RuntimeException runtimeException) {
            synchronized (this.lock) {
                if (this.closed != null) {
                    return;
                }
                this.closed = runtimeException;
                synchronized (BeamFnStateGrpcClientCache.this.cache) {
                    BeamFnStateGrpcClientCache.this.cache.remove(this.apiServiceDescriptor);
                }
                if (!this.outstandingRequests.isEmpty()) {
                    BeamFnStateGrpcClientCache.LOG.error("BeamFnState failed, clearing outstanding requests {}", this.outstandingRequests);
                    Iterator<CompletableFuture<BeamFnApi.StateResponse>> it = this.outstandingRequests.values().iterator();
                    while (it.hasNext()) {
                        it.next().completeExceptionally(runtimeException);
                    }
                    this.outstandingRequests.clear();
                }
                if (this.outboundObserver == null) {
                    this.errorDuringConstruction = true;
                } else {
                    this.outboundObserver.onCompleted();
                }
            }
        }
    }

    public BeamFnStateGrpcClientCache(IdGenerator idGenerator, ManagedChannelFactory managedChannelFactory, OutboundObserverFactory outboundObserverFactory) {
        this.idGenerator = idGenerator;
        this.channelFactory = managedChannelFactory.withDirectExecutor();
        this.outboundObserverFactory = outboundObserverFactory;
    }

    public synchronized BeamFnStateClient forApiServiceDescriptor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) throws IOException {
        BeamFnStateClient beamFnStateClient;
        synchronized (this.cache) {
            beamFnStateClient = this.cache.get(apiServiceDescriptor);
        }
        if (beamFnStateClient == null) {
            beamFnStateClient = new GrpcStateClient(apiServiceDescriptor);
            synchronized (this.cache) {
                this.cache.put(apiServiceDescriptor, beamFnStateClient);
            }
        }
        return beamFnStateClient;
    }
}
