package org.apache.beam.sdk.fn.data;

import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.UnmodifiableIterator;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.class */
public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    private final ConcurrentMap<String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>> receivers = new ConcurrentHashMap();
    private final ConcurrentMap<String, Boolean> erroredInstructionIds = new ConcurrentHashMap();
    private final StreamObserver<BeamFnApi.Elements> inboundObserver = new InboundObserver();

    /* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2$InboundObserver.class */
    private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> {
        private InboundObserver() {
        }

        public void onNext(BeamFnApi.Elements elements) {
            String str = null;
            Iterator<BeamFnApi.Elements.Data> it = elements.getDataList().iterator();
            while (true) {
                if (!it.hasNext()) {
                    for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
                        if (str == null) {
                            str = timers.getInstructionId();
                        } else if (!str.equals(timers.getInstructionId())) {
                        }
                    }
                    if (str == null) {
                        return;
                    }
                    forwardToConsumerForInstructionId(str, elements);
                    return;
                }
                BeamFnApi.Elements.Data next = it.next();
                if (str != null) {
                    if (!str.equals(next.getInstructionId())) {
                        break;
                    }
                } else {
                    str = next.getInstructionId();
                }
            }
            HashSet hashSet = new HashSet();
            Iterator<BeamFnApi.Elements.Data> it2 = elements.getDataList().iterator();
            while (it2.hasNext()) {
                hashSet.add(it2.next().getInstructionId());
            }
            Iterator<BeamFnApi.Elements.Timers> it3 = elements.getTimersList().iterator();
            while (it3.hasNext()) {
                hashSet.add(it3.next().getInstructionId());
            }
            Iterator it4 = hashSet.iterator();
            while (it4.hasNext()) {
                String str2 = (String) it4.next();
                BeamFnApi.Elements.Builder newBuilder = BeamFnApi.Elements.newBuilder();
                for (BeamFnApi.Elements.Data data : elements.getDataList()) {
                    if (str2.equals(data.getInstructionId())) {
                        newBuilder.addData(data);
                    }
                }
                for (BeamFnApi.Elements.Timers timers2 : elements.getTimersList()) {
                    if (str2.equals(timers2.getInstructionId())) {
                        newBuilder.addTimers(timers2);
                    }
                }
                forwardToConsumerForInstructionId(str2, newBuilder.m675build());
            }
        }

        private void forwardToConsumerForInstructionId(String str, BeamFnApi.Elements elements) {
            if (BeamFnDataGrpcMultiplexer2.this.erroredInstructionIds.containsKey(str)) {
                BeamFnDataGrpcMultiplexer2.LOG.debug("Ignoring inbound data for failed instruction {}", str);
                return;
            }
            CompletableFuture receiverFuture = BeamFnDataGrpcMultiplexer2.this.receiverFuture(str);
            if (!receiverFuture.isDone()) {
                BeamFnDataGrpcMultiplexer2.LOG.debug("Received data for instruction {} without consumer ready. Waiting for consumer to be registered.", str);
            }
            try {
                try {
                    ((CloseableFnDataReceiver) receiverFuture.get()).accept(elements);
                } catch (Exception e) {
                    BeamFnDataGrpcMultiplexer2.this.erroredInstructionIds.put(str, true);
                }
            } catch (InterruptedException | ExecutionException e2) {
                BeamFnDataGrpcMultiplexer2.LOG.error("Client interrupted during handling of data for instruction {}", str, e2);
                BeamFnDataGrpcMultiplexer2.this.outboundObserver.onError(e2);
            } catch (RuntimeException e3) {
                BeamFnDataGrpcMultiplexer2.LOG.error("Client failed to handle data for instruction {}", str, e3);
                BeamFnDataGrpcMultiplexer2.this.outboundObserver.onError(e3);
            }
        }

        public void onError(Throwable th) {
            BeamFnDataGrpcMultiplexer2.LOG.error("Failed to handle for {}", BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor == null ? "unknown endpoint" : BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor, th);
            BeamFnDataGrpcMultiplexer2.this.outboundObserver.onCompleted();
        }

        public void onCompleted() {
            BeamFnDataGrpcMultiplexer2.LOG.warn("Hanged up for {}.", BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor == null ? "unknown endpoint" : BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor);
            BeamFnDataGrpcMultiplexer2.this.outboundObserver.onCompleted();
        }
    }

    public BeamFnDataGrpcMultiplexer2(Endpoints.ApiServiceDescriptor apiServiceDescriptor, OutboundObserverFactory outboundObserverFactory, OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements> basicFactory) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(basicFactory, this.inboundObserver);
    }

    @SideEffectFree
    public String toString() {
        return MoreObjects.toStringHelper(this).omitNullValues().add("apiServiceDescriptor", this.apiServiceDescriptor).add("consumers", this.receivers).toString();
    }

    public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
        return this.inboundObserver;
    }

    public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
        return this.outboundObserver;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(String str) {
        return this.receivers.computeIfAbsent(str, str2 -> {
            return new CompletableFuture();
        });
    }

    public void registerConsumer(String str, CloseableFnDataReceiver<BeamFnApi.Elements> closeableFnDataReceiver) {
        receiverFuture(str).complete(closeableFnDataReceiver);
    }

    public void unregisterConsumer(String str) {
        this.receivers.remove(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public boolean hasConsumer(String str) {
        return this.receivers.containsKey(str);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Exception exc = null;
        UnmodifiableIterator it = ImmutableList.copyOf(this.receivers.values()).iterator();
        while (it.hasNext()) {
            CompletableFuture completableFuture = (CompletableFuture) it.next();
            completableFuture.cancel(true);
            if (!completableFuture.isCompletedExceptionally()) {
                try {
                    ((CloseableFnDataReceiver) completableFuture.get()).close();
                } catch (Exception e) {
                    if (exc == null) {
                        exc = e;
                    } else {
                        exc.addSuppressed(e);
                    }
                }
            }
        }
        this.outboundObserver.onError(Status.CANCELLED.withDescription("Multiplexer hanging up").asException());
        this.inboundObserver.onCompleted();
        if (exc != null) {
            throw exc;
        }
    }
}
