package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.AvroSource;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.test.AvailablePortFinder;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.plugins.di.DI;
import org.apache.logging.log4j.plugins.di.Injector;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/logging/log4j/flume/appender/FlumeAppenderTest.class */
public class FlumeAppenderTest {
    /** Flume Avro source that {@code setUp()} configures and starts, and {@code teardown()} stops. */
    private AvroSource eventSource;
    /** Memory channel wired to the source's channel processor; tests take received events from it. */
    private Channel channel;
    /** The "avrologger" logger that most tests attach the appender under test to. */
    private Logger avroLogger;
    /** Port the Avro source is configured with, kept as a string because it is passed as one. */
    private String testPort;
    /** Injector created in {@code setUp()} and handed to {@code FlumeAppender.createAppender}. */
    private Injector injector;

    /** Sets the Log4j status logger level to {@code OFF} once, before any test in this class runs. */
    @BeforeClass
    public static void setupClass() {
        final StatusLogger statusLogger = StatusLogger.getLogger();
        statusLogger.setLevel(Level.OFF);
    }

    /**
     * Builds a fresh fixture for each test: an injector, a memory channel, the "avrologger" logger
     * with its existing appenders removed, and an Avro source listening on a newly picked port
     * whose channel processor replicates into the memory channel. The source must have reached
     * {@code START} before the test body runs.
     *
     * @throws Exception if configuring the components or waiting for the source to start fails
     */
    @Before
    public void setUp() throws Exception {
        this.injector = DI.createInjector();
        this.injector.init();
        this.eventSource = new AvroSource();
        this.channel = new MemoryChannel();
        Configurables.configure(this.channel, new Context());
        // LogManager returns the API Logger type; the field is the core Logger, so a downcast is required.
        this.avroLogger = (Logger) LogManager.getLogger("avrologger");
        removeAppenders(this.avroLogger);

        Context sourceContext = new Context();
        this.testPort = String.valueOf(AvailablePortFinder.getNextAvailable());
        sourceContext.put("port", this.testPort);
        sourceContext.put("bind", "0.0.0.0");
        Configurables.configure(this.eventSource, sourceContext);

        // Typed list instead of a raw ArrayList so the call to setChannels is checked by the compiler.
        List<Channel> channels = new ArrayList<>();
        channels.add(this.channel);
        ReplicatingChannelSelector selector = new ReplicatingChannelSelector();
        selector.setChannels(channels);
        this.eventSource.setChannelProcessor(new ChannelProcessor(selector));

        this.eventSource.start();
        Assert.assertTrue("Reached start or error", LifecycleController.waitForOneOf(this.eventSource, LifecycleState.START_OR_ERROR));
        Assert.assertEquals("Server is started", LifecycleState.START, this.eventSource.getLifecycleState());
    }

    /**
     * Detaches and stops every appender on the "avrologger" logger, then stops the Avro source and
     * checks that it ended in the {@code STOP} state.
     *
     * @throws Exception if waiting for the source to stop fails
     */
    @After
    public void teardown() throws Exception {
        removeAppenders(this.avroLogger);
        this.eventSource.stop();
        final boolean settled = LifecycleController.waitForOneOf(this.eventSource, LifecycleState.STOP_OR_ERROR);
        Assert.assertTrue("Reached stop or error", settled);
        final LifecycleState finalState = this.eventSource.getLifecycleState();
        Assert.assertEquals("Server is stopped", LifecycleState.STOP, finalState);
    }

    /**
     * Logs one message through an appender configured with a single explicit agent and checks that
     * the event taken from the channel ends with that message.
     *
     * @throws IOException if the received event body cannot be decompressed
     */
    @Test
    public void testLog4jAvroAppender() throws IOException {
        final FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);

        this.avroLogger.info("Test message");

        final Transaction tx = this.channel.getTransaction();
        tx.begin();
        final Event received = this.channel.take();
        Assert.assertNotNull(received);
        final String body = getBody(received);
        Assert.assertTrue("Channel contained event, but not expected message", body.endsWith("Test message"));
        tx.commit();
        tx.close();

