package org.apache.nifi.processors.standard;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestPutSyslog.class */
/**
 * Tests for the {@code PutSyslog} processor.
 *
 * <p>Each test configures a {@link TestRunner} that targets {@value #ADDRESS} on a freshly
 * allocated UDP port. The success tests start a local Netty event server on that port, run the
 * processor once and verify both the received message and the resulting provenance event.
 */
public class TestPutSyslog {
    private static final String ADDRESS = "127.0.0.1";
    private static final String DEFAULT_PROTOCOL = "UDP";
    private static final int MAX_FRAME_LENGTH = 1024;
    private static final String DELIMITER = "\n";
    private static final int POLL_TIMEOUT_SECONDS = 5;
    private static final String MESSAGE_BODY = String.class.getName();
    private static final String MESSAGE_PRIORITY = "1";
    private static final String TIMESTAMP = "Jan 1 00:00:00";
    private static final String LOCALHOST = "localhost";
    private static final String SYSLOG_MESSAGE = String.format("<%s>%s %s %s", MESSAGE_PRIORITY, TIMESTAMP, LOCALHOST, MESSAGE_BODY);
    private static final String VERSION = "2";
    private static final String VERSION_ATTRIBUTE = "version";
    private static final String VERSION_SYSLOG_MESSAGE = String.format("<%s>%s %s %s %s", MESSAGE_PRIORITY, VERSION, TIMESTAMP, LOCALHOST, MESSAGE_BODY);
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    private final TransportProtocol protocol = TransportProtocol.UDP;
    private TestRunner runner;
    private InetAddress address;
    private int port;

    /**
     * Creates a runner for {@code PutSyslog} pointed at the loopback address and an available
     * UDP port, populates the message properties and asserts that the configuration is valid.
     *
     * @throws UnknownHostException if {@value #ADDRESS} cannot be resolved
     */
    @Before
    public void setRunner() throws UnknownHostException {
        address = InetAddress.getByName(ADDRESS);
        port = NetworkUtils.getAvailableUdpPort();
        runner = TestRunners.newTestRunner(PutSyslog.class);
        runner.setProperty(PutSyslog.HOSTNAME, ADDRESS);
        runner.setProperty(PutSyslog.PROTOCOL, protocol.toString());
        runner.setProperty(PutSyslog.PORT, Integer.toString(port));
        runner.setProperty(PutSyslog.MSG_BODY, MESSAGE_BODY);
        runner.setProperty(PutSyslog.MSG_PRIORITY, MESSAGE_PRIORITY);
        runner.setProperty(PutSyslog.MSG_HOSTNAME, LOCALHOST);
        runner.setProperty(PutSyslog.MSG_TIMESTAMP, TIMESTAMP);
        runner.assertValid();
    }

    /** Running with nothing enqueued must leave the input queue empty. */
    @Test
    public void testRunNoFlowFiles() {
        runner.run();
        runner.assertQueueEmpty();
    }

    /** A FlowFile without extra attributes produces the unversioned syslog message. */
    @Test
    public void testRunSuccess() throws InterruptedException {
        assertSyslogMessageSuccess(SYSLOG_MESSAGE, Collections.emptyMap());
    }

    /** The message version is resolved from a FlowFile attribute through an expression. */
    @Test
    public void testRunSuccessSyslogVersion() throws InterruptedException {
        runner.setProperty(PutSyslog.MSG_VERSION, String.format("${%s}", VERSION_ATTRIBUTE));
        assertSyslogMessageSuccess(VERSION_SYSLOG_MESSAGE, Collections.singletonMap(VERSION_ATTRIBUTE, VERSION));
    }

    /** A priority of {@link Integer#MAX_VALUE} routes the FlowFile to the invalid relationship. */
    @Test
    public void testRunInvalid() {
        runner.setProperty(PutSyslog.MSG_PRIORITY, Integer.toString(Integer.MAX_VALUE));
        runner.enqueue(new byte[0]);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID);
    }

    /** Sending over TCP to a port where this test starts no server routes the FlowFile to failure. */
    @Test
    public void testRunFailure() {
        runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
        runner.setProperty(PutSyslog.PORT, Integer.toString(NetworkUtils.getAvailableTcpPort()));
        runner.enqueue(new byte[0]);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE);
    }

    /**
     * Starts a local event server, runs the processor against one FlowFile and asserts that the
     * expected syslog message was received and that a matching provenance event was recorded.
     *
     * @param expectedSyslogMessage content enqueued as the FlowFile and expected on the wire
     * @param attributes FlowFile attributes supplied when enqueuing
     * @throws InterruptedException if interrupted while waiting for the message
     */
    private void assertSyslogMessageSuccess(final String expectedSyslogMessage, final Map<String, String> attributes) throws InterruptedException {
        final LinkedBlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
        final byte[] delimiter = DELIMITER.getBytes(CHARSET);
        final ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
                runner.getLogger(), address, port, protocol, delimiter, MAX_FRAME_LENGTH, messages);
        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        final EventServer eventServer = serverFactory.getEventServer();

        try {
            runner.enqueue(expectedSyslogMessage, attributes);
            runner.run();

            // poll() returns null on timeout; fail with a clear message instead of a NullPointerException.
            final ByteArrayMessage message = messages.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assert.assertNotNull("Syslog Message not received", message);
            final String received = new String(message.getMessage(), CHARSET);

            runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS);
            Assert.assertEquals(expectedSyslogMessage, received);
            assertProvenanceRecordTransitUriFound();
        } finally {
            // Always release the port, whether the assertions passed or not.
            eventServer.shutdown();
        }
    }

    /**
     * Asserts that the first provenance event is a SEND event whose transit URI contains the
     * protocol name, the target address and the target port.
     */
    private void assertProvenanceRecordTransitUriFound() {
        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
        Assert.assertFalse("Provenance Events not found", provenanceEvents.isEmpty());

        final ProvenanceEventRecord firstEvent = provenanceEvents.iterator().next();
        Assert.assertEquals(ProvenanceEventType.SEND, firstEvent.getEventType());

        final String transitUri = firstEvent.getTransitUri();
        Assert.assertNotNull("Transit URI not found", transitUri);
        Assert.assertTrue("Transit URI Protocol not found", transitUri.contains(DEFAULT_PROTOCOL));
        Assert.assertTrue("Transit URI Hostname not found", transitUri.contains(ADDRESS));
        Assert.assertTrue("Transit URI Port not found", transitUri.contains(Integer.toString(port)));
    }
}
