package org.apache.beam.fn.harness.logging;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.dataflow.qual.SideEffectFree;

/* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient.class */
public class BeamFnLoggingClient implements AutoCloseable {
    private static final String ROOT_LOGGER_NAME = "";
    private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10000;
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final ManagedChannel channel;
    private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
    private final LogControlObserver inboundObserver;
    private final LogRecordHandler logRecordHandler;
    private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP = ImmutableMap.builder().put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR).put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN).put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO).put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG).put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE).build();
    private static final ImmutableMap<SdkHarnessOptions.LogLevel, Level> LEVEL_CONFIGURATION = ImmutableMap.builder().put(SdkHarnessOptions.LogLevel.OFF, Level.OFF).put(SdkHarnessOptions.LogLevel.ERROR, Level.SEVERE).put(SdkHarnessOptions.LogLevel.WARN, Level.WARNING).put(SdkHarnessOptions.LogLevel.INFO, Level.INFO).put(SdkHarnessOptions.LogLevel.DEBUG, Level.FINE).put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST).build();
    private static final Formatter FORMATTER = new SimpleFormatter();
    private static final Object COMPLETED = new Object();
    private final CompletableFuture<Object> inboundObserverCompletion = new CompletableFuture<>();
    private final Collection<Logger> configuredLoggers = new ArrayList();
    private final Phaser phaser = new Phaser(1);

    /* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient$LogControlObserver.class */
    private class LogControlObserver implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
        private LogControlObserver() {
        }

        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientResponseObserver
        public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> clientCallStreamObserver) {
            Phaser phaser = BeamFnLoggingClient.this.phaser;
            Objects.requireNonNull(phaser);
            clientCallStreamObserver.setOnReadyHandler(phaser::arrive);
        }

        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onNext(BeamFnApi.LogControl logControl) {
        }

        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            BeamFnLoggingClient.this.inboundObserverCompletion.completeExceptionally(th);
        }

        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onCompleted() {
            BeamFnLoggingClient.this.inboundObserverCompletion.complete(BeamFnLoggingClient.COMPLETED);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient$LogRecordHandler.class */
    private class LogRecordHandler extends Handler implements Runnable {
        private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries;
        private final Future<?> bufferedLogWriter;
        private Thread logEntryHandlerThread;

        private LogRecordHandler(ExecutorService executorService) {
            this.bufferedLogEntries = new LinkedBlockingDeque(10000);
            this.bufferedLogWriter = executorService.submit(this);
        }

        @Override // java.util.logging.Handler
        public void publish(LogRecord logRecord) {
            BeamFnApi.LogEntry.Severity.Enum r0 = (BeamFnApi.LogEntry.Severity.Enum) BeamFnLoggingClient.LOG_LEVEL_MAP.get(logRecord.getLevel());
            if (r0 == null) {
                return;
            }
            BeamFnApi.LogEntry.Builder timestamp = BeamFnApi.LogEntry.newBuilder().setSeverity(r0).setLogLocation(logRecord.getLoggerName()).setMessage(BeamFnLoggingClient.FORMATTER.formatMessage(logRecord)).setThread(Integer.toString(logRecord.getThreadID())).setTimestamp(Timestamp.newBuilder().setSeconds(logRecord.getMillis() / 1000).setNanos(((int) (logRecord.getMillis() % 1000)) * 1000000));
            if (logRecord.getThrown() != null) {
                timestamp.setTrace(Throwables.getStackTraceAsString(logRecord.getThrown()));
            }
            if (Thread.currentThread() == this.logEntryHandlerThread) {
                this.bufferedLogEntries.offer(timestamp.build());
                return;
            }
            try {
                this.bufferedLogEntries.put(timestamp.build());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            this.logEntryHandlerThread = Thread.currentThread();
            ArrayList arrayList = new ArrayList(10000);
            Throwable th = null;
            while (!BeamFnLoggingClient.this.phaser.isTerminated()) {
                try {
                    BeamFnApi.LogEntry poll = this.bufferedLogEntries.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        int phase = BeamFnLoggingClient.this.phaser.getPhase();
                        if (!BeamFnLoggingClient.this.outboundObserver.isReady()) {
                            BeamFnLoggingClient.this.phaser.awaitAdvance(phase);
                        }
                        BeamFnApi.LogEntry.List.Builder addLogEntries = BeamFnApi.LogEntry.List.newBuilder().addLogEntries(poll);
                        this.bufferedLogEntries.drainTo(arrayList);
                        addLogEntries.addAllLogEntries(arrayList);
                        BeamFnLoggingClient.this.outboundObserver.onNext(addLogEntries.build());
                        arrayList.clear();
                    }
                } catch (Throwable th2) {
                    th = th2;
                }
            }
            this.bufferedLogEntries.drainTo(arrayList);
            if (!arrayList.isEmpty()) {
                BeamFnLoggingClient.this.outboundObserver.onNext(BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(arrayList).build());
            }
            if (th != null) {
                BeamFnLoggingClient.this.outboundObserver.onError(Status.INTERNAL.withDescription(Throwables.getStackTraceAsString(th)).asException());
                throw new IllegalStateException(th);
            }
            BeamFnLoggingClient.this.outboundObserver.onCompleted();
        }

        @Override // java.util.logging.Handler
        public void flush() {
        }

        @Override // java.util.logging.Handler
        public synchronized void close() {
            if (BeamFnLoggingClient.this.phaser.isTerminated()) {
                return;
            }
            BeamFnLoggingClient.this.phaser.forceTermination();
            try {
                this.bufferedLogWriter.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (CancellationException e2) {
            } catch (ExecutionException e3) {
                throw new RuntimeException(e3);
            }
        }
    }

    public BeamFnLoggingClient(PipelineOptions pipelineOptions, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> function) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.channel = function.apply(apiServiceDescriptor);
        LogManager logManager = LogManager.getLogManager();
        logManager.reset();
        Logger logger = logManager.getLogger("");
        for (Handler handler : logger.getHandlers()) {
            logger.removeHandler(handler);
        }
        SdkHarnessOptions sdkHarnessOptions = (SdkHarnessOptions) pipelineOptions.as(SdkHarnessOptions.class);
        if (sdkHarnessOptions.getDefaultSdkHarnessLogLevel() != null) {
            logger.setLevel(LEVEL_CONFIGURATION.get(sdkHarnessOptions.getDefaultSdkHarnessLogLevel()));
        }
        if (sdkHarnessOptions.getSdkHarnessLogLevelOverrides() != null) {
            for (Map.Entry<String, SdkHarnessOptions.LogLevel> entry : sdkHarnessOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
                Logger logger2 = Logger.getLogger(entry.getKey());
                logger2.setLevel(LEVEL_CONFIGURATION.get(entry.getValue()));
                this.configuredLoggers.add(logger2);
            }
        }
        BeamFnLoggingGrpc.BeamFnLoggingStub newStub = BeamFnLoggingGrpc.newStub(this.channel);
        this.inboundObserver = new LogControlObserver();
        this.logRecordHandler = new LogRecordHandler(((GcsOptions) pipelineOptions.as(GcsOptions.class)).getExecutorService());
        this.logRecordHandler.setLevel(Level.ALL);
        this.outboundObserver = (CallStreamObserver) newStub.logging(this.inboundObserver);
        logger.addHandler(this.logRecordHandler);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            Iterator<Logger> it = this.configuredLoggers.iterator();
            while (it.hasNext()) {
                it.next().setLevel(null);
            }
            this.configuredLoggers.clear();
            LogManager.getLogManager().readConfiguration();
            this.logRecordHandler.close();
            this.inboundObserverCompletion.get();
        } finally {
            this.channel.shutdown();
            if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }
    }

    @SideEffectFree
    public String toString() {
        return MoreObjects.toStringHelper((Class<?>) BeamFnLoggingClient.class).add("apiServiceDescriptor", this.apiServiceDescriptor).toString();
    }
}
