package org.apache.nifi.processors.standard;

import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ListenTCPRecord}.
 *
 * <p>Each test configures the processor with a JSON record reader and a mock record writer,
 * starts it, pushes {@link #DATA} to it over a plain or TLS client socket, and then inspects
 * the flow files routed to {@link ListenTCPRecord#REL_SUCCESS}.
 */
public class TestListenTCPRecord {
    static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);

    /** Avro-style schema text handed to the record reader: three string fields. */
    static final String SCHEMA_TEXT = "{\n"
            + "  \"name\": \"syslogRecord\",\n"
            + "  \"namespace\": \"nifi\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"fields\": [\n"
            + "    { \"name\": \"timestamp\", \"type\": \"string\" },\n"
            + "    { \"name\": \"logsource\", \"type\": \"string\" },\n"
            + "    { \"name\": \"message\", \"type\": \"string\" }\n"
            + "  ]\n"
            + "}";

    /** Key store used both by the processor's SSL context service and by TLS clients. */
    private static final String KEYSTORE_PATH = "src/test/resources/keystore.jks";

    /** Trust store used both by the processor's SSL context service and by TLS clients. */
    private static final String TRUSTSTORE_PATH = "src/test/resources/truststore.jks";

    /** Password shared by the test key store and trust store. */
    private static final String STORE_PASSWORD = "passwordpassword";

    /** Pause after scheduling the processor and between successive trigger attempts. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Upper bound on how long {@link #runTCP} keeps triggering the processor. */
    private static final long TRANSFER_TIMEOUT_MILLIS = 10000L;

    /**
     * Chunks written to the socket, in order. Concatenated they form one JSON array
     * holding three records whose messages are "This is a test 1" .. "This is a test 3".
     */
    static final List<String> DATA;

    static {
        final List<String> chunks = new ArrayList<>();
        chunks.add("[");
        chunks.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},");
        chunks.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},");
        chunks.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}");
        chunks.add("]");
        DATA = Collections.unmodifiableList(chunks);
    }

    private ListenTCPRecord proc;
    private TestRunner runner;

    /**
     * Client that connects to a host/port (optionally over TLS), writes a list of strings
     * as UTF-8 bytes and then closes the connection. Intended to be run on its own thread.
     */
    public static class SocketSender implements Runnable, Closeable {
        private final int port;
        private final String host;
        private final SSLContext sslContext;
        private final List<String> data;
        private final long delay;

        // Written by the thread executing run() and read by whichever thread calls close(),
        // hence volatile so that close() observes the opened socket.
        private volatile Socket socket;

        /**
         * @param port       port to connect to
         * @param host       host to connect to
         * @param sslContext context used to create a TLS socket, or {@code null} for a plain socket
         * @param data       strings to write, in iteration order
         * @param delay      milliseconds to sleep after each write; values {@code <= 0} disable the pause
         */
        public SocketSender(int port, String host, SSLContext sslContext, List<String> data, long delay) {
            this.port = port;
            this.host = host;
            this.sslContext = sslContext;
            this.data = data;
            this.delay = delay;
        }

        /**
         * Opens the connection, writes every element of the data list, flushes, and closes
         * the socket. Failures are logged rather than propagated.
         */
        @Override
        public void run() {
            try {
                final Socket connection = (sslContext != null)
                        ? sslContext.getSocketFactory().createSocket(host, port)
                        : new Socket(host, port);
                socket = connection;

                for (final String chunk : data) {
                    connection.getOutputStream().write(chunk.getBytes(StandardCharsets.UTF_8));
                    if (delay > 0) {
                        Thread.sleep(delay);
                    }
                }
                connection.getOutputStream().flush();
            } catch (InterruptedException e) {
                // Restore the interrupt status so the owner of this thread can still see it.
                Thread.currentThread().interrupt();
                TestListenTCPRecord.LOGGER.error(e.getMessage(), e);
            } catch (Exception e) {
                TestListenTCPRecord.LOGGER.error(e.getMessage(), e);
            } finally {
                IOUtils.closeQuietly(socket);
            }
        }

        /** Quietly closes the socket opened by {@link #run()}. */
        @Override
        public void close() {
            IOUtils.closeQuietly(socket);
        }
    }

    /**
     * Creates the processor and test runner, and wires in an enabled JSON record reader
     * (using {@link #SCHEMA_TEXT}) and an enabled mock record writer.
     */
    @Before
    public void setup() throws InitializationException {
        proc = new ListenTCPRecord();
        runner = TestRunners.newTestRunner(proc);
        // runTCP reads the actual listening port back through getDispatcherPort();
        // presumably "0" makes the processor bind to any free port - confirm against ListenTCPRecord.
        runner.setProperty(ListenTCPRecord.PORT, "0");

        final JsonTreeReader recordReader = new JsonTreeReader();
        runner.addControllerService("record-reader", recordReader);
        runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
        runner.enableControllerService(recordReader);

        final MockRecordWriter recordWriter = new MockRecordWriter("timestamp, logsource, message");
        runner.addControllerService("record-writer", recordWriter);
        runner.enableControllerService(recordWriter);

        runner.setProperty(ListenTCPRecord.RECORD_READER, "record-reader");
        runner.setProperty(ListenTCPRecord.RECORD_WRITER, "record-writer");
    }

    /**
     * With an SSL context service configured, an empty client-auth value must be rejected
     * and {@code REQUIRED} must be accepted.
     */
    @Test
    public void testCustomValidate() throws InitializationException {
        runner.setProperty(ListenTCPRecord.PORT, "1");
        runner.assertValid();

        configureProcessorSslContextService();
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, "");
        runner.assertNotValid();

        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, SslContextFactory.ClientAuth.REQUIRED.name());
        runner.assertValid();
    }

    /** A batch size of 1 must yield three flow files, each holding one record, in send order. */
    @Test
    public void testOneRecordPerFlowFile() throws IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
        runTCP(DATA, 3, null);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        for (int i = 0; i < flowFiles.size(); i++) {
            final MockFlowFile flowFile = flowFiles.get(i);
            flowFile.assertAttributeEquals("record.count", "1");

            final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
            Assert.assertTrue(content.contains("This is a test " + (i + 1)));
        }
    }

    /** A batch size larger than the number of records must yield a single flow file with all three. */
    @Test
    public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
        runTCP(DATA, 1, null);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());

        final MockFlowFile flowFile = flowFiles.get(0);
        flowFile.assertAttributeEquals("record.count", "3");
        assertContainsAllTestMessages(flowFile);
    }

    /** Client auth is required and the client presents a key store: all records must arrive. */
    @Test
    public void testTLSClientAuthRequiredAndClientCertProvided() throws InitializationException, IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
        configureProcessorSslContextService();

        final SSLContext clientSslContext = SslContextFactory.createSslContext(
                KEYSTORE_PATH, STORE_PASSWORD.toCharArray(), "jks",
                TRUSTSTORE_PATH, STORE_PASSWORD.toCharArray(), "jks",
                SslContextFactory.ClientAuth.valueOf("NONE"), "TLSv1.2");
        runTCP(DATA, 1, clientSslContext);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());
        assertContainsAllTestMessages(flowFiles.get(0));
    }

    /**
     * Client auth is required but the client only has a trust store: nothing must be transferred.
     *
     * <p>NOTE(review): because the expected count is 0, the polling loop in {@link #runTCP}
     * never executes, so the processor is never triggered and this only verifies that no flow
     * file exists right after the sender thread is started.
     */
    @Test
    public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
        runner.setProperty(ListenTCPRecord.READ_TIMEOUT, "5 seconds");
        configureProcessorSslContextService();

        final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
                TRUSTSTORE_PATH, STORE_PASSWORD.toCharArray(), "jks", "TLS");
        runTCP(DATA, 0, clientSslContext);
    }

    /** Client auth is disabled and the client only has a trust store: all records must arrive. */
    @Test
    public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
        configureProcessorSslContextService();

        final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
                TRUSTSTORE_PATH, STORE_PASSWORD.toCharArray(), "jks", "TLSv1.2");
        runTCP(DATA, 1, clientSslContext);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());
        assertContainsAllTestMessages(flowFiles.get(0));
    }

    /**
     * Schedules the processor, sends {@code data} to it from a daemon thread, and triggers the
     * processor repeatedly until the expected number of flow files has been transferred to
     * {@link ListenTCPRecord#REL_SUCCESS} or {@link #TRANSFER_TIMEOUT_MILLIS} has elapsed.
     * The processor is always stopped and the sender closed, whether or not the run succeeds.
     *
     * @param data                strings to write to the processor's listening port
     * @param expectedTransferred number of flow files expected on the success relationship
     * @param sslContext          client TLS context, or {@code null} to connect without TLS
     * @throws IOException          declared for failures while scheduling or running the processor
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void runTCP(final List<String> data, final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
        SocketSender sender = null;
        try {
            final ProcessSessionFactory sessionFactory = runner.getProcessSessionFactory();
            final ProcessContext context = runner.getProcessContext();
            proc.onScheduled(context);
            // Brief pause after scheduling before the client connects.
            Thread.sleep(POLL_INTERVAL_MILLIS);

            sender = new SocketSender(proc.getDispatcherPort(), "localhost", sslContext, data, 0L);
            final Thread senderThread = new Thread(sender);
            senderThread.setDaemon(true);
            senderThread.start();

            int numTransferred = 0;
            final long startTime = System.currentTimeMillis();
            while (numTransferred < expectedTransferred
                    && System.currentTimeMillis() - startTime < TRANSFER_TIMEOUT_MILLIS) {
                proc.onTrigger(context, sessionFactory);
                numTransferred = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }

            runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
        } finally {
            // Single cleanup path; the nested finally keeps the sender from leaking
            // if stopping the processor itself throws.
            try {
                proc.onStopped();
            } finally {
                IOUtils.closeQuietly(sender);
            }
        }
    }

    /**
     * Registers and enables a restricted SSL context service backed by the test key store and
     * trust store, and points the processor's SSL context service property at it.
     *
     * @return the enabled SSL context service
     */
    private SSLContextService configureProcessorSslContextService() throws InitializationException {
        final StandardRestrictedSSLContextService sslContextService = new StandardRestrictedSSLContextService();
        runner.addControllerService("ssl-context", sslContextService);
        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, TRUSTSTORE_PATH);
        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, STORE_PASSWORD);
        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, KEYSTORE_PATH);
        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, STORE_PASSWORD);
        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
        runner.enableControllerService(sslContextService);

        runner.setProperty(ListenTCPRecord.SSL_CONTEXT_SERVICE, "ssl-context");
        return sslContextService;
    }

    /** Asserts that the flow file's UTF-8 content contains all three test messages. */
    private static void assertContainsAllTestMessages(final MockFlowFile flowFile) {
        final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
        Assert.assertTrue(content.contains("This is a test 1"));
        Assert.assertTrue(content.contains("This is a test 2"));
        Assert.assertTrue(content.contains("This is a test 3"));
    }
}
