package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.io.fs.FileSplitterInput;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInputTest.class */
public class FileSplitterInputTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInputTest$MockScanner.class */
    /**
     * Scanner used by these tests to let the test thread wait for the scanner.
     *
     * <p>Each completed scan iteration that discovered at least one entry releases one permit on
     * {@link #semaphore}; tests call {@code semaphore.acquire()} before {@code emitTuples()}.
     */
    public static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner {
        /** Starts with zero permits, so {@code acquire()} blocks until a scan iteration reports discoveries. */
        transient Semaphore semaphore = new Semaphore(0);

        private MockScanner() {
        }

        /**
         * Releases one permit when the iteration that just finished discovered something, then
         * delegates to the superclass implementation.
         */
        protected void scanIterationComplete() {
            boolean discoveredSomething = getNumDiscoveredPerIteration() > 0;
            if (discoveredSomething) {
                semaphore.release();
            }
            super.scanIterationComplete();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInputTest$TestMeta.class */
    /**
     * JUnit rule that prepares a per-test data directory and a configured {@link FileSplitterInput}.
     *
     * <p>Before each test it writes the fixture files via {@code createData}, wires a
     * {@link MockScanner} (500 ms scan interval, {@code .*[.]txt} filter) into the operator, calls
     * {@code setup} and attaches collector sinks to both output ports. After each test it tears the
     * operator down and deletes the per-test directory under {@code target/}.
     */
    public static class TestMeta extends TestWatcher {
        /** Directory holding the generated input files: {@code target/<class>/<method>/data}. */
        String dataDirectory;
        FileSplitterInput fileSplitterInput;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> fileMetadataSink;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
        /** Paths returned by {@code createData} for the generated files. */
        Set<String> filePaths;
        Context.OperatorContext context;
        MockScanner scanner;

        /**
         * Creates the fixture data and a fully set-up operator for the test described by
         * {@code description}.
         *
         * @param description identifies the running test; its class and method names form the
         *                    working directory
         * @throws RuntimeException wrapping any {@link IOException} raised while creating the data
         */
        protected void starting(Description description) {
            String baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
            this.dataDirectory = baseDirectory + "/data";
            try {
                this.filePaths = FileSplitterInputTest.createData(this.dataDirectory);

                this.fileSplitterInput = new FileSplitterInput();
                this.scanner = new MockScanner();
                this.fileSplitterInput.setScanner(this.scanner);
                this.fileSplitterInput.getScanner().setScanIntervalMillis(500L);
                this.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
                this.fileSplitterInput.getScanner().setFiles(this.dataDirectory);
                this.fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());

                // The application path gets a time-based suffix under the per-test directory.
                Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
                attributes.put(Context.DAGContext.APPLICATION_PATH, baseDirectory + "/" + Long.toHexString(System.currentTimeMillis()));
                this.context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
                this.fileSplitterInput.setup(this.context);

                this.fileMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(this.fileSplitterInput.filesMetadataOutput, this.fileMetadataSink);
                this.blockMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(this.fileSplitterInput.blocksMetadataOutput, this.blockMetadataSink);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Tears the operator down and removes the per-test directory.
         *
         * <p>The directory is deleted even when {@code teardown()} throws, and the operator is only
         * torn down if {@link #starting} got far enough to create it, so a setup failure is not
         * followed by a {@link NullPointerException} here.
         *
         * @param description identifies the test that just finished
         */
        protected void finished(Description description) {
            try {
                if (this.fileSplitterInput != null) {
                    this.fileSplitterInput.teardown();
                }
            } finally {
                FileUtils.deleteQuietly(new File("target/" + description.getClassName() + "/" + description.getMethodName()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Recreates the fixture directory with twelve two-line text files.
     *
     * <p>Any existing content at {@code str} is deleted recursively first. The files are named
     * {@code file0.txt} to {@code file11.txt}; file {@code i} contains the lines {@code f<i>l0} and
     * {@code f<i>l1} joined by a newline (the order of the two lines follows hash-set iteration).
     *
     * @param str directory in which the files are created
     * @return the path of every created file, rendered as {@code new Path(str, name).toUri().toString()}
     * @throws IOException if deleting the old directory or writing a file fails
     */
    public static Set<String> createData(String str) throws IOException {
        Set<String> filePaths = Sets.newHashSet();
        FileContext.getLocalFSFileContext().delete(new Path(new File(str).getAbsolutePath()), true);
        for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
            Set<String> lines = Sets.newHashSet();
            for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
                lines.add("f" + fileIndex + "l" + lineIndex);
            }
            File file = new File(str, "file" + fileIndex + ".txt");
            filePaths.add(new Path(str, file.getName()).toUri().toString());
            FileUtils.write(file, StringUtils.join(lines, '\n'));
        }
        return filePaths;
    }

    /**
     * Runs window 1 and checks that one file-metadata tuple is emitted per fixture file.
     *
     * <p>Every emitted path must be one of the generated file paths and every tuple must carry a
     * file name. The file-metadata sink's collected tuples are cleared at the end; other tests call
     * this method as their first step.
     */
    @Test
    public void testFileMetadata() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> fileSink = this.testMeta.fileMetadataSink;

        splitter.beginWindow(1L);
        // Wait for a scan iteration that discovered something before asking for tuples.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("File metadata", 12, fileSink.collectedTuples.size());
        for (AbstractFileSplitter.FileMetadata metadata : fileSink.collectedTuples) {
            String path = metadata.getFilePath();
            Assert.assertTrue("path: " + path, this.testMeta.filePaths.contains(path));
            Assert.assertNotNull("name: ", metadata.getFileName());
        }
        fileSink.collectedTuples.clear();
    }

    /**
     * With the block size left untouched, expects exactly one block tuple per fixture file and
     * checks that each block refers to one of the generated file paths.
     */
    @Test
    public void testBlockMetadataNoSplit() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockSink = this.testMeta.blockMetadataSink;

        splitter.beginWindow(1L);
        // Wait for a scan iteration that discovered something before asking for tuples.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();

        Assert.assertEquals("Blocks", 12, blockSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata block : blockSink.collectedTuples) {
            String path = block.getFilePath();
            Assert.assertTrue("path: " + path, this.testMeta.filePaths.contains(path));
        }
    }

    /**
     * With a block size of 2, expects the number of emitted blocks to equal the sum over all
     * emitted files of {@code ceil(fileLength / 2)}.
     */
    @Test
    public void testBlockMetadataWithSplit() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> fileSink = this.testMeta.fileMetadataSink;

        splitter.setBlockSize(2L);
        splitter.beginWindow(1L);
        // Wait for a scan iteration that discovered something before asking for tuples.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        Assert.assertEquals("Files", 12, fileSink.collectedTuples.size());

        // Derive the expected block count from the on-disk length of every emitted file.
        int expectedBlocks = 0;
        for (AbstractFileSplitter.FileMetadata metadata : fileSink.collectedTuples) {
            File source = new File(this.testMeta.dataDirectory, metadata.getFileName());
            expectedBlocks += (int) Math.ceil(source.length() / 2.0d);
        }
        Assert.assertEquals("Blocks", expectedBlocks, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Processes window 1 with an {@code FSIdempotentStorageManager}, then sets the operator up
     * again and begins window 1 a second time.
     *
     * <p>After that second {@code beginWindow(1)}, with no {@code emitTuples()} call, the block
     * sink must again hold twelve blocks that all refer to generated file paths.
     */
    @Test
    public void testIdempotency() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockSink = this.testMeta.blockMetadataSink;

        splitter.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
        splitter.setup(this.testMeta.context);

        // First pass over window 1.
        testFileMetadata();
        this.testMeta.fileMetadataSink.clear();
        blockSink.clear();

        // Second setup followed by the same window id.
        splitter.setup(this.testMeta.context);
        splitter.beginWindow(1L);

        Assert.assertEquals("Blocks", 12, blockSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata block : blockSink.collectedTuples) {
            String path = block.getFilePath();
            Assert.assertTrue("path: " + path, this.testMeta.filePaths.contains(path));
        }
    }

    /**
     * After the initial twelve files were consumed in window 1, writes {@code file13.txt} and
     * expects window 2 to emit exactly one file tuple and one block tuple for it.
     *
     * <p>The rule configures a 500 ms scan interval; the one-second sleep before writing the file
     * is presumably there to let at least one further scan pass first.
     */
    @Test
    public void testTimeScan() throws InterruptedException, IOException, TimeoutException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        testFileMetadata();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        Thread.sleep(1000L);

        // Add a thirteenth file with two lines.
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
            lines.add("f13l" + lineIndex);
        }
        File newFile = new File(this.testMeta.dataDirectory, "file13.txt");
        FileUtils.write(newFile, StringUtils.join(lines, '\n'));

        splitter.beginWindow(2L);
        // Wait for the scan iteration that discovers the new file.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("window 2: files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Same scenario as the timed-scan test, but with a 60 s scan interval and
     * {@code setTrigger(true)} called on the scanner after the new file is written.
     *
     * <p>Expects window 2 to emit exactly one file tuple and one block tuple.
     */
    @Test
    public void testTrigger() throws InterruptedException, IOException, TimeoutException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        splitter.getScanner().setScanIntervalMillis(60000L);
        testFileMetadata();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        Thread.sleep(1000L);

        // Add a thirteenth file with two lines.
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
            lines.add("f13l" + lineIndex);
        }
        File newFile = new File(this.testMeta.dataDirectory, "file13.txt");
        FileUtils.write(newFile, StringUtils.join(lines, '\n'));

        // Set the scanner's trigger flag before starting window 2.
        splitter.getScanner().setTrigger(true);

        splitter.beginWindow(2L);
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("window 2: files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * With block size 2 and a blocks threshold of 10, expects window 1 to emit exactly ten blocks
     * and the remaining blocks to follow in windows 2 to 7.
     *
     * <p>At the end all twelve files and {@code sum(ceil(length / 2))} blocks must have been
     * collected.
     */
    @Test
    public void testBlocksThreshold() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        // Expected total, computed from the fixture files on disk.
        int expectedBlocks = 0;
        for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
            File source = new File(this.testMeta.dataDirectory, "file" + fileIndex + ".txt");
            expectedBlocks += (int) Math.ceil(source.length() / 2.0d);
        }

        splitter.setBlockSize(2L);
        splitter.setBlocksThreshold(10);

        splitter.beginWindow(1L);
        // Wait for a scan iteration that discovered something before asking for tuples.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();
        Assert.assertEquals("Blocks", 10, this.testMeta.blockMetadataSink.collectedTuples.size());

        // Run the follow-up windows without waiting on the scanner.
        for (int windowId = 2; windowId < 8; windowId++) {
            splitter.beginWindow(windowId);
            splitter.emitTuples();
            splitter.endWindow();
        }

        Assert.assertEquals("Files", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", expectedBlocks, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Runs the blocks-threshold scenario with an {@code FSIdempotentStorageManager}, then sets the
     * operator up again and begins windows 1 to 7 without emitting.
     *
     * <p>Afterwards the sinks must hold 12 files and 62 blocks.
     */
    @Test
    public void testIdempotencyWithBlocksThreshold() throws InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        splitter.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
        splitter.setBlocksThreshold(10);
        splitter.getScanner().setScanIntervalMillis(500L);
        splitter.setup(this.testMeta.context);

        // First pass over windows 1 to 7.
        testBlocksThreshold();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();

        // Second setup; only beginWindow is called for each window id.
        splitter.setup(this.testMeta.context);
        for (int windowId = 1; windowId < 8; windowId++) {
            splitter.beginWindow(windowId);
        }

        Assert.assertEquals("Files", 12, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 62, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * After the idempotent blocks-threshold scenario, appends two lines to {@code file13.txt} and
     * expects window 8 to emit one file tuple and six block tuples.
     */
    @Test
    public void testFirstWindowAfterRecovery() throws IOException, InterruptedException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        testIdempotencyWithBlocksThreshold();
        Thread.sleep(1000L);

        // Lines "f13l2" and "f13l3", written in append mode.
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 2; lineIndex < 4; lineIndex++) {
            lines.add("f13l" + lineIndex);
        }
        File newFile = new File(this.testMeta.dataDirectory, "file13.txt");
        FileUtils.writeLines(newFile, lines, true);

        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();

        splitter.beginWindow(8L);
        // Wait for the scan iteration that discovers the new file.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("Files", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 6, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Disabled scenario: recovery when a file was only partially split into blocks.
     *
     * <p>The operator is serialized with Kryo before setup, processes window 1 with block size 2 and
     * a threshold of two blocks, is torn down, restored from the serialized copy and set up again.
     * The test then checks the tuples seen after re-beginning window 1, the blocks emitted in
     * window 2, and that window 3 emits one new file whose first block still belongs to the
     * previous file.
     *
     * <p>The method carries {@code @Test} in addition to {@code @Ignore} so the runner lists it as
     * an ignored test instead of not discovering it at all; it is still not executed.
     */
    @Ignore
    @Test
    public void testRecoveryOfPartialFile() throws InterruptedException {
        this.testMeta.fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
        this.testMeta.fileSplitterInput.setBlockSize(2L);
        this.testMeta.fileSplitterInput.setBlocksThreshold(2);
        this.testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500L);

        // Snapshot the configured operator before it processes anything.
        Kryo kryo = new Kryo();
        ByteArrayOutputStream serializedOperator = new ByteArrayOutputStream();
        Output output = new Output(serializedOperator);
        kryo.writeObject(output, this.testMeta.fileSplitterInput);
        output.close();

        // Window 1: the threshold of two blocks limits the emission to part of one file.
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);
        this.testMeta.fileSplitterInput.beginWindow(1L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Files", 1L, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2L, this.testMeta.blockMetadataSink.collectedTuples.size());
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        this.testMeta.fileSplitterInput.teardown();

        // Restore the operator from the snapshot and re-attach the sinks.
        Input input = new Input(serializedOperator.toByteArray());
        this.testMeta.fileSplitterInput = (FileSplitterInput) kryo.readObject(input, this.testMeta.fileSplitterInput.getClass());
        input.close();
        TestUtils.setSink(this.testMeta.fileSplitterInput.blocksMetadataOutput, this.testMeta.blockMetadataSink);
        TestUtils.setSink(this.testMeta.fileSplitterInput.filesMetadataOutput, this.testMeta.fileMetadataSink);
        this.testMeta.fileSplitterInput.setup(this.testMeta.context);

        // Beginning window 1 again must reproduce what was emitted before.
        this.testMeta.fileSplitterInput.beginWindow(1L);
        Assert.assertEquals("Recovered Files", 1L, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Recovered Blocks", 2L, this.testMeta.blockMetadataSink.collectedTuples.size());

        // Window 2 adds two more blocks on top of the recovered ones.
        this.testMeta.fileSplitterInput.beginWindow(2L);
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("Blocks", 4L, this.testMeta.blockMetadataSink.collectedTuples.size());
        String fileName = ((AbstractFileSplitter.FileMetadata) this.testMeta.fileMetadataSink.collectedTuples.get(0)).getFileName();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();

        // Window 3: first block finishes the previous file, second block starts the new one.
        this.testMeta.fileSplitterInput.beginWindow(3L);
        ((MockScanner) this.testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
        this.testMeta.fileSplitterInput.emitTuples();
        this.testMeta.fileSplitterInput.endWindow();
        Assert.assertEquals("New file", 1L, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2L, this.testMeta.blockMetadataSink.collectedTuples.size());
        String fileName2 = ((AbstractFileSplitter.FileMetadata) this.testMeta.fileMetadataSink.collectedTuples.get(0)).getFileName();
        Assert.assertTrue("Block file name 0", ((BlockMetadata.FileBlockMetadata) this.testMeta.blockMetadataSink.collectedTuples.get(0)).getFilePath().endsWith(fileName));
        Assert.assertTrue("Block file name 1", ((BlockMetadata.FileBlockMetadata) this.testMeta.blockMetadataSink.collectedTuples.get(1)).getFilePath().endsWith(fileName2));
    }

    /**
     * Clears the scanner's {@code regex}, consumes the initial files, then writes
     * {@code child/file13.txt} below the data directory.
     *
     * <p>Window 2 must emit two file-metadata tuples (presumably the new directory and the file
     * inside it) and one block tuple.
     */
    @Test
    public void testRecursive() throws InterruptedException, IOException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;

        splitter.getScanner().regex = null;
        testFileMetadata();
        this.testMeta.fileMetadataSink.clear();
        this.testMeta.blockMetadataSink.clear();
        Thread.sleep(1000L);

        // Two-line file inside a sub-directory of the data directory.
        Set<String> lines = Sets.newHashSet();
        for (int lineIndex = 0; lineIndex < 2; lineIndex++) {
            lines.add("f13l" + lineIndex);
        }
        File nestedFile = new File(this.testMeta.dataDirectory + "/child", "file13.txt");
        FileUtils.write(nestedFile, StringUtils.join(lines, '\n'));

        splitter.beginWindow(2L);
        // Wait for the scan iteration that discovers the new entries.
        this.testMeta.scanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("window 2: files", 2, this.testMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1, this.testMeta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Points a fresh scanner at the single file {@code file1.txt} instead of a directory.
     *
     * <p>The operator from the rule is torn down, given a new {@link MockScanner} with no regex,
     * and set up again. Window 1 must emit exactly one file-metadata tuple whose path equals the
     * absolute path of that file.
     */
    @Test
    public void testSingleFile() throws InterruptedException, IOException {
        FileSplitterInput splitter = this.testMeta.fileSplitterInput;
        String singleFile = this.testMeta.dataDirectory + "/file1.txt";

        // Replace the scanner that the rule installed.
        splitter.teardown();
        splitter.setScanner(new MockScanner());
        splitter.getScanner().regex = null;
        splitter.getScanner().setFiles(singleFile);
        splitter.setup(this.testMeta.context);

        splitter.beginWindow(1L);
        // Wait on the semaphore of the scanner currently installed in the operator.
        MockScanner activeScanner = (MockScanner) splitter.getScanner();
        activeScanner.semaphore.acquire();
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("File metadata count", 1, this.testMeta.fileMetadataSink.collectedTuples.size());
        AbstractFileSplitter.FileMetadata emitted = this.testMeta.fileMetadataSink.collectedTuples.get(0);
        Assert.assertEquals("File metadata", new File(singleFile).getAbsolutePath(), emitted.getFilePath());
    }
}
