package org.apache.nifi.processors.standard;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(30)
/* loaded from: input_file:org/apache/nifi/processors/standard/TestPutTCP.class */
public class TestPutTCP {
    private static final String TCP_SERVER_ADDRESS = "127.0.0.1";
    private static final String SERVER_VARIABLE = "server.address";
    private static final String TCP_SERVER_ADDRESS_EL = "${server.address}";
    private static final int MIN_INVALID_PORT = 0;
    private static final int MIN_VALID_PORT = 1;
    private static final int MAX_VALID_PORT = 65535;
    private static final int MAX_INVALID_PORT = 65536;
    private static final int VALID_LARGE_FILE_SIZE = 32768;
    private static final int VALID_SMALL_FILE_SIZE = 64;
    private static final int LOAD_TEST_ITERATIONS = 500;
    private static final int LOAD_TEST_THREAD_COUNT = 1;
    private static final int DEFAULT_ITERATIONS = 1;
    private static final int DEFAULT_THREAD_COUNT = 1;
    private static final char CONTENT_CHAR = 'x';
    private static final String OUTGOING_MESSAGE_DELIMITER = "\n";
    private static final String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
    private static final String[] EMPTY_FILE = {""};
    private static final String[] VALID_FILES = {"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\"};
    private EventServer eventServer;
    private TestRunner runner;
    private BlockingQueue<ByteArrayMessage> messages;

    @BeforeEach
    public void setup() throws Exception {
        this.runner = TestRunners.newTestRunner(PutTCP.class);
        this.runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
    }

    @AfterEach
    public void cleanup() {
        this.runner.shutdown();
        shutdownServer();
    }

    @Test
    public void testRunSuccess() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessSslContextService() throws Exception {
        SSLContext createSslContext = SslContextUtils.createSslContext(new TemporaryKeyStoreBuilder().build());
        Assertions.assertNotNull(createSslContext, "SSLContext not found");
        String name = SSLContextService.class.getName();
        SSLContextService sSLContextService = (SSLContextService) Mockito.mock(SSLContextService.class);
        Mockito.when(sSLContextService.getIdentifier()).thenReturn(name);
        Mockito.when(sSLContextService.createContext()).thenReturn(createSslContext);
        this.runner.addControllerService(name, sSLContextService);
        this.runner.enableControllerService(sSLContextService);
        this.runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, name);
        createTestServer(createSslContext, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessServerVariableExpression() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessPruneSenders() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertTransfers(VALID_FILES.length);
        assertMessagesReceived(VALID_FILES);
        this.runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
        this.runner.run(1, false, false);
        this.runner.clearTransferState();
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessMultiCharDelimiter() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionPerFlowFile() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionFailure() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
        shutdownServer();
        sendTestData(VALID_FILES);
        this.runner.assertQueueEmpty();
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(VALID_FILES);
        assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessEmptyFile() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        sendTestData(EMPTY_FILE);
        assertTransfers(1);
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testRunSuccessLargeValidFile() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        String[] createContent = createContent(VALID_LARGE_FILE_SIZE);
        sendTestData(createContent);
        assertMessagesReceived(createContent);
    }

    @Test
    public void testRunSuccessFiveHundredMessages() throws Exception {
        createTestServer(OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        String[] createContent = createContent(VALID_SMALL_FILE_SIZE);
        sendTestData(createContent, LOAD_TEST_ITERATIONS, 1);
        assertMessagesReceived(createContent, LOAD_TEST_ITERATIONS);
    }

    private void createTestServer(String str) throws UnknownHostException {
        createTestServer(null, str);
    }

    private void createTestServer(SSLContext sSLContext, String str) throws UnknownHostException {
        this.messages = new LinkedBlockingQueue();
        ByteArrayMessageNettyEventServerFactory byteArrayMessageNettyEventServerFactory = new ByteArrayMessageNettyEventServerFactory(this.runner.getLogger(), InetAddress.getByName(TCP_SERVER_ADDRESS), MIN_INVALID_PORT, TransportProtocol.TCP, str.getBytes(), VALID_LARGE_FILE_SIZE, this.messages);
        if (sSLContext != null) {
            byteArrayMessageNettyEventServerFactory.setSslContext(sSLContext);
        }
        byteArrayMessageNettyEventServerFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        byteArrayMessageNettyEventServerFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        this.eventServer = byteArrayMessageNettyEventServerFactory.getEventServer();
    }

    private void shutdownServer() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
        }
    }

    private void configureProperties(String str, String str2, boolean z) {
        this.runner.setProperty(PutTCP.HOSTNAME, str);
        this.runner.setProperty(PutTCP.PORT, String.valueOf(this.eventServer.getListeningPort()));
        if (str2 != null) {
            this.runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, str2);
        }
        this.runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(z));
        this.runner.assertValid();
    }

    private void sendTestData(String[] strArr) {
        sendTestData(strArr, 1, 1);
    }

    private void sendTestData(String[] strArr, int i, int i2) {
        this.runner.setThreadCount(i2);
        int i3 = MIN_INVALID_PORT;
        while (i3 < i) {
            int length = strArr.length;
            for (int i4 = MIN_INVALID_PORT; i4 < length; i4++) {
                this.runner.enqueue(strArr[i4].getBytes());
            }
            this.runner.run(strArr.length, false, i3 == 0);
            i3++;
        }
    }

    private void assertTransfers(int i) {
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, i);
        this.runner.assertTransferCount(PutTCP.REL_FAILURE, MIN_INVALID_PORT);
    }

    private void assertMessagesReceived(String[] strArr) throws Exception {
        assertMessagesReceived(strArr, 1);
        this.runner.assertQueueEmpty();
    }

    private void assertMessagesReceived(String[] strArr, int i) throws Exception {
        for (int i2 = MIN_INVALID_PORT; i2 < i; i2++) {
            int length = strArr.length;
            for (int i3 = MIN_INVALID_PORT; i3 < length; i3++) {
                String str = strArr[i3];
                ByteArrayMessage take = this.messages.take();
                Assertions.assertNotNull(take, String.format("Message [%d] not found", Integer.valueOf(i2)));
                Assertions.assertTrue(Arrays.asList(strArr).contains(new String(take.getMessage())));
            }
        }
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, strArr.length * i);
        this.runner.clearTransferState();
        Assertions.assertNull(this.messages.poll(), "Unexpected extra messages found");
    }

    private String[] createContent(int i) {
        char[] cArr = new char[i];
        for (int i2 = MIN_INVALID_PORT; i2 < i; i2++) {
            cArr[i2] = 'x';
        }
        return new String[]{new String(cArr)};
    }
}
