package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.TestListenSyslog;
import org.apache.nifi.processors.standard.util.TCPTestServer;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/standard/ITListenSyslog.class */
public class ITListenSyslog {
    static final Logger LOGGER = LoggerFactory.getLogger(ITListenSyslog.class);
    static final String PRI = "34";
    static final String SEV = "2";
    static final String FAC = "4";
    static final String TIME = "Oct 13 15:43:23";
    static final String HOST = "localhost.home";
    static final String BODY = "some message";
    static final String VALID_MESSAGE = "<34>Oct 13 15:43:23 localhost.home some message";
    static final String VALID_MESSAGE_TCP = "<34>Oct 13 15:43:23 localhost.home some message\n";
    static final String INVALID_MESSAGE = "this is not valid\n";

    /* loaded from: input_file:org/apache/nifi/processors/standard/ITListenSyslog$MultiConnectionSocketSender.class */
    public static final class MultiConnectionSocketSender implements Runnable {
        final int port;
        final int numMessages;
        final long delay;
        final String message;

        public MultiConnectionSocketSender(int i, int i2, long j, String str) {
            this.port = i;
            this.numMessages = i2;
            this.delay = j;
            this.message = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            byte[] bytes = this.message.getBytes(Charset.forName("UTF-8"));
            ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
            for (int i = 0; i < this.numMessages; i++) {
                try {
                    SocketChannel open = SocketChannel.open();
                    Throwable th = null;
                    try {
                        try {
                            open.connect(new InetSocketAddress("localhost", this.port));
                            allocate.clear();
                            allocate.put(bytes);
                            allocate.flip();
                            while (allocate.hasRemaining()) {
                                open.write(allocate);
                            }
                            Thread.sleep(this.delay);
                            if (open != null) {
                                if (0 != 0) {
                                    try {
                                        open.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    open.close();
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                            break;
                        }
                    } catch (Throwable th4) {
                        if (open != null) {
                            if (th != null) {
                                try {
                                    open.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                open.close();
                            }
                        }
                        throw th4;
                        break;
                    }
                } catch (IOException e) {
                    ITListenSyslog.LOGGER.error(e.getMessage(), e);
                } catch (InterruptedException e2) {
                    ITListenSyslog.LOGGER.error(e2.getMessage(), e2);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/ITListenSyslog$SingleConnectionSocketSender.class */
    public static final class SingleConnectionSocketSender implements Runnable {
        final int port;
        final int numMessages;
        final long delay;
        final String message;

        public SingleConnectionSocketSender(int i, int i2, long j, String str) {
            this.port = i;
            this.numMessages = i2;
            this.delay = j;
            this.message = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            byte[] bytes = this.message.getBytes(Charset.forName("UTF-8"));
            ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
            try {
                SocketChannel open = SocketChannel.open();
                Throwable th = null;
                try {
                    open.connect(new InetSocketAddress("localhost", this.port));
                    for (int i = 0; i < this.numMessages; i++) {
                        allocate.clear();
                        allocate.put(bytes);
                        allocate.flip();
                        while (allocate.hasRemaining()) {
                            open.write(allocate);
                        }
                        Thread.sleep(this.delay);
                    }
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            open.close();
                        }
                    }
                } finally {
                }
            } catch (IOException e) {
                ITListenSyslog.LOGGER.error(e.getMessage(), e);
            } catch (InterruptedException e2) {
                ITListenSyslog.LOGGER.error(e2.getMessage(), e2);
            }
        }
    }

    @Test
    public void testUDP() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new TestListenSyslog.DatagramSender(port, 20, 10L, VALID_MESSAGE));
        thread.setDaemon(true);
        thread.start();
        try {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis() + 30000;
            while (i < 20 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
                listenSyslog.onTrigger(processContext, processSessionFactory);
                i = newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
            }
            Assert.assertEquals("Did not process all the datagrams", 20L, i);
            checkFlowFile((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0), 0, ListenSyslog.UDP_VALUE.getValue());
            List provenanceEvents = newTestRunner.getProvenanceEvents();
            Assert.assertNotNull(provenanceEvents);
            Assert.assertEquals(20L, provenanceEvents.size());
            ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
            Assert.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
            Assert.assertTrue("transit uri must be set and start with proper protocol", provenanceEventRecord.getTransitUri().toLowerCase().startsWith("udp"));
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testTCPSingleConnection() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        Thread.sleep(500L);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new SingleConnectionSocketSender(port, 20, 10L, VALID_MESSAGE_TCP));
        thread.setDaemon(true);
        thread.start();
        try {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis() + 30000;
            while (i < 20 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
                listenSyslog.onTrigger(processContext, processSessionFactory);
                i = newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
            }
            Assert.assertEquals("Did not process all the messages", 20L, i);
            checkFlowFile((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0), 0, ListenSyslog.TCP_VALUE.getValue());
            List provenanceEvents = newTestRunner.getProvenanceEvents();
            Assert.assertNotNull(provenanceEvents);
            Assert.assertEquals(20L, provenanceEvents.size());
            ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
            Assert.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
            Assert.assertTrue("transit uri must be set and start with proper protocol", provenanceEventRecord.getTransitUri().toLowerCase().startsWith("tcp"));
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new SingleConnectionSocketSender(port, 1, 10L, "<34>Oct 13 15:43:23 localhost.home some message\n\n<34>Oct 13 15:43:23 localhost.home some message\n\n<34>Oct 13 15:43:23 localhost.home some message\n\n"));
        thread.setDaemon(true);
        thread.start();
        try {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis() + 30000;
            while (i < 3 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
                listenSyslog.onTrigger(processContext, processSessionFactory);
                i = newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
            }
            Assert.assertEquals("Did not process all the messages", 3L, i);
            checkFlowFile((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0), 0, ListenSyslog.TCP_VALUE.getValue());
            List provenanceEvents = newTestRunner.getProvenanceEvents();
            Assert.assertNotNull(provenanceEvents);
            Assert.assertEquals(3L, provenanceEvents.size());
            ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
            Assert.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
            Assert.assertTrue("transit uri must be set and start with proper protocol", provenanceEventRecord.getTransitUri().toLowerCase().startsWith("tcp"));
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testTCPMultipleConnection() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new MultiConnectionSocketSender(port, 20, 10L, VALID_MESSAGE_TCP));
        thread.setDaemon(true);
        thread.start();
        try {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis() + 30000;
            while (i < 20 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
                listenSyslog.onTrigger(processContext, processSessionFactory);
                i = newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
            }
            Assert.assertEquals("Did not process all the messages", 20L, i);
            checkFlowFile((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0), 0, ListenSyslog.TCP_VALUE.getValue());
            List provenanceEvents = newTestRunner.getProvenanceEvents();
            Assert.assertNotNull(provenanceEvents);
            Assert.assertEquals(20L, provenanceEvents.size());
            ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
            Assert.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
            Assert.assertTrue("transit uri must be set and start with proper protocol", provenanceEventRecord.getTransitUri().toLowerCase().startsWith("tcp"));
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    @Test
    public void testInvalid() throws IOException, InterruptedException {
        ListenSyslog listenSyslog = new ListenSyslog();
        TestRunner newTestRunner = TestRunners.newTestRunner(listenSyslog);
        newTestRunner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
        newTestRunner.setProperty(ListenSyslog.PORT, "0");
        ProcessSessionFactory processSessionFactory = newTestRunner.getProcessSessionFactory();
        ProcessContext processContext = newTestRunner.getProcessContext();
        listenSyslog.onScheduled(processContext);
        int port = listenSyslog.getPort();
        Assert.assertTrue(port > 0);
        Thread thread = new Thread(new SingleConnectionSocketSender(port, 10, 100L, INVALID_MESSAGE));
        thread.setDaemon(true);
        thread.start();
        try {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis() + 30000;
            while (i < 10 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(50L);
                listenSyslog.onTrigger(processContext, processSessionFactory);
                i = newTestRunner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
            }
            Assert.assertEquals("Did not process all the messages", 10L, i);
            listenSyslog.onUnscheduled();
        } catch (Throwable th) {
            listenSyslog.onUnscheduled();
            throw th;
        }
    }

    private void checkFlowFile(MockFlowFile mockFlowFile, int i, String str) {
        mockFlowFile.assertContentEquals(VALID_MESSAGE.replace(TCPTestServer.DEFAULT_MESSAGE_DELIMITER, ""));
        Assert.assertEquals(PRI, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_PRIORITY.key()));
        Assert.assertEquals(SEV, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_SEVERITY.key()));
        Assert.assertEquals(FAC, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_FACILITY.key()));
        Assert.assertEquals(TIME, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
        Assert.assertEquals(HOST, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_HOSTNAME.key()));
        Assert.assertEquals(BODY, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_BODY.key()));
        Assert.assertEquals("true", mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_VALID.key()));
        Assert.assertEquals(String.valueOf(i), mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_PORT.key()));
        Assert.assertEquals(str, mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_PROTOCOL.key()));
        Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(SyslogAttributes.SYSLOG_SENDER.key())));
    }
}
