package org.apache.beam.fn.harness.logging;

import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientCall;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingClientCall;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.MethodDescriptor;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.MDC;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.class */
public class BeamFnLoggingClientTest {
    // Record logged against the root logger while it is configured to OFF; the tests expect it
    // never to show up among the entries received by the fake logging service.
    private static final LogRecord FILTERED_RECORD = new LogRecord(Level.SEVERE, "FilteredMessage");
    // Plain FINE-level record; its logger name, millis and thread id are pinned in the static
    // initializer so the resulting log entry is deterministic.
    private static final LogRecord TEST_RECORD = new LogRecord(Level.FINE, "Message");
    // WARNING-level record carrying a RuntimeException (assigned in the static initializer).
    private static final LogRecord TEST_RECORD_WITH_EXCEPTION;
    // Expected wire representation of TEST_RECORD.
    private static final BeamFnApi.LogEntry TEST_ENTRY;
    // Expected wire representation of TEST_RECORD once the custom formatter and the
    // "testMdcKey" MDC value are in effect.
    private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_CUSTOM_FORMATTER;
    // Expected wire representation of TEST_RECORD_WITH_EXCEPTION, including the stack trace.
    private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION;

    // NOTE(review): presumably restores the BeamFnLoggingMDC state mutated by each test
    // (every test calls BeamFnLoggingMDC.setInstructionId) — confirm against RestoreBeamFnLoggingMDC.
    @Rule
    public TestRule restoreLogging = new RestoreBeamFnLoggingMDC();