        this.eventSource.stop();
    }

    /**
     * Same scenario as the single-agent test, but the target is given as a {@code localhost:<port>}
     * string argument instead of an {@code Agent} array.
     *
     * @throws IOException if the received event body cannot be decompressed
     */
    @Test
    public void testLog4jAvroAppenderWithHostsParam() throws IOException {
        final String hosts = String.format("localhost:%s", this.testPort);
        final FlumeAppender appender = FlumeAppender.createAppender(
                (Agent[]) null, (Property[]) null, hosts,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);

        this.avroLogger.info("Test message");

        final Transaction tx = this.channel.getTransaction();
        tx.begin();
        final Event received = this.channel.take();
        Assert.assertNotNull(received);
        final String body = getBody(received);
        Assert.assertTrue("Channel contained event, but not expected message", body.endsWith("Test message"));
        tx.commit();
        tx.close();

        this.eventSource.stop();
    }

    /**
     * Sends a {@link StructuredDataMessage} through {@link EventLogger} with an appender attached
     * to the "EventLogger" logger and checks that the received event body ends with "Success".
     *
     * <p>The appender is always detached and stopped, and the thread-context keys set here are
     * always removed, so a failing assertion cannot leak state into later tests on this thread.
     *
     * @throws IOException if the received event body cannot be decompressed
     */
    @Test
    public void testStructured() throws IOException {
        FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, "ReqCtx_", (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        // LogManager returns the API Logger type; addAppender/setLevel live on the core Logger.
        Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
        Assert.assertNotNull(eventLogger);
        eventLogger.addAppender(appender);
        eventLogger.setLevel(Level.ALL);
        try {
            StructuredDataMessage message = new StructuredDataMessage("Transfer", "Success", "Audit");
            message.put("memo", "This is a memo");
            message.put("acct", "12345");
            message.put("amount", "100.00");
            ThreadContext.put("id", UUID.randomUUID().toString());
            ThreadContext.put("memo", (String) null);
            ThreadContext.put("test", "123");
            EventLogger.logEvent(message);

            Transaction transaction = this.channel.getTransaction();
            transaction.begin();
            Event received = this.channel.take();
            Assert.assertNotNull(received);
            Assert.assertTrue("Channel contained event, but not expected message", getBody(received).endsWith("Success"));
            transaction.commit();
            transaction.close();
            this.eventSource.stop();
        } finally {
            // teardown() only cleans the "avrologger" logger, so this logger is cleaned up here.
            eventLogger.removeAppender(appender);
            appender.stop();
            // ThreadContext is per-thread static state; drop only the keys this test added.
            ThreadContext.remove("id");
            ThreadContext.remove("memo");
            ThreadContext.remove("test");
        }
    }

    /**
     * Logs ten numbered messages and then takes ten events, one transaction per event, checking
     * that they arrive in the order they were logged.
     *
     * @throws IOException if a received event body cannot be decompressed
     */
    @Test
    public void testMultiple() throws IOException {
        final FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);

        for (int sent = 0; sent < 10; sent++) {
            this.avroLogger.info("Test message " + sent);
        }

        for (int expected = 0; expected < 10; expected++) {
            final Transaction tx = this.channel.getTransaction();
            tx.begin();
            final Event received = this.channel.take();
            Assert.assertNotNull(received);
            final String body = getBody(received);
            Assert.assertTrue("Channel contained event, but not expected message", body.endsWith("Test message " + expected));
            tx.commit();
            tx.close();
        }

        this.eventSource.stop();
    }

    /**
     * Logs one message, checks that nothing has reached the channel yet, waits 500 ms, logs a
     * second message, and then expects both events in order.
     *
     * <p>NOTE(review): the "500" and "10" arguments are presumably the maximum batch delay in
     * milliseconds and the batch size — confirm against {@code FlumeAppender.createAppender}.
     *
     * @throws IOException if a received event body cannot be decompressed
     */
    @Test
    public void testIncompleteBatch() throws IOException {
        FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "500",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "10", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);
        this.avroLogger.info("Test message 0");
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        Assert.assertNull("Received event", this.channel.take());
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            // A cut-short wait invalidates the timing this test relies on: keep the interrupt
            // flag set for the runner and fail explicitly instead of silently continuing.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting 500 ms before logging the second message");
        }
        this.avroLogger.info("Test message 1");
        for (int i = 0; i < 2; i++) {
            Event received = this.channel.take();
            Assert.assertNotNull("No event for item " + i, received);
            Assert.assertTrue("Channel contained event, but not expected message", getBody(received).endsWith("Test message " + i));
        }
        transaction.commit();
        transaction.close();
        this.eventSource.stop();
    }

    /**
     * Logs three messages, stops the appender, and then expects all three events in order within
     * one transaction.
     *
     * <p>NOTE(review): this presumably checks that stopping the appender flushes a batch that has
     * not filled up — confirm against {@code FlumeAppender}.
     *
     * @throws IOException if a received event body cannot be decompressed
     */
    @Test
    public void testIncompleteBatch2() throws IOException {
        final FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "500",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "10", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);

        this.avroLogger.info("Test message 0");
        final Transaction tx = this.channel.getTransaction();
        tx.begin();
        this.avroLogger.info("Test message 1");
        this.avroLogger.info("Test message 2");
        appender.stop();

        for (int expected = 0; expected < 3; expected++) {
            final Event received = this.channel.take();
            Assert.assertNotNull("No event for item " + expected, received);
            final String body = getBody(received);
            Assert.assertTrue("Channel contained event, but not expected message. Received : " + body, body.endsWith("Test message " + expected));
        }
        tx.commit();
        tx.close();

        this.eventSource.stop();
    }

    /**
     * Logs ten numbered messages and expects all ten events, in order, within a single channel
     * transaction.
     *
     * @throws IOException if a received event body cannot be decompressed
     */
    @Test
    public void testBatch() throws IOException {
        final FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "10", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull(this.avroLogger);

        for (int sent = 0; sent < 10; sent++) {
            this.avroLogger.info("Test message " + sent);
        }

        final Transaction tx = this.channel.getTransaction();
        tx.begin();
        for (int expected = 0; expected < 10; expected++) {
            final Event received = this.channel.take();
            Assert.assertNotNull("No event for item " + expected, received);
            final String body = getBody(received);
            Assert.assertTrue("Channel contained event, but not expected message", body.endsWith("Test message " + expected));
        }
        tx.commit();
        tx.close();

        this.eventSource.stop();
    }

    /**
     * Stops the Avro source before logging and expects the logging call to throw; the test fails
     * if the call returns normally.
     */
    @Test
    public void testConnectionRefused() {
        final FlumeAppender appender = FlumeAppender.createAppender(
                new Agent[] {Agent.createAgent("localhost", this.testPort)}, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);
        this.eventSource.stop();

        try {
            this.avroLogger.info("message 1");
        } catch (Throwable expected) {
            // Any throwable from the logging call is the outcome this test wants.
            return;
        }
        // Reached only when nothing was thrown; kept outside the try so it is not swallowed above.
        Assert.fail();
    }

    /**
     * Starts an appender with two agents (the test port and the next port up) while the source is
     * stopped, expects the first log call to throw, then restarts the source on the second port
     * and expects the next message to arrive.
     *
     * @throws Exception if the received event body cannot be decompressed or the restart fails
     */
    @Test
    public void testNotConnected() throws Exception {
        this.eventSource.stop();
        final String alternatePort = Integer.toString(Integer.parseInt(this.testPort) + 1);
        final Agent[] agents = new Agent[] {
            Agent.createAgent("localhost", this.testPort),
            Agent.createAgent("localhost", alternatePort)
        };
        final FlumeAppender appender = FlumeAppender.createAppender(
                agents, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        Assert.assertTrue("Appender Not started", appender.isStarted());
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);

        try {
            this.avroLogger.info("Test message");
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Exception should have been thrown");
        } catch (Exception expected) {
            // Expected while no source is listening on either port.
        }

        try {
            final Context restartContext = new Context();
            restartContext.put("port", alternatePort);
            restartContext.put("bind", "0.0.0.0");
            Configurables.configure(this.eventSource, restartContext);
            this.eventSource.start();
        } catch (ChannelException restartFailure) {
            Assert.fail("Caught exception while resetting port to " + alternatePort + " : " + restartFailure.getMessage());
        }

        this.avroLogger.info("Test message 2");
        final Transaction tx = this.channel.getTransaction();
        tx.begin();
        final Event received = this.channel.take();
        Assert.assertNotNull(received);
        final String body = getBody(received);
        Assert.assertTrue("Channel contained event, but not expected message", body.endsWith("Test message 2"));
        tx.commit();
        tx.close();
    }

    /**
     * Delivers one message to the source on the test port, then stops the source, restarts it on
     * the next port up (the appender's second agent), and expects a second message to arrive.
     *
     * @throws Exception if a received event body cannot be decompressed or the restart fails
     */
    @Test
    public void testReconnect() throws Exception {
        final String alternatePort = Integer.toString(Integer.parseInt(this.testPort) + 1);
        final Agent[] agents = new Agent[] {
            Agent.createAgent("localhost", this.testPort),
            Agent.createAgent("localhost", alternatePort)
        };
        final FlumeAppender appender = FlumeAppender.createAppender(
                agents, (Property[]) null, (String) null,
                "false", "Avro", (String) null,
                "1000", "1000", "1", "1000",
                "avro", "false",
                (String) null, (String) null, (String) null, (String) null, (String) null,
                "true", "1", (String) null,
                (FlumeEventFactory) null, (Layout) null, (Filter) null,
                this.injector);
        appender.start();
        this.avroLogger.addAppender(appender);
        this.avroLogger.setLevel(Level.ALL);

        // First delivery, while the source listens on the original port.
        this.avroLogger.info("Test message");
        final Transaction firstTx = this.channel.getTransaction();
        firstTx.begin();
        final Event firstEvent = this.channel.take();
        Assert.assertNotNull(firstEvent);
        final String firstBody = getBody(firstEvent);
        Assert.assertTrue("Channel contained event, but not expected message. Received : " + firstBody, firstBody.endsWith("Test message"));
        firstTx.commit();
        firstTx.close();

        // Move the source to the alternate port.
        this.eventSource.stop();
        try {
            final Context restartContext = new Context();
            restartContext.put("port", alternatePort);
            restartContext.put("bind", "0.0.0.0");
            Configurables.configure(this.eventSource, restartContext);
            this.eventSource.start();
        } catch (ChannelException restartFailure) {
            Assert.fail("Caught exception while resetting port to " + alternatePort + " : " + restartFailure.getMessage());
        }

        // Second delivery, after the restart.
        this.avroLogger.info("Test message 2");
        final Transaction secondTx = this.channel.getTransaction();
        secondTx.begin();
        final Event secondEvent = this.channel.take();
        Assert.assertNotNull(secondEvent);
        final String secondBody = getBody(secondEvent);
        Assert.assertTrue("Channel contained event, but not expected message", secondBody.endsWith("Test message 2"));
        secondTx.commit();
        secondTx.close();
    }

    /**
     * Detaches every appender currently attached to {@code logger} and stops it.
     *
     * <p>The appenders are removed from the logger that was passed in. The previous version always
     * removed from the {@code avroLogger} field, which only worked because that is the sole logger
     * callers pass.
     *
     * @param logger the logger whose appenders should be removed and stopped
     */
    private void removeAppenders(Logger logger) {
        for (Appender appender : logger.getAppenders().values()) {
            logger.removeAppender(appender);
            appender.stop();
        }
    }

    /**
     * Decompresses the GZIP-compressed body of a Flume event and returns it as text.
     *
     * @param event the event to read; may be {@code null}
     * @return the decompressed body, or an empty string when {@code event} is {@code null}
     * @throws IOException if the body is not valid GZIP data or cannot be read
     */
    private String getBody(Event event) throws IOException {
        if (event == null) {
            return "";
        }
        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
        // try-with-resources closes the inflater; a buffer avoids one read() call per byte.
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(event.getBody()))) {
            byte[] buffer = new byte[1024];
            int count;
            while ((count = gzip.read(buffer)) != -1) {
                decompressed.write(buffer, 0, count);
            }
        }
        // Decode with a fixed charset so the result does not depend on the platform default.
        // NOTE(review): UTF-8 is assumed to match the appender's layout encoding; the messages
        // asserted in this class are ASCII, so the assertions do not depend on the choice.
        return new String(decompressed.toByteArray(), StandardCharsets.UTF_8);
    }
}
