package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestListenTCPRecord.class */
/**
 * Tests for {@link ListenTCPRecord}.
 *
 * <p>Each test configures a {@link TestRunner} with a {@link JsonTreeReader} using {@link #SCHEMA_TEXT}
 * and a {@link MockRecordWriter}, sends {@link #DATA} to the processor over a local socket (plain or TLS),
 * and inspects the FlowFiles routed to {@link ListenTCPRecord#REL_SUCCESS}.
 */
public class TestListenTCPRecord {
    /** Record schema text handed to the JSON reader; declares three string fields. */
    static final String SCHEMA_TEXT = "{\n"
            + "  \"name\": \"syslogRecord\",\n"
            + "  \"namespace\": \"nifi\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"fields\": [\n"
            + "    { \"name\": \"timestamp\", \"type\": \"string\" },\n"
            + "    { \"name\": \"logsource\", \"type\": \"string\" },\n"
            + "    { \"name\": \"message\", \"type\": \"string\" }\n"
            + "  ]\n"
            + "}";

    /** JSON array of three records written to the socket by {@link #run(int, SSLContext)}. */
    static final String DATA = "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}]";

    private static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);

    /** JUnit timeout, in milliseconds, applied to every test that opens a socket. */
    private static final long TEST_TIMEOUT = 30000;
    private static final String LOCALHOST = "localhost";
    private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();

    private static final String READER_IDENTIFIER = "record-reader";
    private static final String WRITER_IDENTIFIER = "record-writer";
    private static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    private static final String MESSAGE_PREFIX = "This is a test ";
    /** Number of records contained in {@link #DATA}. */
    private static final int DATA_RECORD_COUNT = 3;

    private static SSLContext keyStoreSslContext;
    private static SSLContext trustStoreSslContext;

    private TestRunner runner;

    /**
     * Creates the SSL contexts shared by all TLS tests.
     *
     * @throws TlsException if either context cannot be created
     */
    @BeforeClass
    public static void configureServices() throws TlsException {
        keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
        trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
    }

    /**
     * Builds a fresh runner and wires the record reader and writer controller services into it.
     *
     * @throws InitializationException if a controller service cannot be added
     */
    @Before
    public void setup() throws InitializationException {
        runner = TestRunners.newTestRunner(ListenTCPRecord.class);

        final JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService(READER_IDENTIFIER, reader);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
        runner.enableControllerService(reader);

        final MockRecordWriter writer = new MockRecordWriter("timestamp, logsource, message");
        runner.addControllerService(WRITER_IDENTIFIER, writer);
        runner.enableControllerService(writer);

        runner.setProperty(ListenTCPRecord.RECORD_READER, READER_IDENTIFIER);
        runner.setProperty(ListenTCPRecord.RECORD_WRITER, WRITER_IDENTIFIER);
    }

    /**
     * Checks processor validity before and after an SSL context service is configured:
     * valid with only a port, not valid with an empty Client Auth value, valid again with
     * Client Auth set to {@code REQUIRED}.
     */
    @Test
    public void testCustomValidate() throws InitializationException {
        runner.setProperty(ListenTCPRecord.PORT, "1");
        runner.assertValid();

        enableSslContextService(keyStoreSslContext);
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, "");
        runner.assertNotValid();

        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
        runner.assertValid();
    }

    /** With a batch size of 1, every FlowFile carries exactly one record, in the order sent. */
    @Test(timeout = TEST_TIMEOUT)
    public void testRunOneRecordPerFlowFile() throws IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");

        run(DATA_RECORD_COUNT, null);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        for (int i = 0; i < flowFiles.size(); i++) {
            final MockFlowFile flowFile = flowFiles.get(i);
            flowFile.assertAttributeEquals(RECORD_COUNT_ATTRIBUTE, "1");

            final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
            Assert.assertTrue(content.contains(MESSAGE_PREFIX + (i + 1)));
        }
    }

    /** With a batch size larger than the input, all three records land in a single FlowFile. */
    @Test(timeout = TEST_TIMEOUT)
    public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");

        run(1, null);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());

        final MockFlowFile flowFile = flowFiles.get(0);
        flowFile.assertAttributeEquals(RECORD_COUNT_ATTRIBUTE, "3");
        assertContainsAllRecords(flowFile);
    }

    /** TLS with Client Auth {@code REQUIRED}; the client connects using the key store context. */
    @Test(timeout = TEST_TIMEOUT)
    public void testRunClientAuthRequired() throws InitializationException, IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
        enableSslContextService(keyStoreSslContext);

        run(1, keyStoreSslContext);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());
        assertContainsAllRecords(flowFiles.get(0));
    }

    /** TLS with Client Auth {@code NONE}; the client connects using the trust store context. */
    @Test(timeout = TEST_TIMEOUT)
    public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
        enableSslContextService(keyStoreSslContext);

        run(1, trustStoreSslContext);

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assert.assertEquals(1, flowFiles.size());
        assertContainsAllRecords(flowFiles.get(0));
    }

    /**
     * Runs the processor on an available port, sends {@link #DATA} to it from a background thread,
     * and keeps triggering the processor until the expected number of FlowFiles has been routed
     * to success. Fails as soon as the processor's logger reports an error message.
     *
     * <p>There is no iteration limit in this method; callers rely on the JUnit timeout declared
     * on the test to bound the wait.
     *
     * @param i          number of success FlowFiles to wait for
     * @param sSLContext context used to open the client socket, or {@code null} for a plain socket
     * @throws IOException          declared for callers; not thrown directly by the visible code
     * @throws InterruptedException declared for callers; not thrown directly by the visible code
     */
    protected void run(int i, SSLContext sSLContext) throws IOException, InterruptedException {
        final int port = NetworkUtils.availablePort();
        runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));

        // First invocation uses different flags from the polling loop below.
        // NOTE(review): presumably (iterations, stopOnFinish, initialize) - confirm against TestRunner.
        runner.run(1, false, true);

        final Thread sender = new Thread(() -> {
            // try-with-resources guarantees the socket is closed even when the write fails
            try (Socket socket = getSocket(port, sSLContext)) {
                final OutputStream outputStream = socket.getOutputStream();
                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            } catch (final IOException e) {
                LOGGER.error("Failed Sending Records to Port [{}]", port, e);
            }
        });
        sender.start();

        int iterations = 0;
        while (getSuccessCount() < i) {
            runner.run(1, false, false);
            iterations++;

            // Any error logged by the processor fails the test immediately
            Assert.assertNull(runner.getLogger().getErrorMessages().stream().findFirst().orElse(null));
        }
        LOGGER.info("Completed after iterations [{}]", iterations);
    }

    /** Asserts that the FlowFile content, decoded as UTF-8, contains all three test messages. */
    private static void assertContainsAllRecords(final MockFlowFile flowFile) {
        final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
        for (int recordNumber = 1; recordNumber <= DATA_RECORD_COUNT; recordNumber++) {
            Assert.assertTrue(content.contains(MESSAGE_PREFIX + recordNumber));
        }
    }

    /** Returns the number of FlowFiles currently routed to the success relationship. */
    private int getSuccessCount() {
        return runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
    }

    /**
     * Opens a client socket to localhost on the given port.
     *
     * @param port       port to connect to
     * @param sslContext context whose socket factory creates the socket, or {@code null} for a plain socket
     * @return the connected socket; the caller is responsible for closing it
     * @throws IOException if the socket cannot be created
     */
    private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
        if (sslContext == null) {
            return new Socket(LOCALHOST, port);
        }
        return sslContext.getSocketFactory().createSocket(LOCALHOST, port);
    }

    /**
     * Registers and enables a mocked {@link RestrictedSSLContextService} that returns the given
     * context, then points the processor's SSL Context Service property at it.
     *
     * @param sslContext context returned by the mocked service's {@code createContext()}
     * @throws InitializationException if the service cannot be added to the runner
     */
    private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
        final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
        Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);
        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);

        runner.addControllerService(SSL_CONTEXT_IDENTIFIER, sslContextService);
        runner.enableControllerService(sslContextService);
        runner.setProperty(ListenTCPRecord.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
    }
}
