package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.ListenRELP;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestListenRELP.class */
public class TestListenRELP {
    /** Data section of the {@code open} frame: RELP version, client software and the offered commands. */
    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";

    /** Data section carried by every {@code syslog} frame sent in these tests. */
    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";

    /** Frame with transaction number 1 and command {@code open}, carrying {@link #OPEN_FRAME_DATA}. */
    static final RELPFrame OPEN_FRAME = buildFrame(1, "open", OPEN_FRAME_DATA);

    /** Frame with transaction number 2 and command {@code syslog}, carrying {@link #SYSLOG_FRAME_DATA}. */
    static final RELPFrame SYSLOG_FRAME = buildFrame(2, "syslog", SYSLOG_FRAME_DATA);

    /** Frame with transaction number 3 and command {@code close}; it has an empty data section. */
    static final RELPFrame CLOSE_FRAME = buildFrame(3, "close", "");

    /** Encoder used by {@code sendFrames} to serialise frames; recreated before every test. */
    private RELPEncoder encoder;

    /** Processor under test for the socket-based tests; records every response it sends. */
    private ResponseCapturingListenRELP proc;

    /** Test runner wrapping the processor used by the current test. */
    private TestRunner runner;

    /**
     * Builds a frame whose declared data length always matches the UTF-8 encoded payload.
     *
     * @param txnr    transaction number of the frame
     * @param command RELP command name
     * @param data    textual payload; an empty string yields a zero-length data section
     * @return the assembled frame
     */
    private static RELPFrame buildFrame(final int txnr, final String command, final String data) {
        // Measure the encoded bytes rather than the character count so the two can never disagree.
        final byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        return new RELPFrame.Builder()
                .txnr(txnr)
                .command(command)
                .dataLength(payload.length)
                .data(payload)
                .build();
    }

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenRELP$MockListenRELP.class */
    private static class MockListenRELP extends ListenRELP {
        private List<RELPEvent> mockEvents;

        public MockListenRELP(List<RELPEvent> list) {
            this.mockEvents = list;
        }

        @OnScheduled
        public void onScheduled(ProcessContext processContext) throws IOException {
            super.onScheduled(processContext);
            this.events.addAll(this.mockEvents);
        }

        protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<RELPEvent> blockingQueue) throws IOException {
            return (ChannelDispatcher) Mockito.mock(ChannelDispatcher.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenRELP$ResponseCapturingListenRELP.class */
    public static class ResponseCapturingListenRELP extends ListenRELP {
        private List<RELPResponse> responses;

        private ResponseCapturingListenRELP() {
            this.responses = new ArrayList();
        }

        protected void respond(RELPEvent rELPEvent, RELPResponse rELPResponse) {
            this.responses.add(rELPResponse);
            super.respond(rELPEvent, rELPResponse);
        }
    }

    /**
     * Creates a fresh encoder, response-capturing processor and test runner for each test,
     * and sets the processor's port property to {@code "0"}.
     */
    @Before
    public void setup() {
        final ResponseCapturingListenRELP processor = new ResponseCapturingListenRELP();
        final TestRunner testRunner = TestRunners.newTestRunner(processor);
        testRunner.setProperty(ListenRELP.PORT, "0");

        this.encoder = new RELPEncoder(StandardCharsets.UTF_8);
        this.proc = processor;
        this.runner = testRunner;
    }

    /**
     * Sends an open frame, three syslog frames and a close frame over a plain socket and expects
     * three flow files, three responses and three RECEIVE provenance events, with the RELP
     * attributes populated on the first flow file.
     */
    @Test
    public void testListenRELP() throws IOException, InterruptedException {
        final List<RELPFrame> frames = new ArrayList<>();
        frames.add(OPEN_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(CLOSE_FRAME);

        // Default batching: every syslog frame is expected to become its own flow file.
        run(frames, 3, 3, null);

        final List<ProvenanceEventRecord> provenanceEvents = this.runner.getProvenanceEvents();
        Assert.assertNotNull(provenanceEvents);
        Assert.assertEquals(3, provenanceEvents.size());

        final ProvenanceEventRecord firstEvent = provenanceEvents.get(0);
        Assert.assertEquals(ProvenanceEventType.RECEIVE, firstEvent.getEventType());
        Assert.assertTrue("transit uri must be set and start with proper protocol", firstEvent.getTransitUri().toLowerCase().startsWith("relp"));

        final List<MockFlowFile> flowFiles = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assert.assertEquals(3, flowFiles.size());

        final MockFlowFile firstFlowFile = flowFiles.get(0);
        Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), firstFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
        Assert.assertEquals(SYSLOG_FRAME.getCommand(), firstFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assert.assertFalse(StringUtils.isBlank(firstFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assert.assertFalse(StringUtils.isBlank(firstFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    /**
     * Same traffic as the basic test but with the max batch size set to 5: the three syslog
     * frames are expected to produce three responses yet only one flow file and one RECEIVE
     * provenance event.
     */
    @Test
    public void testBatching() throws IOException, InterruptedException {
        this.runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");

        final List<RELPFrame> frames = new ArrayList<>();
        frames.add(OPEN_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(CLOSE_FRAME);

        // One flow file expected, but each of the three events must still be answered.
        run(frames, 1, 3, null);

        final List<ProvenanceEventRecord> provenanceEvents = this.runner.getProvenanceEvents();
        Assert.assertNotNull(provenanceEvents);
        Assert.assertEquals(1, provenanceEvents.size());

        final ProvenanceEventRecord firstEvent = provenanceEvents.get(0);
        Assert.assertEquals(ProvenanceEventType.RECEIVE, firstEvent.getEventType());
        Assert.assertTrue("transit uri must be set and start with proper protocol", firstEvent.getTransitUri().toLowerCase().startsWith("relp"));

        final List<MockFlowFile> flowFiles = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());

        final MockFlowFile batchedFlowFile = flowFiles.get(0);
        Assert.assertEquals(SYSLOG_FRAME.getCommand(), batchedFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assert.assertFalse(StringUtils.isBlank(batchedFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assert.assertFalse(StringUtils.isBlank(batchedFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    /**
     * Configures a {@link StandardSSLContextService} from the test keystore and truststore,
     * attaches it to the processor and sends five syslog frames over a socket created from that
     * SSL context, expecting five flow files and five responses.
     */
    @Test
    public void testTLS() throws InitializationException, IOException, InterruptedException {
        final StandardSSLContextService sslContextService = new StandardSSLContextService();
        this.runner.addControllerService("ssl-context", sslContextService);
        this.runner.setProperty(sslContextService, StandardSSLContextService.SSL_ALGORITHM, "TLSv1.2");
        this.runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/truststore.jks");
        this.runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "passwordpassword");
        this.runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
        this.runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/keystore.jks");
        this.runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "passwordpassword");
        this.runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
        this.runner.enableControllerService(sslContextService);

        // Use the descriptor of the processor under test rather than borrowing PostHTTP's.
        this.runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, "ssl-context");

        final List<RELPFrame> frames = new ArrayList<>();
        frames.add(OPEN_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(SYSLOG_FRAME);
        frames.add(CLOSE_FRAME);

        run(frames, 5, 5, sslContextService);
    }

    /**
     * Runs a {@link MockListenRELP} that was given no events and checks that nothing is
     * transferred to the success relationship.
     */
    @Test
    public void testNoEventsAvailable() throws IOException, InterruptedException {
        final List<RELPEvent> noEvents = new ArrayList<>();

        this.runner = TestRunners.newTestRunner(new MockListenRELP(noEvents));
        this.runner.setProperty(ListenRELP.PORT, "1");
        this.runner.run();

        this.runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
    }

    /**
     * Queues two events from {@code sender1} followed by two from {@code sender2} and, with a max
     * batch size of 10, expects exactly two flow files on the success relationship.
     */
    @Test
    public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
        final ChannelResponder responder = Mockito.mock(ChannelResponder.class);

        // Same payload for every event; only the sender varies, in the original insertion order.
        final List<RELPEvent> events = new ArrayList<>();
        for (final String sender : new String[] {"sender1", "sender1", "sender2", "sender2"}) {
            events.add(new RELPEvent(sender, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
        }

        this.runner = TestRunners.newTestRunner(new MockListenRELP(events));
        this.runner.setProperty(ListenRELP.PORT, "1");
        this.runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
        this.runner.run();

        this.runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
    }

    /**
     * Schedules the processor, connects to its dispatcher port, sends the given frames and then
     * triggers the processor until the expected number of responses has been captured.
     *
     * <p>The processor is always unscheduled and the socket always closed afterwards, each exactly
     * once, whether the assertions pass or not.
     *
     * @param list              frames to send, in order
     * @param i                 expected number of flow files transferred to the success relationship
     * @param i2                expected number of queued events, and of responses sent by the processor
     * @param sSLContextService when non-null, the socket is created from this service's SSL context;
     *                          when null, a plain socket is used
     * @throws IOException          if scheduling, connecting or sending fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected void run(List<RELPFrame> list, int i, int i2, SSLContextService sSLContextService) throws IOException, InterruptedException {
        final int expectedTransferred = i;
        final int expectedResponses = i2;
        final long timeoutMillis = 30000L;
        final long pollMillis = 100L;

        Socket socket = null;
        try {
            final ProcessSessionFactory sessionFactory = this.runner.getProcessSessionFactory();
            final ProcessContext context = this.runner.getProcessContext();
            this.proc.onScheduled(context);

            final int port = this.proc.getDispatcherPort();
            if (sSLContextService != null) {
                socket = sSLContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED).getSocketFactory().createSocket("localhost", port);
            } else {
                socket = new Socket("localhost", port);
            }

            // Brief pause before sending; presumably gives the listener time to accept the connection.
            Thread.sleep(pollMillis);
            sendFrames(list, socket);

            // Wait (bounded) until all expected events have been queued by the processor.
            final long queueWaitStart = System.currentTimeMillis();
            while (this.proc.getQueueSize() < expectedResponses && System.currentTimeMillis() - queueWaitStart < timeoutMillis) {
                Thread.sleep(pollMillis);
            }
            Assert.assertEquals(expectedResponses, this.proc.getQueueSize());

            // Keep triggering (bounded) until every queued event has been answered.
            final long responseWaitStart = System.currentTimeMillis();
            while (this.proc.responses.size() < expectedResponses && System.currentTimeMillis() - responseWaitStart < timeoutMillis) {
                this.proc.onTrigger(context, sessionFactory);
                Thread.sleep(pollMillis);
            }
            Assert.assertEquals(expectedResponses, this.proc.responses.size());

            this.runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred);
        } finally {
            // Nested finally: a failure while unscheduling must not leave the socket open,
            // and unscheduling must not be attempted a second time.
            try {
                this.proc.onUnscheduled();
            } finally {
                IOUtils.closeQuietly(socket);
            }
        }
    }

    /**
     * Encodes each frame with the test's encoder and writes it to the socket's output stream in
     * list order, flushing the stream once after the last frame.
     *
     * @param list   frames to send
     * @param socket connected socket to write to
     * @throws IOException if obtaining the stream, writing or flushing fails
     */
    private void sendFrames(List<RELPFrame> list, Socket socket) throws IOException, InterruptedException {
        for (final RELPFrame frame : list) {
            socket.getOutputStream().write(this.encoder.encode(frame));
        }
        socket.getOutputStream().flush();
    }
}
