package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.ListenSyslog;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.SyslogEvent;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestListenSyslog.class */
public class TestListenSyslog {
    static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class);
    static final String PRI = "34";
    static final String SEV = "2";
    static final String FAC = "4";
    static final String TIME = "Oct 13 15:43:23";
    static final String HOST = "localhost.home";
    static final String BODY = "some message";
    static final String VALID_MESSAGE = "<34>Oct 13 15:43:23 localhost.home some message";
    static final String VALID_MESSAGE_TCP = "<34>Oct 13 15:43:23 localhost.home some message\n";
    static final String INVALID_MESSAGE = "this is not valid\n";

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenSyslog$CannedMessageProcessor.class */
    private static class CannedMessageProcessor extends ListenSyslog {
        private final Iterator<ListenSyslog.RawSyslogEvent> eventItr;

        public CannedMessageProcessor(List<ListenSyslog.RawSyslogEvent> list) {
            this.eventItr = list.iterator();
        }

        public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            ArrayList arrayList = new ArrayList(super.getSupportedPropertyDescriptors());
            arrayList.remove(PORT);
            arrayList.add(new PropertyDescriptor.Builder().name(PORT.getName()).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(Validator.VALID).build());
            return arrayList;
        }

        protected ListenSyslog.RawSyslogEvent getMessage(boolean z, boolean z2, ProcessSession processSession) {
            return this.eventItr.hasNext() ? this.eventItr.next() : super.getMessage(z, z2, processSession);
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenSyslog$DatagramSender.class */
    public static final class DatagramSender implements Runnable {
        final int port;
        final int numMessages;
        final long delay;
        final String message;

        public DatagramSender(int i, int i2, long j, String str) {
            this.port = i;
            this.numMessages = i2;
            this.delay = j;
            this.message = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            byte[] bytes = this.message.getBytes(Charset.forName("UTF-8"));
            ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
            try {
                DatagramChannel open = DatagramChannel.open();
                Throwable th = null;
                try {
                    open.connect(new InetSocketAddress("localhost", this.port));
                    for (int i = 0; i < this.numMessages; i++) {
                        allocate.clear();
                        allocate.put(bytes);
                        allocate.flip();
                        while (allocate.hasRemaining()) {
                            open.write(allocate);
                        }
                        Thread.sleep(this.delay);
                    }
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            open.close();
                        }
                    }
                } finally {
                }
            } catch (IOException e) {
                TestListenSyslog.LOGGER.error(e.getMessage(), e);
            } catch (InterruptedException e2) {
                TestListenSyslog.LOGGER.error(e2.getMessage(), e2);
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenSyslog$FailParseProcessor.class */
    private static class FailParseProcessor extends ListenSyslog {
        private FailParseProcessor() {
        }

        protected SyslogParser getParser() {
            return new SyslogParser(StandardCharsets.UTF_8) { // from class: org.apache.nifi.processors.standard.TestListenSyslog.FailParseProcessor.1
                public SyslogEvent parseEvent(byte[] bArr, String str) {
                    throw new ProcessException("Unit test intentionally failing");
                }
            };
        }
    }

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
    }

    @Test
    public void testBatching() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        newTestRunner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "25");
        newTestRunner.setProperty(ListenSyslog.MESSAGE_DELIMITER, "|");
        newTestRunner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new DatagramSender(port, 5, 10L, VALID_MESSAGE));
        thread.setDaemon(true);
        thread.start();
        thread.join();
        try {
            listenSyslog.onTrigger(processContext, processSessionFactory);
            newTestRunner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
            MockFlowFile mockFlowFile = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
            Assert.assertEquals("0", mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
            Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
            Assert.assertEquals(5L, new String(mockFlowFile.toByteArray(), StandardCharsets.UTF_8).split("\\|").length);
            List provenanceEvents = newTestRunner.getProvenanceEvents();
            Assert.assertNotNull(provenanceEvents);
            Assert.assertEquals(1L, provenanceEvents.size());
            ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
            Assert.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
            Assert.assertTrue("transit uri must be set and start with proper protocol", provenanceEventRecord.getTransitUri().toLowerCase().startsWith("udp"));
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testParsingError() throws IOException {
        FailParseProcessor failParseProcessor = new FailParseProcessor();
        TestRunner newTestRunner = TestRunners.newTestRunner(failParseProcessor);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        failParseProcessor.onScheduled(processContext);
        try {
            new DatagramSender(failParseProcessor.getPort(), 1, 1L, INVALID_MESSAGE).run();
            failParseProcessor.onTrigger(processContext, processSessionFactory);
            newTestRunner.assertTransferCount(ListenSyslog.REL_INVALID, 1);
            newTestRunner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
            failParseProcessor.onUnscheduled();
        } catch (Throwable th) {
            failParseProcessor.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testErrorQueue() throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
        arrayList.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        arrayList.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") { // from class: org.apache.nifi.processors.standard.TestListenSyslog.1
            public byte[] getData() {
                if (atomicInteger.incrementAndGet() == 1) {
                    throw new FlowFileAccessException("Unit test failure");
                }
                return TestListenSyslog.VALID_MESSAGE.getBytes();
            }
        });
        TestRunner newTestRunner = TestRunners.newTestRunner(new CannedMessageProcessor(arrayList));
        newTestRunner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "5");
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        newTestRunner.setProperty(ListenSyslog.PARSE_MESSAGES, "false");
        newTestRunner.run();
        Assert.assertEquals(1L, r0.getErrorQueueSize());
        newTestRunner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
        ((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0)).assertContentEquals("<34>Oct 13 15:43:23 localhost.home some message\n<34>Oct 13 15:43:23 localhost.home some message");
        newTestRunner.clearTransferState();
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
        ((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0)).assertContentEquals(VALID_MESSAGE);
    }
}
