package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.io.fs.FileSplitterInput;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.lib.util.TestUtils;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInputTest.class */
/**
 * Tests for {@link FileSplitterInput}: file discovery through a time based directory scanner, emission of
 * file and block metadata, block-threshold throttling, and replay of emitted tuples after recovery with an
 * {@link FSWindowDataManager}.
 */
public class FileSplitterInputTest {

    private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);

    @Rule
    public TestMeta testMeta = new TestMeta();

    /**
     * Scanner that releases one semaphore permit each time it hands a discovered file to the operator, so a test
     * can block until the scanner thread has processed an expected number of files.
     */
    public static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner {
        private final transient Semaphore semaphore;

        private MockScanner() {
            this.semaphore = new Semaphore(0);
        }

        protected void processDiscoveredFile(FileSplitterInput.ScannedFileInfo scannedFileInfo) {
            super.processDiscoveredFile(scannedFileInfo);
            this.semaphore.release();
        }
    }

    /**
     * Per-test fixture: creates the input data under {@code target/<class>/<method>/data}, wires a
     * {@link FileSplitterInput} with a {@link MockScanner} and collector sinks, and removes the test folder
     * afterwards.
     */
    public static class TestMeta extends TestWatcher {
        String dataDirectory;
        FileSplitterInput fileSplitterInput;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> fileMetadataSink;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
        Set<String> filePaths;
        Context.OperatorContext context;
        MockScanner scanner;

        protected void starting(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
            String methodName = description.getMethodName();
            String className = description.getClassName();
            this.dataDirectory = "target/" + className + "/" + methodName + "/data";
            try {
                this.filePaths = FileSplitterInputTest.createData(this.dataDirectory);
                this.fileSplitterInput = new FileSplitterInput();
                this.fileSplitterInput.setBlocksThreshold(100);
                this.scanner = new MockScanner();
                this.scanner.setScanIntervalMillis(100L);
                this.scanner.setFilePatternRegularExp(".*[.]txt");
                this.scanner.setFiles(this.dataDirectory);
                this.fileSplitterInput.setScanner(this.scanner);
                Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
                // A unique application path per run keeps window-data-manager state from leaking between runs.
                attributes.put(Context.DAGContext.APPLICATION_PATH,
                        "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
                this.context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
                this.fileMetadataSink = new CollectorTestSink<>();
                this.blockMetadataSink = new CollectorTestSink<>();
                resetSinks();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            // starting() may have failed before filePaths was assigned; do not mask that failure with an NPE.
            if (this.filePaths != null) {
                this.filePaths.clear();
            }
            TestUtils.deleteTargetTestClassFolder(description);
        }

        /** Connects the collector sinks to the (possibly re-created) operator's output ports. */
        public void resetSinks() {
            TestUtils.setSink(this.fileSplitterInput.filesMetadataOutput, this.fileMetadataSink);
            TestUtils.setSink(this.fileSplitterInput.blocksMetadataOutput, this.blockMetadataSink);
        }

        /**
         * Applies the configuration shared by the recovery tests.
         *
         * @param windowDataManager  manager used to persist and replay per-window state
         * @param scanIntervalMillis scanner polling interval
         * @param blockSize          block size passed to {@code setBlockSize}
         * @param blocksThreshold    value passed to {@code setBlocksThreshold}
         */
        public void updateConfig(FSWindowDataManager windowDataManager, long scanIntervalMillis, long blockSize,
                int blocksThreshold) {
            this.fileSplitterInput.setWindowDataManager(windowDataManager);
            this.fileSplitterInput.getScanner().setScanIntervalMillis(scanIntervalMillis);
            this.fileSplitterInput.setBlockSize(Long.valueOf(blockSize));
            this.fileSplitterInput.setBlocksThreshold(blocksThreshold);
        }
    }

    /**
     * Recreates {@code dataDirectory} with twelve files {@code file0.txt .. file11.txt}, each holding two lines.
     *
     * @param dataDirectory directory to (re)create
     * @return absolute paths of the created files
     * @throws IOException if the directory cannot be deleted or a file cannot be written
     */
    public static Set<String> createData(String dataDirectory) throws IOException {
        Set<String> createdPaths = Sets.newHashSet();
        FileContext.getLocalFSFileContext().delete(new Path(new File(dataDirectory).getAbsolutePath()), true);
        for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
            Set<String> lines = Sets.newHashSet();
            for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
                lines.add("f" + String.format("%02d", fileIndex) + "l" + lineIndex);
            }
            File dataFile = new File(dataDirectory, "file" + fileIndex + ".txt");
            createdPaths.add(dataFile.getAbsolutePath());
            FileUtils.write(dataFile, StringUtils.join(lines, '\n'));
        }
        return createdPaths;
    }

    /** Runs window 1, waits for the twelve initial files, and checks their emitted file metadata. */
    private void validateFileMetadataInWindow1() throws InterruptedException {
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File metadata", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        for (AbstractFileSplitter.FileMetadata fileMetadata : this.testMeta.fileMetadataSink.collectedTuples) {
            Assert.assertTrue("path: " + fileMetadata.getFilePath(),
                    this.testMeta.filePaths.contains(fileMetadata.getFilePath()));
            Assert.assertNotNull("name: ", fileMetadata.getFileName());
        }
        this.testMeta.fileMetadataSink.collectedTuples.clear();
    }

    /** Writes a two-line file named {@code name} under {@code directory}, with lines prefixed by {@code prefix}. */
    private static File writeTwoLineFile(String directory, String name, String prefix) throws IOException {
        File file = new File(directory, name);
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
            lines.add(prefix + lineIndex);
        }
        FileUtils.write(file, StringUtils.join(lines, '\n'));
        return file;
    }

    @Test
    public void testFileMetadata() throws InterruptedException {
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testScannerFilterForDuplicates() throws InterruptedException {
        String singleFile = this.testMeta.dataDirectory + "/file0.txt";
        this.testMeta.scanner = new MockScanner();
        this.testMeta.fileSplitterInput.setScanner(this.testMeta.scanner);
        this.testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500L);
        this.testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
        this.testMeta.fileSplitterInput.getScanner().setFiles(singleFile);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        this.testMeta.scanner.semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        // A second window with no file changes must not re-emit the already discovered file.
        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File metadata", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        for (AbstractFileSplitter.FileMetadata fileMetadata : this.testMeta.fileMetadataSink.collectedTuples) {
            Assert.assertTrue("path: " + fileMetadata.getFilePath(),
                    this.testMeta.filePaths.contains(fileMetadata.getFilePath()));
            Assert.assertNotNull("name: ", fileMetadata.getFileName());
        }
        this.testMeta.fileMetadataSink.collectedTuples.clear();
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testBlockMetadataNoSplit() throws InterruptedException {
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        this.testMeta.scanner.semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        Assert.assertEquals("Blocks", 12, this.testMeta.blockMetadataSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata blockMetadata : this.testMeta.blockMetadataSink.collectedTuples) {
            Assert.assertTrue("path: " + blockMetadata.getFilePath(),
                    this.testMeta.filePaths.contains(blockMetadata.getFilePath()));
        }
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testBlockMetadataWithSplit() throws InterruptedException {
        this.testMeta.fileSplitterInput.setBlockSize(2L);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        this.testMeta.scanner.semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        Assert.assertEquals("Files", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        int expectedBlocks = 0;
        for (AbstractFileSplitter.FileMetadata fileMetadata : this.testMeta.fileMetadataSink.collectedTuples) {
            long length = new File(this.testMeta.dataDirectory, fileMetadata.getFileName()).length();
            expectedBlocks += (int) Math.ceil(length / 2.0d);
        }
        Assert.assertEquals("Blocks", expectedBlocks, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testIdempotency() throws InterruptedException {
        this.testMeta.fileSplitterInput.setWindowDataManager(new FSWindowDataManager());
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.teardown();
        // Simulate a restart: clone the operator, reattach sinks and replay window 1.
        this.testMeta.fileSplitterInput = (FileSplitterInput) KryoCloneUtils.cloneObject(this.testMeta.fileSplitterInput);
        this.testMeta.resetSinks();
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        Assert.assertEquals("Blocks", 12, this.testMeta.blockMetadataSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata blockMetadata : this.testMeta.blockMetadataSink.collectedTuples) {
            Assert.assertTrue("path: " + blockMetadata.getFilePath(),
                    this.testMeta.filePaths.contains(blockMetadata.getFilePath()));
        }
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testTimeScan() throws InterruptedException, IOException, TimeoutException {
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        // Sleep so the new file's modification time is distinguishable from the previous scan.
        Thread.sleep(1000L);
        writeTwoLineFile(this.testMeta.dataDirectory, "file13.txt", "f13l");
        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.scanner.semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("window 2: files " + this.testMeta.fileMetadataSink.collectedTuples, 1,
                this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testTrigger() throws InterruptedException, IOException, TimeoutException {
        // A long scan interval ensures the second scan only happens because of the explicit trigger.
        this.testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60000L);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        Thread.sleep(1000L);
        writeTwoLineFile(this.testMeta.dataDirectory, "file13.txt", "f13l");
        this.testMeta.fileSplitterInput.getScanner().setTrigger(true);
        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.scanner.semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("window 2: files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    /**
     * Computes the expected block count for {@code file0.txt .. file<numFiles-1>.txt} at the given block size.
     */
    private int getTotalNumOfBlocks(int numFiles, long blockSize) {
        int totalBlocks = 0;
        for (int fileIndex = 0; fileIndex < numFiles; fileIndex++) {
            long length = new File(this.testMeta.dataDirectory, "file" + fileIndex + ".txt").length();
            totalBlocks += (int) Math.ceil(length / (double) blockSize);
        }
        return totalBlocks;
    }

    /**
     * Runs windows 1..{@code windowCount} and verifies that window 1 is capped at 10 blocks and that all files and
     * blocks have been emitted by the last window. Callers configure a blocks threshold of 10.
     */
    private void validateBlocks(long windowCount, long blockSize) throws InterruptedException {
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Blocks", 10, this.testMeta.blockMetadataSink.collectedTuples.size());
        for (long window = 2; window <= windowCount; window++) {
            this.testMeta.fileSplitterInput.beginWindow(window);
            this.testMeta.fileSplitterInput.emitTuples();
            this.testMeta.fileSplitterInput.endWindow();
        }
        Assert.assertEquals("Files", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockSize),
                this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    @Test
    public void testBlocksThreshold() throws InterruptedException {
        this.testMeta.fileSplitterInput.setBlockSize(2L);
        this.testMeta.fileSplitterInput.setBlocksThreshold(10);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateBlocks(8L, 2L);
        this.testMeta.fileSplitterInput.teardown();
    }

    /**
     * Runs {@link #validateBlocks}, then restarts a Kryo clone of the operator and checks that replaying windows
     * 1..{@code windowCount} re-emits every file and block.
     */
    private void validateRecovery(long windowCount, long blockSize) throws InterruptedException {
        validateBlocks(windowCount, blockSize);
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.teardown();
        this.testMeta.fileSplitterInput = (FileSplitterInput) KryoCloneUtils.cloneObject(this.testMeta.fileSplitterInput);
        this.testMeta.resetSinks();
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        for (long window = 1; window <= windowCount; window++) {
            this.testMeta.fileSplitterInput.beginWindow(window);
        }
        Assert.assertEquals("Files", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockSize),
                this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    @Test
    public void testIdempotencyWithBlocksThreshold() throws InterruptedException {
        this.testMeta.updateConfig(new FSWindowDataManager(), 500L, 2L, 10);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateRecovery(8L, 2L);
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testFirstWindowAfterRecovery() throws IOException, InterruptedException {
        this.testMeta.updateConfig(new FSWindowDataManager(), 500L, 2L, 10);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateRecovery(8L, 2L);
        Thread.sleep(1000L);
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 2; lineIndex < 4; lineIndex++) {
            lines.add("f13l" + lineIndex);
        }
        FileUtils.writeLines(new File(this.testMeta.dataDirectory, "file13.txt"), lines, true);
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.beginWindow(9L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Files " + this.testMeta.fileMetadataSink.collectedTuples, 1,
                this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 6, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testRecoveryOfPartialFile() throws InterruptedException {
        this.testMeta.updateConfig(new FSWindowDataManager(), 500L, 2L, 2);
        // Checkpoint the operator before any window is processed.
        FileSplitterInput checkpointed = (FileSplitterInput) KryoCloneUtils.cloneObject(this.testMeta.fileSplitterInput);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        this.testMeta.scanner.semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2, this.testMeta.blockMetadataSink.collectedTuples.size());
        AbstractFileSplitter.FileMetadata partialFile = this.testMeta.fileMetadataSink.collectedTuples.get(0);
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.teardown();

        this.testMeta.fileSplitterInput = checkpointed;
        this.testMeta.resetSinks();
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        Assert.assertEquals("Recovered Files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("recovered file-metadata", partialFile.getFileName(),
                this.testMeta.fileMetadataSink.collectedTuples.get(0).getFileName());
        Assert.assertEquals("Recovered Blocks", 2, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.endWindow();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();

        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        this.testMeta.fileSplitterInput.beginWindow(3L);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File", 0, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 4, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();

        this.testMeta.fileSplitterInput.beginWindow(4L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire(11);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("New file", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2, this.testMeta.blockMetadataSink.collectedTuples.size());
        AbstractFileSplitter.FileMetadata newFile = this.testMeta.fileMetadataSink.collectedTuples.get(0);
        for (BlockMetadata.FileBlockMetadata blockMetadata : this.testMeta.blockMetadataSink.collectedTuples) {
            Assert.assertTrue("Block file name", blockMetadata.getFilePath().endsWith(newFile.getFileName()));
        }
        // Tear down exactly once, after all blocks have been checked.
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testRecursive() throws InterruptedException, IOException {
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        writeTwoLineFile(this.testMeta.dataDirectory + "/child", "file13.txt", "f13l");
        this.testMeta.fileSplitterInput.beginWindow(2L);
        // Two discoveries are expected: the new "child" directory and the file inside it.
        this.testMeta.scanner.semaphore.acquire(2);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("window 2: files " + this.testMeta.fileMetadataSink.collectedTuples, 2,
                this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testSingleFile() throws InterruptedException, IOException {
        this.testMeta.fileSplitterInput.setScanner(new MockScanner());
        this.testMeta.fileSplitterInput.getScanner().setFiles(this.testMeta.dataDirectory + "/file1.txt");
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File metadata count", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("File metadata", new File(this.testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
                this.testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testRecoveryOfBlockMetadataIterator() throws InterruptedException {
        this.testMeta.updateConfig(new FSWindowDataManager(), 500L, 2L, 2);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        this.testMeta.scanner.semaphore.acquire(12);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.teardown();
        this.testMeta.fileSplitterInput = (FileSplitterInput) KryoCloneUtils.cloneObject(this.testMeta.fileSplitterInput);
        this.testMeta.resetSinks();
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        Assert.assertEquals("Recovered Files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Recovered Blocks", 2, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException {
        File file = new File(this.testMeta.dataDirectory, "file11.txt");
        LOG.debug("file 11 modified time {} ", Long.valueOf(file.lastModified()));
        this.testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60000L);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        validateFileMetadataInWindow1();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        // Sleep so the append produces a modification time different from the first scan.
        Thread.sleep(1000L);
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
            lines.add("f11l" + lineIndex);
        }
        try (FileWriter writer = new FileWriter(file, true)) {
            writer.write(StringUtils.join(lines, '\n'));
        }
        LOG.debug("file 11 modified time after append {} ", Long.valueOf(file.lastModified()));
        this.testMeta.fileSplitterInput.getScanner().setTrigger(true);
        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.scanner.semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("window 2: files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        // A re-triggered scan with no further modification must not emit the file again.
        this.testMeta.scanner.setTrigger(true);
        this.testMeta.fileSplitterInput.beginWindow(3L);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("window 3: files", 0, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 3: blocks", 0, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testMultipleNestedInput() throws IOException, InterruptedException {
        File subDir = new File(this.testMeta.dataDirectory, "subDir");
        subDir.mkdir();
        File nestedFile = new File(subDir, "file.txt");
        try (FileWriter writer = new FileWriter(nestedFile, true)) {
            writer.write("testData\n");
        }
        // Scan both the data directory and its sub-directory as separate inputs.
        String inputs = this.testMeta.scanner.getFiles().concat("," + subDir.getAbsolutePath());
        Set<String> expectedPaths = new HashSet<>(this.testMeta.filePaths);
        expectedPaths.add(subDir.getAbsolutePath());
        expectedPaths.add(nestedFile.getAbsolutePath());
        this.testMeta.fileSplitterInput.setScanner(new MockScanner());
        this.testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60000L);
        this.testMeta.fileSplitterInput.getScanner().setFiles(inputs);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire(15);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File metadata", 15, this.testMeta.fileMetadataSink.collectedTuples.size());
        for (AbstractFileSplitter.FileMetadata fileMetadata : this.testMeta.fileMetadataSink.collectedTuples) {
            Assert.assertTrue("path: " + fileMetadata.getFilePath(), expectedPaths.contains(fileMetadata.getFilePath()));
            Assert.assertNotNull("name: ", fileMetadata.getFileName());
        }
        this.testMeta.fileMetadataSink.collectedTuples.clear();
        this.testMeta.fileSplitterInput.teardown();
    }

    @Test
    public void testEmptyDirCopy() throws InterruptedException {
        File emptyDir = new File(this.testMeta.dataDirectory, "emptyDir");
        emptyDir.mkdirs();
        this.testMeta.fileSplitterInput.setScanner(new MockScanner());
        // Clear the file-name filter so the directory itself is reported.
        this.testMeta.fileSplitterInput.getScanner().regex = null;
        this.testMeta.fileSplitterInput.getScanner().setFiles(this.testMeta.dataDirectory + "/emptyDir");
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("File metadata count", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Empty directory not copied.", emptyDir.getName(),
                this.testMeta.fileMetadataSink.collectedTuples.get(0).getFileName());
        this.testMeta.fileSplitterInput.teardown();
    }
}
