package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.ftpserver.ssl.ClientAuth;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.ListenRELP;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/apache/nifi/processors/standard/TestListenRELP.class */
public class TestListenRELP {
    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
    private static final String LOCALHOST = "localhost";

    @Mock
    private RestrictedSSLContextService sslContextService;
    private RELPEncoder encoder;
    private TestRunner runner;
    private static final Charset CHARSET = StandardCharsets.US_ASCII;
    private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(10);
    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder().txnr(1).command("open").dataLength("relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog".length()).data("relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog".getBytes(CHARSET)).build();
    public static final String RELP_FRAME_DATA = "this is a relp message here";
    static final RELPFrame RELP_FRAME = new RELPFrame.Builder().txnr(2).command("syslog").dataLength(RELP_FRAME_DATA.length()).data(RELP_FRAME_DATA.getBytes(CHARSET)).build();
    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder().txnr(3).command("close").dataLength(0).data(new byte[0]).build();

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenRELP$MockListenRELP.class */
    private static class MockListenRELP extends ListenRELP {
        private final List<RELPMessage> mockEvents;

        public MockListenRELP() {
            this.mockEvents = new ArrayList();
        }

        public MockListenRELP(List<RELPMessage> list) {
            this.mockEvents = list;
        }

        @OnScheduled
        public void onScheduled(ProcessContext processContext) throws IOException {
            super.onScheduled(processContext);
            this.events.addAll(this.mockEvents);
        }
    }

    @BeforeEach
    public void setup() {
        this.encoder = new RELPEncoder(CHARSET);
        this.runner = TestRunners.newTestRunner(new MockListenRELP());
    }

    @AfterEach
    public void shutdown() {
        this.runner.shutdown();
    }

    @Test
    public void testRELPFramesAreReceivedSuccessfully() throws Exception {
        run(getFrames(5), 5, null);
        List provenanceEvents = this.runner.getProvenanceEvents();
        Assertions.assertNotNull(provenanceEvents);
        Assertions.assertEquals(5, provenanceEvents.size());
        ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
        Assertions.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
        Assertions.assertTrue(provenanceEventRecord.getTransitUri().toLowerCase().startsWith("relp"), "transit uri must be set and start with proper protocol");
        List flowFilesForRelationship = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assertions.assertEquals(5, flowFilesForRelationship.size());
        MockFlowFile mockFlowFile = (MockFlowFile) flowFilesForRelationship.get(0);
        Assertions.assertEquals(String.valueOf(RELP_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
        Assertions.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assertions.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assertions.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    @Test
    public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws Exception {
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
        run(getFrames(3), 1, null);
        List provenanceEvents = this.runner.getProvenanceEvents();
        Assertions.assertNotNull(provenanceEvents);
        Assertions.assertEquals(1, provenanceEvents.size());
        ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord) provenanceEvents.get(0);
        Assertions.assertEquals(ProvenanceEventType.RECEIVE, provenanceEventRecord.getEventType());
        Assertions.assertTrue(provenanceEventRecord.getTransitUri().toLowerCase().startsWith("relp"), "transit uri must be set and start with proper protocol");
        List flowFilesForRelationship = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assertions.assertEquals(1, flowFilesForRelationship.size());
        MockFlowFile mockFlowFile = (MockFlowFile) flowFilesForRelationship.get(0);
        Assertions.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assertions.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assertions.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    @Test
    public void testRunMutualTls() throws Exception {
        String name = SSLContextService.class.getName();
        Mockito.when(this.sslContextService.getIdentifier()).thenReturn(name);
        SSLContext createKeyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
        Mockito.when(this.sslContextService.createContext()).thenReturn(createKeyStoreSslContext);
        this.runner.addControllerService(name, this.sslContextService);
        this.runner.enableControllerService(this.sslContextService);
        this.runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, name);
        this.runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
        run(getFrames(3), 3, createKeyStoreSslContext);
    }

    @Test
    public void testBatchingWithDifferentSenders() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new RELPMessage("/192.168.1.50:55000", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        arrayList.add(new RELPMessage("/192.168.1.50:55000", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        arrayList.add(new RELPMessage("/192.168.1.50:55000", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        arrayList.add(new RELPMessage("/192.168.1.50:55001", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        arrayList.add(new RELPMessage("/192.168.1.50:55002", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        arrayList.add(new RELPMessage("/192.168.1.50:55002", RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        this.runner = TestRunners.newTestRunner(new MockListenRELP(arrayList));
        this.runner.setProperty(ListenerProperties.PORT, Integer.toString(NetworkUtils.availablePort()));
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
        this.runner.shutdown();
    }

    private void run(List<RELPFrame> list, int i, SSLContext sSLContext) throws Exception {
        int availablePort = NetworkUtils.availablePort();
        this.runner.setProperty(ListenerProperties.PORT, Integer.toString(availablePort));
        this.runner.run(1, false, true);
        sendMessages(availablePort, getRELPMessages(list), sSLContext);
        this.runner.run(i, false, false);
        this.runner.assertTransferCount(ListenRELP.REL_SUCCESS, i);
    }

    private byte[] getRELPMessages(List<RELPFrame> list) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Iterator<RELPFrame> it = list.iterator();
        while (it.hasNext()) {
            byteArrayOutputStream.write(this.encoder.encode(it.next()));
            byteArrayOutputStream.flush();
        }
        return byteArrayOutputStream.toByteArray();
    }

    private List<RELPFrame> getFrames(int i) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(OPEN_FRAME);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(RELP_FRAME);
        }
        arrayList.add(CLOSE_FRAME);
        return arrayList;
    }

    private void sendMessages(int i, byte[] bArr, SSLContext sSLContext) throws Exception {
        ByteArrayNettyEventSenderFactory byteArrayNettyEventSenderFactory = new ByteArrayNettyEventSenderFactory(this.runner.getLogger(), LOCALHOST, i, TransportProtocol.TCP);
        byteArrayNettyEventSenderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        byteArrayNettyEventSenderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        if (sSLContext != null) {
            byteArrayNettyEventSenderFactory.setSslContext(sSLContext);
        }
        byteArrayNettyEventSenderFactory.setTimeout(SENDER_TIMEOUT);
        EventSender eventSender = byteArrayNettyEventSenderFactory.getEventSender();
        Throwable th = null;
        try {
            try {
                eventSender.sendEvent(bArr);
                if (eventSender != null) {
                    if (0 == 0) {
                        eventSender.close();
                        return;
                    }
                    try {
                        eventSender.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (eventSender != null) {
                if (th != null) {
                    try {
                        eventSender.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    eventSender.close();
                }
            }
            throw th4;
        }
    }
}
