/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.logging;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableSet;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC implementation of the Beam Fn Logging service.
 *
 * <p>Every log entry received from a connected client is forwarded to the {@link LogWriter}
 * supplied at construction time. The outbound (control) observer of each connected client is
 * tracked in a concurrent map so that it can be completed when the client disconnects, fails,
 * or when this service is closed.
 */
public class GrpcLoggingService
extends BeamFnLoggingGrpc.BeamFnLoggingImplBase
implements FnService {
    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcLoggingService.class);

    /** Sink that receives every log entry sent by a client. */
    private final LogWriter logWriter;

    /** Outbound observers of currently connected clients, keyed by their inbound observer. */
    private final ConcurrentMap<InboundObserver, StreamObserver<BeamFnApi.LogControl>> connectedClients;

    /**
     * Creates a logging service that forwards received log entries to {@code writer}.
     *
     * @param writer destination for the log entries received from clients
     * @return a new service instance with no connected clients
     */
    public static GrpcLoggingService forWriter(LogWriter writer) {
        return new GrpcLoggingService(writer);
    }

    private GrpcLoggingService(LogWriter logWriter) {
        this.logWriter = logWriter;
        this.connectedClients = new ConcurrentHashMap<>();
    }

    /**
     * Completes the outbound stream of every client that is still registered.
     *
     * <p>The loop runs over a snapshot of the registered clients; each client is removed from
     * the map before its outbound observer is completed, so an observer removed concurrently by
     * {@code onError}/{@code onCompleted} is simply skipped (the removal yields {@code null}).
     */
    @Override
    public void close() throws Exception {
        ImmutableSet<InboundObserver> remainingClients = ImmutableSet.copyOf(this.connectedClients.keySet());
        if (remainingClients.isEmpty()) {
            return;
        }
        LOGGER.info("{} Beam Fn Logging clients still connected during shutdown.", remainingClients.size());
        for (InboundObserver client : remainingClients) {
            completeIfNotNull(this.connectedClients.remove(client));
        }
    }

    /**
     * Registers a newly connected logging client.
     *
     * @param outboundObserver observer used to send {@link BeamFnApi.LogControl} messages to
     *     the client; it is completed when the client disconnects or the service is closed
     * @return the observer that consumes the client's batches of log entries
     */
    @Override
    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
        LOGGER.info("Beam Fn Logging client connected.");
        InboundObserver inboundObserver = new InboundObserver();
        this.connectedClients.put(inboundObserver, outboundObserver);
        return inboundObserver;
    }

    /**
     * Completes {@code outboundObserver} unless it is {@code null}.
     *
     * <p>A {@link RuntimeException} thrown by {@code onCompleted()} is logged at WARN level and
     * not rethrown, so one failing client does not prevent the remaining ones from being
     * completed.
     *
     * @param outboundObserver the observer to complete, or {@code null} if the client was
     *     already removed from the map
     */
    private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
        if (outboundObserver == null) {
            return;
        }
        try {
            outboundObserver.onCompleted();
        } catch (RuntimeException e) {
            LOGGER.warn("Beam Fn Logging client failed to be complete.", e);
        }
    }

    /**
     * Inbound side of a single client connection. Also serves as the key under which the
     * client's outbound observer is stored in {@code connectedClients}.
     */
    private class InboundObserver
    implements StreamObserver<BeamFnApi.LogEntry.List> {
        private InboundObserver() {
        }

        /** Forwards each entry of the received batch to the enclosing service's log writer. */
        @Override
        public void onNext(BeamFnApi.LogEntry.List value) {
            for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
                GrpcLoggingService.this.logWriter.log(logEntry);
            }
        }

        /** Logs the failure, unregisters this client and completes its outbound observer. */
        @Override
        public void onError(Throwable t) {
            LOGGER.warn("Logging client failed unexpectedly.", t);
            completeIfNotNull(GrpcLoggingService.this.connectedClients.remove(this));
        }

        /** Unregisters this client and completes its outbound observer. */
        @Override
        public void onCompleted() {
            LOGGER.info("Logging client hanged up.");
            completeIfNotNull(GrpcLoggingService.this.connectedClients.remove(this));
        }
    }
}

