package org.apache.nifi.processors.standard.util;

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.processors.standard.PutTCP;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/nifi/processors/standard/util/TestPutTCPCommon.class */
/**
 * Shared test scenarios for the {@link PutTCP} processor.
 *
 * <p>Each test starts a local {@link TCPTestServer}, pushes FlowFiles through a
 * {@link TestRunner} and verifies what the server placed on the receive queue.
 * Subclasses provide the processor configuration via
 * {@link #configureProperties(String, int, String, boolean, boolean)}.
 *
 * <p>{@link #serverSocketFactory} is never assigned in this class; it is passed as-is to
 * {@code TCPTestServer.startServer}, so subclasses are presumably expected to set it
 * (or leave it {@code null}) before a test server is created.
 */
public abstract class TestPutTCPCommon {
    private static final String TCP_SERVER_ADDRESS = "127.0.0.1";
    private static final String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
    private static final String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
    private static final String UNKNOWN_HOST = "fgdsfgsdffd";
    private static final String INVALID_IP_ADDRESS = "300.300.300.300";
    private static final int MIN_INVALID_PORT = 0;
    private static final int MIN_VALID_PORT = 1;
    private static final int MAX_VALID_PORT = 65535;
    private static final int MAX_INVALID_PORT = 65536;
    private static final int BUFFER_SIZE = 1024;
    private static final int VALID_LARGE_FILE_SIZE = 32768;
    private static final int VALID_SMALL_FILE_SIZE = 64;
    private static final int LOAD_TEST_ITERATIONS = 500;
    private static final int LOAD_TEST_THREAD_COUNT = 1;
    private static final int DEFAULT_ITERATIONS = 1;
    private static final int DEFAULT_THREAD_COUNT = 1;
    private static final char CONTENT_CHAR = 'x';
    /** Maximum time, in milliseconds, to wait on the receive queue when checking for (absence of) data. */
    private static final int DATA_WAIT_PERIOD = 1000;
    private static final int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
    private static final int LONG_TEST_TIMEOUT_PERIOD = 180000;
    private static final String OUTGOING_MESSAGE_DELIMITER = "\n";
    private static final String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
    private static final String[] EMPTY_FILE = {""};
    private static final String[] VALID_FILES = {"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\"};

    private TCPTestServer server;
    private int tcpServerPort;
    private ArrayBlockingQueue<List<Byte>> recvQueue;
    public ServerSocketFactory serverSocketFactory;
    public TestRunner runner;

    /** Skips the whole suite on Windows. */
    @BeforeClass
    public static void setUpSuite() {
        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
    }

    /** Creates a fresh receive queue and test runner, and registers the server-address variable. */
    @Before
    public void setup() throws Exception {
        recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
        runner = TestRunners.newTestRunner(PutTCP.class);
        runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
    }

    /**
     * Starts a test server bound to the given address and records its port in {@code tcpServerPort}.
     *
     * @param address   host name or IP literal resolved via {@link InetAddress#getByName(String)}
     * @param queue     queue handed to the server for received messages
     * @param delimiter message delimiter handed to the server
     * @return the started server
     */
    private synchronized TCPTestServer createTestServer(String address, ArrayBlockingQueue<List<Byte>> queue, String delimiter) throws Exception {
        TCPTestServer testServer = new TCPTestServer(InetAddress.getByName(address), queue, delimiter);
        testServer.startServer(serverSocketFactory);
        tcpServerPort = testServer.getPort();
        return testServer;
    }

    /**
     * Shuts down the runner and the current test server.
     * The server is stopped even when the runner shutdown throws, so the socket is not leaked.
     */
    @After
    public void cleanup() throws Exception {
        try {
            // runner is null when setup() failed before assigning it.
            if (runner != null) {
                runner.shutdown();
            }
        } finally {
            removeTestServer(server);
        }
    }

    /** Shuts the given server down; a {@code null} server is ignored. */
    private void removeTestServer(TCPTestServer testServer) {
        if (testServer != null) {
            testServer.shutdown();
        }
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testValidFiles() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testValidFilesEL() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        // The host is supplied as an expression referencing the variable registered in setup().
        configureProperties(TCP_SERVER_ADDRESS_EL, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testPruneSenders() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        Thread.sleep(10L);
        checkRelationships(VALID_FILES.length, 0);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);

        // Shorten the idle expiration, wait past it, then trigger the processor once with an
        // empty input queue. NOTE(review): presumably this run is what prunes the idle sender.
        runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
        Thread.sleep(1000L);
        runner.run(1, false, false);
        runner.clearTransferState();

        // The second batch is expected to arrive over a second connection.
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 2);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testMultiCharDelimiter() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testConnectionPerFlowFile() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, true, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        // One connection is expected for every FlowFile sent.
        checkTotalNumConnections(server, VALID_FILES.length);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testConnectionFailure() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);

