package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/logging/GrpcLoggingService.class */
public class GrpcLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase implements FnService {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) GrpcLoggingService.class);
    private final LogWriter logWriter;
    private final ConcurrentMap<InboundObserver, StreamObserver<BeamFnApi.LogControl>> connectedClients = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/logging/GrpcLoggingService$InboundObserver.class */
    private class InboundObserver implements StreamObserver<BeamFnApi.LogEntry.List> {
        private InboundObserver() {
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onNext(BeamFnApi.LogEntry.List list) {
            Iterator<BeamFnApi.LogEntry> it = list.getLogEntriesList().iterator();
            while (it.hasNext()) {
                GrpcLoggingService.this.logWriter.log(it.next());
            }
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            GrpcLoggingService.LOGGER.warn("Logging client failed unexpectedly.", th);
            GrpcLoggingService.this.completeIfNotNull((StreamObserver) GrpcLoggingService.this.connectedClients.remove(this));
        }

        @Override // org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver
        public void onCompleted() {
            GrpcLoggingService.LOGGER.info("Logging client hanged up.");
            GrpcLoggingService.this.completeIfNotNull((StreamObserver) GrpcLoggingService.this.connectedClients.remove(this));
        }
    }

    public static GrpcLoggingService forWriter(LogWriter logWriter) {
        return new GrpcLoggingService(logWriter);
    }

    private GrpcLoggingService(LogWriter logWriter) {
        this.logWriter = logWriter;
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
        ImmutableSet copyOf = ImmutableSet.copyOf((Collection) this.connectedClients.keySet());
        if (copyOf.isEmpty()) {
            return;
        }
        LOGGER.info("{} Beam Fn Logging clients still connected during shutdown.", Integer.valueOf(copyOf.size()));
        Iterator<E> it = copyOf.iterator();
        while (it.hasNext()) {
            completeIfNotNull(this.connectedClients.remove((InboundObserver) it.next()));
        }
    }

    @Override // org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc.BeamFnLoggingImplBase
    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
        LOGGER.info("Beam Fn Logging client connected.");
        InboundObserver inboundObserver = new InboundObserver();
        this.connectedClients.put(inboundObserver, streamObserver);
        return inboundObserver;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> streamObserver) {
        if (streamObserver != null) {
            try {
                streamObserver.onCompleted();
            } catch (RuntimeException e) {
                LOGGER.warn("Beam Fn Logging client failed to be complete.", (Throwable) e);
            }
        }
    }
}
