package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.class */
public class BeamFnStateGrpcClientCache {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamFnDataGrpcClient.class);
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnStateClient> cache = new ConcurrentHashMap();
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final IdGenerator idGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache$GrpcStateClient.class */
    public class GrpcStateClient implements BeamFnStateClient {
        private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
        private final ConcurrentMap<String, CompletableFuture<BeamFnApi.StateResponse>> outstandingRequests;
        private final StreamObserver<BeamFnApi.StateRequest> outboundObserver;
        private final ManagedChannel channel;
        private volatile RuntimeException closed;

        /* loaded from: input_file:org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache$GrpcStateClient$InboundObserver.class */
        private class InboundObserver implements StreamObserver<BeamFnApi.StateResponse> {
            private InboundObserver() {
            }

            @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
            public void onNext(BeamFnApi.StateResponse stateResponse) {
                BeamFnStateGrpcClientCache.LOG.debug("Received StateResponse {}", stateResponse);
                CompletableFuture completableFuture = (CompletableFuture) GrpcStateClient.this.outstandingRequests.remove(stateResponse.getId());
                if (completableFuture != null) {
                    if (stateResponse.getError().isEmpty()) {
                        completableFuture.complete(stateResponse);
                    } else {
                        completableFuture.completeExceptionally(new IllegalStateException(stateResponse.getError()));
                    }
                }
            }

            @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                GrpcStateClient.this.closeAndCleanUp(th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException(th));
            }

            @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
            public void onCompleted() {
                GrpcStateClient.this.closeAndCleanUp(new RuntimeException("Server hanged up."));
            }
        }

        private GrpcStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
            this.apiServiceDescriptor = apiServiceDescriptor;
            this.outstandingRequests = new ConcurrentHashMap();
            this.channel = (ManagedChannel) BeamFnStateGrpcClientCache.this.channelFactory.apply(apiServiceDescriptor);
            OutboundObserverFactory outboundObserverFactory = BeamFnStateGrpcClientCache.this.outboundObserverFactory;
            BeamFnStateGrpc.BeamFnStateStub newStub = BeamFnStateGrpc.newStub(this.channel);
            Objects.requireNonNull(newStub);
            this.outboundObserver = outboundObserverFactory.outboundObserverFor(newStub::state, new InboundObserver());
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public void handle(BeamFnApi.StateRequest.Builder builder, CompletableFuture<BeamFnApi.StateResponse> completableFuture) {
            builder.setId(BeamFnStateGrpcClientCache.this.idGenerator.getId());
            BeamFnApi.StateRequest build = builder.build();
            this.outstandingRequests.put(build.getId(), completableFuture);
            BeamFnStateGrpcClientCache.LOG.debug("Sending StateRequest {}", build);
            this.outboundObserver.onNext(build);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void closeAndCleanUp(RuntimeException runtimeException) {
            if (this.closed != null) {
                return;
            }
            BeamFnStateGrpcClientCache.this.cache.remove(this.apiServiceDescriptor);
            this.closed = runtimeException;
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(this.outstandingRequests);
            if (concurrentHashMap.isEmpty()) {
                this.outboundObserver.onCompleted();
                return;
            }
            this.outstandingRequests.clear();
            BeamFnStateGrpcClientCache.LOG.error("BeamFnState failed, clearing outstanding requests {}", concurrentHashMap);
            Iterator it = concurrentHashMap.values().iterator();
            while (it.hasNext()) {
                ((CompletableFuture) it.next()).completeExceptionally(runtimeException);
            }
        }
    }

    public BeamFnStateGrpcClientCache(IdGenerator idGenerator, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> function, OutboundObserverFactory outboundObserverFactory) {
        this.idGenerator = idGenerator;
        this.channelFactory = function;
        this.outboundObserverFactory = outboundObserverFactory;
    }

    public BeamFnStateClient forApiServiceDescriptor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) throws IOException {
        return this.cache.computeIfAbsent(apiServiceDescriptor, this::createBeamFnStateClient);
    }

    private BeamFnStateClient createBeamFnStateClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return new GrpcStateClient(apiServiceDescriptor);
    }
}