        // Stop the server and send again: nothing may arrive and the connection count must not change.
        removeTestServer(server);
        runner.clearTransferState();
        sendTestData(VALID_FILES);
        Thread.sleep(10L);
        checkNoDataReceived(recvQueue);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);

        // Start a replacement server (possibly on a different port) and verify delivery works again.
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        checkReceivedAllData(recvQueue, VALID_FILES);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testEmptyFile() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(EMPTY_FILE);
        Thread.sleep(10L);
        checkRelationships(EMPTY_FILE.length, 0);
        checkEmptyMessageReceived(recvQueue);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testlargeValidFile() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, true, true);
        String[] testData = createContent(VALID_LARGE_FILE_SIZE);
        sendTestData(testData);
        checkReceivedAllData(recvQueue, testData);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, testData.length);
    }

    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
    @Ignore("This test is failing intermittently as documented in NIFI-4288")
    public void testInvalidIPAddress() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(INVALID_IP_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        Thread.sleep(10L);
        checkRelationships(0, VALID_FILES.length);
        checkNoDataReceived(recvQueue);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 0);
    }

    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
    @Ignore("This test is failing intermittently as documented in NIFI-4288")
    public void testUnknownHostname() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        configureProperties(UNKNOWN_HOST, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(VALID_FILES);
        Thread.sleep(10L);
        checkRelationships(0, VALID_FILES.length);
        checkNoDataReceived(recvQueue);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 0);
    }

    /** Exercises the port boundaries; only the last argument (expected validity) differs per port. */
    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
    public void testInvalidPort() throws Exception {
        configureProperties(UNKNOWN_HOST, MIN_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
        configureProperties(UNKNOWN_HOST, MIN_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
        configureProperties(UNKNOWN_HOST, MAX_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
        configureProperties(UNKNOWN_HOST, MAX_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
    }

    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
    public void testLoadTest() throws Exception {
        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
        Thread.sleep(1000L);
        String[] testData = createContent(VALID_SMALL_FILE_SIZE);
        configureProperties(TCP_SERVER_ADDRESS, tcpServerPort, OUTGOING_MESSAGE_DELIMITER, false, true);
        sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
        checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
        checkInputQueueIsEmpty();
        checkTotalNumConnections(server, 1);
    }

    /** Asserts the total number of connections reported by the given server. */
    private void checkTotalNumConnections(TCPTestServer testServer, int expectedTotalNumConnections) {
        Assert.assertEquals(expectedTotalNumConnections, testServer.getTotalNumConnections());
    }

    /**
     * Applies the processor configuration for a test; implemented by subclasses.
     *
     * <p>NOTE(review): the parameter meanings below are inferred from how the tests in this
     * class call the method — confirm against the implementations.
     *
     * @param host                     target host, IP literal or expression
     * @param port                     target port
     * @param outgoingMessageDelimiter delimiter matching the one given to the test server
     * @param connectionPerFlowFile    {@code true} in the tests that expect one connection per FlowFile
     * @param expectValid              {@code false} only in the invalid-port cases
     * @throws InitializationException declared for implementations; not thrown by this class
     */
    public abstract void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) throws InitializationException;

    /** Sends the data once using a single thread. */
    private void sendTestData(String[] testData) {
        sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
    }

    /**
     * Enqueues every entry of {@code testData} and runs the processor, repeated {@code iterations} times.
     *
     * @param testData    FlowFile contents, encoded as UTF-8
     * @param iterations  number of enqueue-and-run passes
     * @param threadCount thread count set on the runner
     */
    private void sendTestData(String[] testData, int iterations, int threadCount) {
        runner.setThreadCount(threadCount);
        for (int iteration = 0; iteration < iterations; iteration++) {
            for (String item : testData) {
                runner.enqueue(item.getBytes(StandardCharsets.UTF_8));
            }
            // The last flag is true on the first pass only.
            runner.run(testData.length, false, iteration == 0);
        }
    }

    /** Asserts the number of FlowFiles routed to success and to failure. */
    private void checkRelationships(int successCount, int failureCount) {
        runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
        runner.assertTransferCount(PutTCP.REL_FAILURE, failureCount);
    }

    /** Asserts that nothing arrives on the queue within {@link #DATA_WAIT_PERIOD} milliseconds. */
    private void checkNoDataReceived(ArrayBlockingQueue<List<Byte>> queue) throws Exception {
        Assert.assertNull(queue.poll(DATA_WAIT_PERIOD, TimeUnit.MILLISECONDS));
    }

    /** Asserts that a zero-length message arrives within {@link #DATA_WAIT_PERIOD} milliseconds. */
    private void checkEmptyMessageReceived(ArrayBlockingQueue<List<Byte>> queue) throws Exception {
        List<Byte> message = queue.poll(DATA_WAIT_PERIOD, TimeUnit.MILLISECONDS);
        Assert.assertNotNull("No message received within " + DATA_WAIT_PERIOD + " ms", message);
        Assert.assertEquals(0, message.size());
    }

    /** Asserts that the runner's input queue is empty. */
    private void checkInputQueueIsEmpty() {
        runner.assertQueueEmpty();
    }

    /** Verifies that the data was received exactly once. */
    private void checkReceivedAllData(ArrayBlockingQueue<List<Byte>> queue, String[] sentData) throws Exception {
        checkReceivedAllData(queue, sentData, DEFAULT_ITERATIONS);
    }

    /**
     * Verifies that the queue yields {@code sentData}, in order, {@code iterations} times, that the
     * same number of FlowFiles went to success, and that no further message is queued.
     * Clears the runner's transfer state afterwards.
     *
     * @param queue      queue the test server writes received messages to
     * @param sentData   contents that were sent, compared as UTF-8 bytes
     * @param iterations how many times the whole data set was sent
     */
    private void checkReceivedAllData(ArrayBlockingQueue<List<Byte>> queue, String[] sentData, int iterations) throws Exception {
        for (int iteration = 0; iteration < iterations; iteration++) {
            for (String expected : sentData) {
                // take() blocks until a message arrives; the per-test timeout bounds the wait.
                List<Byte> received = queue.take();
                byte[] actual = ArrayUtils.toPrimitive(received.toArray(new Byte[received.size()]));
                Assert.assertArrayEquals(expected.getBytes(StandardCharsets.UTF_8), actual);
            }
        }
        runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
        runner.clearTransferState();
        Assert.assertNull(queue.poll());
    }

    /** Returns a single-element array holding a string of {@code size} copies of {@link #CONTENT_CHAR}. */
    private String[] createContent(int size) {
        char[] content = new char[size];
        Arrays.fill(content, CONTENT_CHAR);
        return new String[]{new String(content)};
    }
}