    // Used by the failure-path tests to declare the exception message they expect.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Happy path: records logged through {@code java.util.logging} are forwarded to the fake
     * logging service, honouring the configured log levels, and closing the client restores the
     * logger configuration and shuts the channel down.
     *
     * <p>The root logger is configured to {@code OFF} and {@code ConfiguredLogger} to
     * {@code DEBUG} (which the test expects to surface as {@link Level#FINE}), so
     * {@code FILTERED_RECORD} must not reach the service while the three other records must
     * arrive in order.
     */
    @Test
    public void testLogging() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        AtomicBoolean clientClosedStream = new AtomicBoolean();
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        // Server-side view of the stream: collect every entry and, once the client half-closes,
        // complete the server's half as well.
        CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                        .withOnCompleted(() -> {
                            clientClosedStream.set(true);
                            outboundServerObserver.get().onCompleted();
                        })
                        .build();
        // A unique in-process name keeps concurrently running tests from sharing a server.
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder()
                .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                .build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
                .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
                    @Override
                    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                        outboundServerObserver.set(outboundObserver);
                        return inboundServerObserver;
                    }
                })
                .build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(
                    PipelineOptionsFactory.fromArgs(new String[]{
                            "--defaultSdkHarnessLogLevel=OFF",
                            "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
                    }).create(),
                    apiServiceDescriptor,
                    descriptor -> channel);
            // Hold strong references to the loggers for the duration of the test.
            Logger rootLogger = LogManager.getLogManager().getLogger("");
            Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
            Assert.assertEquals(Level.OFF, rootLogger.getLevel());
            Assert.assertEquals(Level.FINE, configuredLogger.getLevel());

            // Root logger is OFF, so this record must be dropped.
            rootLogger.log(FILTERED_RECORD);
            configuredLogger.log(TEST_RECORD);
            configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);

            // Swap in a formatter that prefixes the MDC value, then log once more so the
            // custom-formatted entry (with custom data) is produced.
            for (Handler handler : rootLogger.getHandlers()) {
                handler.setFormatter(new SimpleFormatter() {
                    @Override
                    public synchronized String formatMessage(LogRecord logRecord) {
                        return MDC.get("testMdcKey") + ":" + super.formatMessage(logRecord);
                    }
                });
            }
            MDC.put("testMdcKey", "testMdcValue");
            configuredLogger.log(TEST_RECORD);

            client.close();

            // Closing the client must restore logger levels and tear down the stream/channel.
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
            Assert.assertTrue(clientClosedStream.get());
            Assert.assertTrue(channel.isShutdown());
            MatcherAssert.assertThat(values, Matchers.contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION, TEST_ENTRY_WITH_CUSTOM_FORMATTER));
        } finally {
            server.shutdownNow();
        }
    }

    /**
     * When the logging service fails the stream with an error, closing the client must surface
     * that error ("TEST ERROR") and still restore the logger levels and shut down the channel.
     */
    @Test
    public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                        .build();
        // Declared outside the try so the finally block can verify them even when close() throws.
        Logger rootLogger = null;
        Logger configuredLogger = null;
        Phaser streamBlocker = new Phaser(1);
        // A unique in-process name keeps concurrently running tests from sharing a server.
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder()
                .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                .build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
                .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
                    @Override
                    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                        // NOTE(review): the phaser starts in phase 0, and awaitAdvance(1) returns
                        // immediately unless the current phase is 1. If the intent is to hold the
                        // error back until the test thread has arrived, awaitAdvance(0) may be
                        // what was meant — confirm before changing.
                        streamBlocker.awaitAdvance(1);
                        outboundServerObserver.set(outboundObserver);
                        outboundObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException());
                        return inboundServerObserver;
                    }
                })
                .build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(
                    PipelineOptionsFactory.fromArgs(new String[]{
                            "--defaultSdkHarnessLogLevel=OFF",
                            "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
                    }).create(),
                    apiServiceDescriptor,
                    descriptor -> channel);
            rootLogger = LogManager.getLogManager().getLogger("");
            configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
            // arrive() returns the phase number it arrived in; the test thread is the only party.
            Assert.assertEquals(0L, streamBlocker.arrive());
            this.thrown.expectMessage("TEST ERROR");
            client.close();
        } finally {
            // Runs on both the success and the failure path: the client must have cleaned up.
            Assert.assertNotNull("rootLogger should be initialized before exception", rootLogger);
            Assert.assertNotNull("configuredLogger should be initialized before exception", configuredLogger);
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
            Assert.assertTrue(channel.isShutdown());
            server.shutdownNow();
        }
    }

    /**
     * When the logging service completes the stream immediately, closing the client must report
     * "Logging stream terminated unexpectedly" and the channel must still end up shut down.
     */
    @Test
    public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                        .build();
        // A unique in-process name keeps concurrently running tests from sharing a server.
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder()
                .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                .build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
                .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
                    @Override
                    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                        outboundServerObserver.set(outboundObserver);
                        // Hang up right away, before the client has sent anything.
                        outboundObserver.onCompleted();
                        return inboundServerObserver;
                    }
                })
                .build();
        server.start();
        this.thrown.expectMessage("Logging stream terminated unexpectedly");
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(
                    PipelineOptionsFactory.fromArgs(new String[]{
                            "--defaultSdkHarnessLogLevel=OFF",
                            "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
                    }).create(),
                    apiServiceDescriptor,
                    descriptor -> channel);
            // Hold strong references to the loggers for the duration of the test.
            Logger rootLogger = LogManager.getLogManager().getLogger("");
            Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
            client.close();
            // Only reached if close() does not throw.
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
        } finally {
            // Checked on both paths: the channel must be shut down whether or not close() threw.
            Assert.assertTrue(channel.isShutdown());
            server.shutdownNow();
        }
    }

    /**
     * Verifies the client can be closed while the outbound stream reports it is not ready.
     *
     * <p>A client interceptor overrides {@code isReady()} so the test can push back on the
     * stream at will. A first batch of records is sent while ready; the stream is then marked
     * not ready, a second batch is logged, and the test asserts that not all of it got through
     * before closing the client and checking that cleanup still happens.
     */
    @Test
    public void testClosableWhenBlockingForOnReady() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        AtomicBoolean elementsAllowed = new AtomicBoolean(true);
        CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                        .build();
        // A unique in-process name keeps concurrently running tests from sharing a server.
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder()
                .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                .build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
                .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
                    @Override
                    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                        outboundServerObserver.set(outboundObserver);
                        return inboundServerObserver;
                    }
                })
                .build();
        server.start();
        // Wrap every client call so that readiness is dictated by elementsAllowed.
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl())
                .intercept(new ClientInterceptor[]{new ClientInterceptor() {
                    @Override
                    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel next) {
                        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(methodDescriptor, callOptions)) {
                            @Override
                            public boolean isReady() {
                                return elementsAllowed.get();
                            }
                        };
                    }
                }})
                .build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(
                    PipelineOptionsFactory.fromArgs(new String[]{
                            "--defaultSdkHarnessLogLevel=OFF",
                            "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
                    }).create(),
                    apiServiceDescriptor,
                    descriptor -> channel);
            // Hold strong references to the loggers for the duration of the test.
            Logger rootLogger = LogManager.getLogManager().getLogger("");
            Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");

            final long numEntries = 2000L;
            for (int i = 0; i < numEntries; i++) {
                configuredLogger.log(TEST_RECORD);
            }
            // Measure (in ~1ms ticks) how long the first batch takes to arrive. This wait has no
            // upper bound; it only ends once every entry has been received.
            int sleepTime = 0;
            while (values.size() < numEntries) {
                sleepTime++;
                Thread.sleep(1L);
            }

            // Push back on the stream, log a second batch, and give it three times as long as
            // the first batch needed.
            elementsAllowed.set(false);
            for (int i = 0; i < numEntries; i++) {
                configuredLogger.log(TEST_RECORD);
            }
            Thread.sleep(sleepTime * 3);
            // Not everything from the second batch may have been delivered.
            Assert.assertTrue(((long) values.size()) < numEntries * 2);

            client.close();

            Assert.assertNotNull("rootLogger should be initialized before exception", rootLogger);
            Assert.assertNotNull("configuredLogger should be initialized before exception", configuredLogger);
            Assert.assertEquals(Level.INFO, rootLogger.getLevel());
            Assert.assertNull(configuredLogger.getLevel());
            Assert.assertTrue(channel.isShutdown());
        } finally {
            server.shutdownNow();
        }
    }

    /**
     * When the logging service completes the stream on its own, the client's termination future
     * must fail with "Logging stream terminated unexpectedly", and the logger levels and channel
     * must already have been cleaned up.
     */
    @Test
    public void testServerCloseNotifiesTermination() throws Exception {
        BeamFnLoggingMDC.setInstructionId("instruction-1");
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
        AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
                        .build();
        // A unique in-process name keeps concurrently running tests from sharing a server.
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder()
                .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                .build();
        Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
                .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
                    @Override
                    public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> outboundObserver) {
                        outboundServerObserver.set(outboundObserver);
                        // Server closes its side of the stream as soon as it is opened.
                        outboundObserver.onCompleted();
                        return inboundServerObserver;
                    }
                })
                .build();
        server.start();
        ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
        try {
            BeamFnLoggingClient client = BeamFnLoggingClient.createAndStart(
                    PipelineOptionsFactory.fromArgs(new String[]{
                            "--defaultSdkHarnessLogLevel=OFF",
                            "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
                    }).create(),
                    apiServiceDescriptor,
                    descriptor -> channel);
            this.thrown.expectMessage("Logging stream terminated unexpectedly");
            // No explicit close(): waiting on the termination future is what must fail here.
            client.terminationFuture().get();
        } finally {
            // Runs on both the success and the failure path: the client must have cleaned up.
            Assert.assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
            Assert.assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
            Assert.assertTrue(channel.isShutdown());
            server.shutdownNow();
        }
    }

    static {
        TEST_RECORD.setLoggerName("LoggerName");
        TEST_RECORD.setMillis(1234567890L);
        TEST_RECORD.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION = new LogRecord(Level.WARNING, "MessageWithException");
        TEST_RECORD_WITH_EXCEPTION.setLoggerName("LoggerName");
        TEST_RECORD_WITH_EXCEPTION.setMillis(1234567890L);
        TEST_RECORD_WITH_EXCEPTION.setThreadID(12345);
        TEST_RECORD_WITH_EXCEPTION.setThrown(new RuntimeException("ExceptionMessage"));
        TEST_ENTRY = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG).setMessage("Message").setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").m1221build();
        TEST_ENTRY_WITH_CUSTOM_FORMATTER = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG).setMessage("testMdcValue:Message").setCustomData(Struct.newBuilder().putFields("testMdcKey", Value.newBuilder().setStringValue("testMdcValue").build())).setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").m1221build();
        TEST_ENTRY_WITH_EXCEPTION = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction-1").setSeverity(BeamFnApi.LogEntry.Severity.Enum.WARN).setMessage("MessageWithException").setTrace(Throwables.getStackTraceAsString(TEST_RECORD_WITH_EXCEPTION.getThrown())).setThread("12345").setTimestamp(Timestamp.newBuilder().setSeconds(1234567L).setNanos(890000000).build()).setLogLocation("LoggerName").m1221build();
    }
}
