package org.apache.beam.fn.harness.logging;

import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Throwables;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMap;

/* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient.class */
/**
 * Redirects {@code java.util.logging} output of this process to a Beam Fn logging service.
 *
 * <p>On construction the JUL configuration is reset, the root logger level and any per-logger
 * overrides are taken from {@link DataflowWorkerLoggingOptions}, and a handler is installed on the
 * root logger. That handler converts each {@link LogRecord} into a {@link BeamFnApi.LogEntry},
 * buffers it in a bounded queue, and a background task forwards the buffered entries in batches
 * over the outbound gRPC stream.
 *
 * <p>{@link #close()} flushes the buffer, completes the stream, waits for the server to hang up,
 * restores the logging configuration and shuts the channel down.
 */
public class BeamFnLoggingClient implements AutoCloseable {
    private static final String ROOT_LOGGER_NAME = "";

    /** JUL level to Fn API severity. Records whose level has no mapping are dropped. */
    private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP =
            ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder()
                    .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR)
                    .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN)
                    .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO)
                    .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG)
                    .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE)
                    .build();

    /** Pipeline-option log level to the JUL level applied to loggers. */
    private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
            ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
                    .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
                    .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
                    .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
                    .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
                    .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
                    .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
                    .build();

    private static final Formatter FORMATTER = new SimpleFormatter();
    private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";

    /**
     * Sentinel placed on the queue to tell the writer task to stop. It is always compared by
     * identity ({@code ==}), never by {@code equals}.
     */
    private static final BeamFnApi.LogEntry POISON_PILL =
            BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build();

    /** Capacity of the queue between logging threads and the writer task. */
    private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10000;

    private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
    private final ManagedChannel channel;
    private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
    private final LogControlObserver inboundObserver;
    private final LogRecordHandler logRecordHandler;

    /** Completed (normally or exceptionally) when the inbound stream terminates. */
    private final CompletableFuture<Object> inboundObserverCompletion = new CompletableFuture<>();

    /** Loggers whose level was overridden by the constructor; reset in {@link #close()}. */
    private final Collection<Logger> configuredLoggers = new ArrayList<>();

    /** Inbound stream observer: ignores control messages and records stream termination. */
    private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
        private LogControlObserver() {
        }

        @Override
        public void onNext(BeamFnApi.LogControl logControl) {
            // Control messages are intentionally ignored.
        }

        @Override
        public void onError(Throwable th) {
            BeamFnLoggingClient.this.inboundObserverCompletion.completeExceptionally(th);
        }

        @Override
        public void onCompleted() {
            BeamFnLoggingClient.this.inboundObserverCompletion.complete(null);
        }
    }

    /**
     * JUL handler that buffers converted log entries and, as a {@link Runnable} submitted to the
     * supplied executor, forwards them in batches to the outbound observer.
     */
    private class LogRecordHandler extends Handler implements Runnable {
        private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries;
        private final Future<?> bufferedLogWriter;

        /**
         * Per-thread override of how an entry is enqueued. Only the writer thread sets it (to a
         * non-blocking offer); every other thread sees {@code null} and uses a blocking put.
         */
        private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;

        /** Set once the outbound stream was completed; guarded by the outboundObserver lock. */
        private boolean streamCompleted;

        private LogRecordHandler(ExecutorService executorService) {
            this.bufferedLogEntries = new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
            this.logEntryHandler = new ThreadLocal<>();
            // Submitted last so that run() never observes an unassigned field of this handler.
            this.bufferedLogWriter = executorService.submit(this);
        }

        /**
         * Converts the record to a log entry and enqueues it. Null records and records whose
         * level has no severity mapping are ignored.
         *
         * @throws RuntimeException if the calling thread is interrupted while the queue is full
         */
        @Override
        public void publish(LogRecord logRecord) {
            if (logRecord == null) {
                return;
            }
            BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(logRecord.getLevel());
            if (severity == null) {
                return;
            }

            // floorDiv/floorMod keep nanos non-negative even for a negative epoch-millis value.
            long millis = logRecord.getMillis();
            BeamFnApi.LogEntry.Builder entry = BeamFnApi.LogEntry.newBuilder()
                    .setSeverity(severity)
                    .setThread(Integer.toString(logRecord.getThreadID()))
                    .setTimestamp(Timestamp.newBuilder()
                            .setSeconds(Math.floorDiv(millis, 1000L))
                            .setNanos((int) Math.floorMod(millis, 1000L) * 1_000_000));

            // Generated protobuf setters reject null, while both the logger name and the
            // formatted message of a LogRecord may legitimately be null.
            String loggerName = logRecord.getLoggerName();
            if (loggerName != null) {
                entry.setLogLocation(loggerName);
            }
            String message = FORMATTER.formatMessage(logRecord);
            if (message != null) {
                entry.setMessage(message);
            }
            Throwable thrown = logRecord.getThrown();
            if (thrown != null) {
                entry.setTrace(Throwables.getStackTraceAsString(thrown));
            }

            BeamFnApi.LogEntry built = entry.build();
            Consumer<BeamFnApi.LogEntry> threadSink = this.logEntryHandler.get();
            if (threadSink != null) {
                threadSink.accept(built);
            } else {
                blockingPublish(built);
            }
        }

        /** Enqueues the entry, blocking while the queue is full. */
        private void blockingPublish(BeamFnApi.LogEntry logEntry) {
            try {
                this.bufferedLogEntries.put(logEntry);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Writer loop: blocks for one entry, drains whatever else is queued, and sends the batch.
         * Stops when the poison pill is seen; entries queued behind the pill are discarded.
         *
         * @throws IllegalStateException if the writer thread is interrupted
         */
        @Override
        public void run() {
            // Anything logged from this thread must not block on a full queue, because this
            // thread is the only consumer; use a non-blocking offer here.
            this.logEntryHandler.set(this.bufferedLogEntries::offer);

            ArrayList<BeamFnApi.LogEntry> drained = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
            try {
                while (true) {
                    BeamFnApi.LogEntry first = this.bufferedLogEntries.take();
                    if (first == POISON_PILL) {
                        return;
                    }
                    BeamFnApi.LogEntry.List.Builder batch =
                            BeamFnApi.LogEntry.List.newBuilder().addLogEntries(first);

                    this.bufferedLogEntries.drainTo(drained);
                    boolean sawPoisonPill = false;
                    for (BeamFnApi.LogEntry next : drained) {
                        if (next == POISON_PILL) {
                            sawPoisonPill = true;
                            break;
                        }
                        batch.addLogEntries(next);
                    }
                    // Reset the scratch list so the next batch does not resend these entries.
                    drained.clear();

                    BeamFnLoggingClient.this.outboundObserver.onNext(batch.build());
                    if (sawPoisonPill) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        @Override
        public void flush() {
            // Nothing to do: entries are forwarded continuously by the writer task.
        }

        /**
         * Stops the writer task after it has forwarded everything queued ahead of the poison pill
         * and then completes the outbound stream. Calls after the stream was completed are no-ops;
         * this matters because {@code LogManager} closes installed handlers itself when it is
         * reset.
         *
         * @throws RuntimeException if interrupted while waiting, or if the writer task failed
         */
        @Override
        public void close() {
            synchronized (BeamFnLoggingClient.this.outboundObserver) {
                if (this.streamCompleted) {
                    return;
                }
                if (!this.bufferedLogWriter.isDone()) {
                    try {
                        // Retry until the pill is accepted; give up early only if the writer
                        // has already terminated and will never free space in the queue.
                        boolean accepted = false;
                        while (!accepted && !this.bufferedLogWriter.isDone()) {
                            accepted = this.bufferedLogEntries.offer(POISON_PILL, 60L, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    waitTillFinish();
                }
                this.streamCompleted = true;
                BeamFnLoggingClient.this.outboundObserver.onCompleted();
            }
        }

        /** Blocks until the writer task terminates, rethrowing its failure if it had one. */
        private void waitTillFinish() {
            try {
                this.bufferedLogWriter.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (CancellationException ignored) {
                // A cancelled writer is already stopped, which is all the caller needs.
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Opens the logging stream and installs the forwarding handler on the root logger.
     *
     * @param pipelineOptions source of the default log level, per-logger overrides and the
     *     executor that runs the writer task
     * @param apiServiceDescriptor descriptor of the logging service; passed to {@code function}
     * @param function creates the channel for the given descriptor
     * @param biFunction creates the outbound observer from the stub's {@code logging} call and
     *     the inbound observer
     */
    public BeamFnLoggingClient(PipelineOptions pipelineOptions, BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> function, BiFunction<Function<StreamObserver<BeamFnApi.LogControl>, StreamObserver<BeamFnApi.LogEntry.List>>, StreamObserver<BeamFnApi.LogControl>, StreamObserver<BeamFnApi.LogEntry.List>> biFunction) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.channel = function.apply(apiServiceDescriptor);

        // Start from a clean logging configuration with no handlers on the root logger.
        LogManager logManager = LogManager.getLogManager();
        logManager.reset();
        Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
        for (Handler existing : rootLogger.getHandlers()) {
            rootLogger.removeHandler(existing);
        }

        DataflowWorkerLoggingOptions loggingOptions = pipelineOptions.as(DataflowWorkerLoggingOptions.class);
        if (loggingOptions.getDefaultWorkerLogLevel() != null) {
            rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
        }
        if (loggingOptions.getWorkerLogLevelOverrides() != null) {
            for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> override
                    : loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
                Logger overridden = Logger.getLogger(override.getKey());
                overridden.setLevel(LEVEL_CONFIGURATION.get(override.getValue()));
                // Kept so close() can reset the level on exactly these loggers.
                this.configuredLoggers.add(overridden);
            }
        }

        BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(this.channel);
        this.inboundObserver = new LogControlObserver();
        this.logRecordHandler = new LogRecordHandler(pipelineOptions.as(GcsOptions.class).getExecutorService());
        this.logRecordHandler.setLevel(Level.ALL);
        this.outboundObserver = biFunction.apply(stub::logging, this.inboundObserver);
        // Installed last: the writer task only touches outboundObserver after an entry has been
        // queued, and nothing can be queued before the handler is attached here.
        rootLogger.addHandler(this.logRecordHandler);
    }

    /**
     * Flushes and completes the outbound stream, waits for the inbound stream to terminate,
     * restores the logging configuration and shuts the channel down. The restore and shutdown
     * steps run even when an earlier step fails.
     *
     * @throws Exception if flushing fails, the inbound stream ended with an error, the logging
     *     configuration cannot be re-read, or the thread is interrupted while waiting
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            // Hang up on the server, then wait for the server to hang up on us.
            this.logRecordHandler.close();
            this.inboundObserverCompletion.get();
        } finally {
            try {
                for (Logger configured : this.configuredLoggers) {
                    configured.setLevel(null);
                }
                this.configuredLoggers.clear();
                LogManager.getLogManager().readConfiguration();
            } finally {
                this.channel.shutdown();
                if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                    this.channel.shutdownNow();
                }
            }
        }
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(BeamFnLoggingClient.class).add("apiServiceDescriptor", this.apiServiceDescriptor).toString();
    }
}
