package org.apache.nifi.processors.standard;

import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestListSFTP.class */
public class TestListSFTP {
    int port;

    @Rule
    public final FakeSftpServerRule sftpServer = new FakeSftpServerRule();
    final String username = "nifi-sftp-user";
    final String password = "Test test test chocolate";

    @Before
    public void setUp() throws Exception {
        this.sftpServer.addUser("nifi-sftp-user", "Test test test chocolate");
        this.port = this.sftpServer.getPort();
        this.sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8);
        this.sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8);
        byte[] bArr = new byte[120];
        SecureRandom.getInstanceStrong().nextBytes(bArr);
        this.sftpServer.putFile("/directory/file.bin", bArr);
    }

    @After
    public void tearDown() throws Exception {
        this.sftpServer.deleteAllFilesAndDirectories();
    }

    @Test
    public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        TestRunner newTestRunner = TestRunners.newTestRunner(new ListSFTP() { // from class: org.apache.nifi.processors.standard.TestListSFTP.1
            protected FileTransfer getFileTransfer(ProcessContext processContext) {
                return new SFTPTransfer(processContext, getLogger()) { // from class: org.apache.nifi.processors.standard.TestListSFTP.1.1
                    protected void getListing(String str, int i, int i2, List<FileInfo> list) throws IOException {
                        if (str.contains("subdir")) {
                            countDownLatch2.countDown();
                            try {
                                countDownLatch3.await();
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        super.getListing(str, i, i2, list);
                    }
                };
            }
        });
        newTestRunner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
        newTestRunner.setProperty(ListSFTP.HOSTNAME, "localhost");
        newTestRunner.setProperty(ListSFTP.USERNAME, "nifi-sftp-user");
        newTestRunner.setProperty(SFTPTransfer.PASSWORD, "Test test test chocolate");
        newTestRunner.setProperty(FTPTransfer.PORT, Integer.toString(this.port));
        newTestRunner.setProperty(ListSFTP.REMOTE_PATH, "/base/");
        newTestRunner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");
        newTestRunner.assertValid();
        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(1);
            this.sftpServer.createDirectory("/base");
            uploadFile("/base/", Integer.valueOf(atomicInteger.getAndIncrement()), arrayList);
            uploadFile("/base/subdir/", "sub." + atomicInteger.getAndIncrement(), arrayList);
            executorService.submit(() -> {
                try {
                    newTestRunner.run(1, false);
                } finally {
                    countDownLatch.countDown();
                }
            });
            countDownLatch2.await();
            uploadFile("/base/", Integer.valueOf(atomicInteger.getAndIncrement()), arrayList);
            Thread.sleep(1100L);
            uploadFile("/base/subdir/", "sub." + atomicInteger.getAndIncrement(), arrayList);
            countDownLatch3.countDown();
            Thread.sleep(1100L);
            countDownLatch.await();
            newTestRunner.run();
            List list = (List) newTestRunner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).stream().map((v0) -> {
                return v0.getAttributes();
            }).map(map -> {
                return (String) map.get("filename");
            }).sorted().collect(Collectors.toList());
            Collections.sort(arrayList);
            Assert.assertEquals(arrayList, list);
            if (executorService != null) {
                executorService.shutdown();
            }
        } catch (Throwable th) {
            if (executorService != null) {
                executorService.shutdown();
            }
            throw th;
        }
    }

    private void uploadFile(String str, Object obj, List<String> list) throws Exception {
        String str2 = "file." + obj;
        this.sftpServer.putFile(str + str2, "unimportant", StandardCharsets.UTF_8);
        list.add(str2);
    }

    @Test
    public void basicFileList() throws InterruptedException {
        TestRunner newTestRunner = TestRunners.newTestRunner(ListSFTP.class);
        newTestRunner.setProperty(ListSFTP.HOSTNAME, "localhost");
        newTestRunner.setProperty(ListSFTP.USERNAME, "nifi-sftp-user");
        newTestRunner.setProperty(SFTPTransfer.PASSWORD, "Test test test chocolate");
        newTestRunner.setProperty(FTPTransfer.PORT, Integer.toString(this.port));
        newTestRunner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
        newTestRunner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
        newTestRunner.assertValid();
        Thread.sleep(((Long) AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS)).longValue() * 2);
        newTestRunner.run();
        newTestRunner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
        newTestRunner.assertAllFlowFilesContainAttribute("sftp.remote.host");
        newTestRunner.assertAllFlowFilesContainAttribute("sftp.remote.port");
        newTestRunner.assertAllFlowFilesContainAttribute("sftp.listing.user");
        newTestRunner.assertAllFlowFilesContainAttribute("file.owner");
        newTestRunner.assertAllFlowFilesContainAttribute("file.group");
        newTestRunner.assertAllFlowFilesContainAttribute("file.permissions");
        newTestRunner.assertAllFlowFilesContainAttribute("file.size");
        newTestRunner.assertAllFlowFilesContainAttribute("file.lastModifiedTime");
        newTestRunner.assertAllFlowFilesContainAttribute("filename");
        ((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0)).assertAttributeEquals("sftp.listing.user", "nifi-sftp-user");
    }

    @Test
    public void sizeFilteredFileList() throws InterruptedException {
        TestRunner newTestRunner = TestRunners.newTestRunner(ListSFTP.class);
        newTestRunner.setProperty(ListSFTP.HOSTNAME, "localhost");
        newTestRunner.setProperty(ListSFTP.USERNAME, "nifi-sftp-user");
        newTestRunner.setProperty(SFTPTransfer.PASSWORD, "Test test test chocolate");
        newTestRunner.setProperty(FTPTransfer.PORT, Integer.toString(this.port));
        newTestRunner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
        newTestRunner.setProperty(ListFile.MIN_SIZE, "8B");
        newTestRunner.setProperty(ListFile.MAX_SIZE, "100B");
        newTestRunner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
        newTestRunner.assertValid();
        Thread.sleep(((Long) AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS)).longValue() * 2);
        newTestRunner.run();
        newTestRunner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
        ((MockFlowFile) newTestRunner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0)).assertAttributeEquals("filename", "file.txt");
    }
}
