package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.FileSplitter;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeoutException;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterTest.class */
public class FileSplitterTest {

    // Per-test fixture: its starting() hook creates the data directory, the input files and a
    // configured FileSplitter before each test method; finished() tears the splitter down.
    @Rule
    public TestMeta testMeta = new TestMeta();

    // Class-level fixture: its finished() hook deletes the "target/<test class name>" directory.
    @ClassRule
    public static TestClassMeta classTestMeta = new TestClassMeta();
    private static final Logger LOG = LoggerFactory.getLogger(FileSplitterTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterTest$MockScanner.class */
    private static class MockScanner extends FileSplitter.TimeBasedDirectoryScanner {
        TestMeta testMeta;

        MockScanner(TestMeta testMeta) {
            this.testMeta = testMeta;
        }

        protected void scanComplete() {
            super.scanComplete();
            try {
                if (this.discoveredFiles.size() > 0 && ((FileSplitter.FileInfo) this.discoveredFiles.getLast()).lastFileOfScan) {
                    FileSplitterTest.LOG.debug("discovered {}", Integer.valueOf(this.discoveredFiles.size()));
                    this.testMeta.exchanger.exchange(Integer.valueOf(this.discoveredFiles.size()));
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterTest$TestClassMeta.class */
    public static class TestClassMeta extends TestWatcher {
        protected void finished(Description description) {
            try {
                FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        FileSplitter fileSplitter;
        CollectorTestSink<FileSplitter.FileMetadata> fileMetadataSink;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
        Context.OperatorContext context;
        public String dataDirectory = null;
        Set<String> filePaths = Sets.newHashSet();
        Exchanger<Integer> exchanger = new Exchanger<>();

        protected void starting(Description description) {
            this.dataDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
            try {
                FileContext.getLocalFSFileContext().delete(new Path(new File(this.dataDirectory).getAbsolutePath()), true);
                HashSet newHashSet = Sets.newHashSet();
                for (int i = 0; i < 12; i++) {
                    HashSet newHashSet2 = Sets.newHashSet();
                    for (int i2 = 0; i2 < 2; i2++) {
                        newHashSet2.add("f" + i + "l" + i2);
                    }
                    newHashSet.addAll(newHashSet2);
                    File file = new File(this.dataDirectory, "file" + i + ".txt");
                    this.filePaths.add(new Path(this.dataDirectory, file.getName()).toUri().toString());
                    FileUtils.write(file, StringUtils.join(newHashSet2, '\n'));
                }
                this.fileSplitter = new FileSplitter();
                this.fileSplitter.setScanner(new MockScanner(this));
                this.fileSplitter.scanner.setScanIntervalMillis(500L);
                this.fileSplitter.scanner.setFilePatternRegularExp(".*[.]txt");
                this.fileSplitter.scanner.setFiles(this.dataDirectory);
                this.fileSplitter.setWindowDataManager(new FSWindowDataManager());
                Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
                defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.dataDirectory);
                this.context = OperatorContextTestHelper.mockOperatorContext(0, defaultAttributeMap);
                this.fileSplitter.setup(this.context);
                this.fileMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(this.fileSplitter.filesMetadataOutput, this.fileMetadataSink);
                this.blockMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(this.fileSplitter.blocksMetadataOutput, this.blockMetadataSink);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            this.fileSplitter.teardown();
        }
    }

    /**
     * Runs window 1 after the first scan and checks that one file-metadata tuple is emitted for
     * each of the 12 generated files, with a known path and a non-null name. The collected file
     * metadata is cleared at the end; other tests call this method as their first step.
     */
    @Test
    public void testFileMetadata() throws InterruptedException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;

        splitter.beginWindow(1L);
        // Blocks until MockScanner offers its result on the same exchanger.
        meta.exchanger.exchange(null);
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("File metadata", 12L, meta.fileMetadataSink.collectedTuples.size());
        for (FileSplitter.FileMetadata metadata : meta.fileMetadataSink.collectedTuples) {
            String path = metadata.getFilePath();
            Assert.assertTrue("path: " + path, meta.filePaths.contains(path));
            Assert.assertNotNull("name: ", metadata.getFileName());
        }
        meta.fileMetadataSink.collectedTuples.clear();
    }

    /**
     * With the fixture's block size left untouched, expects exactly one block tuple per generated
     * file (12 in total), each pointing at one of the known file paths.
     */
    @Test
    public void testBlockMetadataNoSplit() throws InterruptedException {
        TestMeta meta = this.testMeta;

        meta.fileSplitter.beginWindow(1L);
        // Blocks until MockScanner offers its result on the same exchanger.
        meta.exchanger.exchange(null);
        meta.fileSplitter.emitTuples();

        Assert.assertEquals("Blocks", 12L, meta.blockMetadataSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata block : meta.blockMetadataSink.collectedTuples) {
            String path = block.getFilePath();
            Assert.assertTrue("path: " + path, meta.filePaths.contains(path));
        }
    }

    /**
     * With a block size of 2, expects 12 file tuples and, summed over those files,
     * {@code ceil(length / 2)} block tuples.
     */
    @Test
    public void testBlockMetadataWithSplit() throws InterruptedException {
        TestMeta meta = this.testMeta;

        meta.fileSplitter.setBlockSize(2L);
        meta.fileSplitter.beginWindow(1L);
        // Blocks until MockScanner offers its result on the same exchanger.
        meta.exchanger.exchange(null);
        meta.fileSplitter.emitTuples();

        Assert.assertEquals("Files", 12L, meta.fileMetadataSink.collectedTuples.size());

        // The assertion above pins the sink to exactly 12 entries, so iterating all of them
        // visits the same elements as indexing 0..11.
        int expectedBlocks = 0;
        for (FileSplitter.FileMetadata metadata : meta.fileMetadataSink.collectedTuples) {
            File source = new File(meta.dataDirectory, metadata.getFileName());
            expectedBlocks += (int) Math.ceil(source.length() / 2.0d);
        }
        Assert.assertEquals("Blocks", expectedBlocks, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Processes window 1, then calls {@code setup} again and begins window 1 a second time.
     * Expects the 12 block tuples to be collected from {@code beginWindow} alone, since no
     * {@code emitTuples} call is made after the second setup.
     */
    @Test
    public void testIdempotency() throws InterruptedException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;

        splitter.setWindowDataManager(new FSWindowDataManager());
        splitter.setup(meta.context);
        testFileMetadata();

        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        // Second setup on the same context, followed by the window that was already processed.
        splitter.setup(meta.context);
        splitter.beginWindow(1L);

        Assert.assertEquals("Blocks", 12L, meta.blockMetadataSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata block : meta.blockMetadataSink.collectedTuples) {
            String path = block.getFilePath();
            Assert.assertTrue("path: " + path, meta.filePaths.contains(path));
        }
    }

    /**
     * After the initial window, writes one more file ({@code file13.txt}) and expects the next
     * scan to yield exactly one file tuple and one block tuple in window 2.
     */
    @Test
    public void testTimeScan() throws InterruptedException, IOException, TimeoutException {
        TestMeta meta = this.testMeta;

        testFileMetadata();
        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        Thread.sleep(1000L);

        Set<String> lines = Sets.newHashSet();
        for (int line = 0; line < 2; line++) {
            lines.add("f13l" + line);
        }
        File extraFile = new File(meta.dataDirectory, "file13.txt");
        FileUtils.write(extraFile, StringUtils.join(lines, '\n'));

        meta.fileSplitter.beginWindow(2L);
        // Blocks until MockScanner offers the result of the follow-up scan.
        meta.exchanger.exchange(null);
        meta.fileSplitter.emitTuples();
        meta.fileSplitter.endWindow();

        Assert.assertEquals("window 2: files", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1L, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Same scenario as the time-based scan test, but with the scan interval raised to 60 seconds
     * and the follow-up scan requested through {@code setTrigger(true)}. Expects one file tuple
     * and one block tuple in window 2.
     */
    @Test
    public void testTrigger() throws InterruptedException, IOException, TimeoutException {
        TestMeta meta = this.testMeta;

        meta.fileSplitter.scanner.setScanIntervalMillis(60000L);
        testFileMetadata();
        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        Thread.sleep(1000L);

        Set<String> lines = Sets.newHashSet();
        for (int line = 0; line < 2; line++) {
            lines.add("f13l" + line);
        }
        File extraFile = new File(meta.dataDirectory, "file13.txt");
        FileUtils.write(extraFile, StringUtils.join(lines, '\n'));

        meta.fileSplitter.scanner.setTrigger(true);
        meta.fileSplitter.beginWindow(2L);
        // Blocks until MockScanner offers the result of the follow-up scan.
        meta.exchanger.exchange(null);
        meta.fileSplitter.emitTuples();
        meta.fileSplitter.endWindow();

        Assert.assertEquals("window 2: files", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("window 2: blocks", 1L, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * With a block size of 2 and a blocks threshold of 10, expects exactly 10 block tuples after
     * window 1. After windows 2 through 7 it expects all 12 files and the full block count, which
     * is the sum of {@code ceil(length / 2)} over the generated files.
     */
    @Test
    public void testBlocksThreshold() throws InterruptedException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;

        int expectedBlocks = 0;
        for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
            File source = new File(meta.dataDirectory, "file" + fileIndex + ".txt");
            expectedBlocks += (int) Math.ceil(source.length() / 2.0d);
        }

        splitter.setBlockSize(2L);
        splitter.setBlocksThreshold(10);

        splitter.beginWindow(1L);
        // Blocks until MockScanner offers its result on the same exchanger.
        meta.exchanger.exchange(null);
        splitter.emitTuples();
        splitter.endWindow();
        Assert.assertEquals("Blocks", 10L, meta.blockMetadataSink.collectedTuples.size());

        // The remaining windows run without waiting on the exchanger.
        for (long windowId = 2; windowId < 8; windowId++) {
            splitter.beginWindow(windowId);
            splitter.emitTuples();
            splitter.endWindow();
        }

        Assert.assertEquals("Files", 12L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", expectedBlocks, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Runs the blocks-threshold scenario, then calls {@code setup} again and begins windows 1
     * through 7 without emitting or ending them. Expects 12 file tuples and 62 block tuples to
     * have been collected from those {@code beginWindow} calls.
     */
    @Test
    public void testIdempotencyWithBlocksThreshold() throws InterruptedException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;

        splitter.setWindowDataManager(new FSWindowDataManager());
        splitter.setBlocksThreshold(10);
        splitter.scanner.setScanIntervalMillis(500L);
        splitter.setup(meta.context);

        testBlocksThreshold();

        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        splitter.setup(meta.context);
        for (long windowId = 1; windowId < 8; windowId++) {
            splitter.beginWindow(windowId);
        }

        Assert.assertEquals("Files", 12L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 62L, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Runs the idempotency-with-threshold scenario, appends two lines to {@code file13.txt} and
     * expects window 8 to yield one file tuple and six block tuples.
     *
     * <p>Currently disabled. {@code @Test} is kept next to {@code @Ignore} so that JUnit discovers
     * the method and reports it as skipped rather than silently dropping it.
     * NOTE(review): the reason for disabling is not recorded here; add it to {@code @Ignore}.
     */
    @Ignore
    @Test
    public void testFirstWindowAfterRecovery() throws IOException, InterruptedException {
        TestMeta meta = this.testMeta;

        testIdempotencyWithBlocksThreshold();
        Thread.sleep(1000L);

        Set<String> lines = Sets.newHashSet();
        for (int line = 2; line < 4; line++) {
            lines.add("f13l" + line);
        }
        // Third argument 'true' appends to the file instead of overwriting it.
        FileUtils.writeLines(new File(meta.dataDirectory, "file13.txt"), lines, true);

        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        meta.fileSplitter.beginWindow(8L);
        // Blocks until MockScanner offers the result of the next scan.
        meta.exchanger.exchange(null);
        meta.fileSplitter.emitTuples();
        meta.fileSplitter.endWindow();

        Assert.assertEquals("Files", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 6L, meta.blockMetadataSink.collectedTuples.size());
    }

    /**
     * Uses a block size of 2 and a blocks threshold of 2, with the window data manager's state
     * stored under {@code <dataDirectory>/recovery}. The test checks three things:
     * <ol>
     *   <li>window 1 emits one file tuple and two block tuples;</li>
     *   <li>after a second {@code setup}, beginning window 1 again yields the same counts, and
     *       window 2 brings the block count to four;</li>
     *   <li>window 3 emits one new file tuple and two block tuples, the first block belonging to
     *       the file seen earlier and the second to the new file.</li>
     * </ol>
     *
     * <p>Currently disabled. {@code @Test} is kept next to {@code @Ignore} so that JUnit discovers
     * the method and reports it as skipped rather than silently dropping it.
     * NOTE(review): the reason for disabling is not recorded here; add it to {@code @Ignore}.
     */
    @Ignore
    @Test
    public void testRecoveryOfPartialFile() throws InterruptedException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;

        FSWindowDataManager windowDataManager = new FSWindowDataManager();
        windowDataManager.setStatePath(meta.dataDirectory + "/recovery");
        splitter.setWindowDataManager(windowDataManager);
        splitter.setBlockSize(2L);
        splitter.setBlocksThreshold(2);
        splitter.scanner.setScanIntervalMillis(500L);
        splitter.setup(meta.context);

        // Window 1: the threshold limits output to two blocks.
        splitter.beginWindow(1L);
        meta.exchanger.exchange(null);
        splitter.emitTuples();
        splitter.endWindow();
        Assert.assertEquals("Files", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2L, meta.blockMetadataSink.collectedTuples.size());

        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        // Second setup, then window 1 again: the same tuples are expected from beginWindow alone.
        splitter.setup(meta.context);
        splitter.beginWindow(1L);
        Assert.assertEquals("Recovered Files", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Recovered Blocks", 2L, meta.blockMetadataSink.collectedTuples.size());

        // Window 2 runs without waiting on the exchanger.
        splitter.beginWindow(2L);
        splitter.emitTuples();
        splitter.endWindow();
        Assert.assertEquals("Blocks", 4L, meta.blockMetadataSink.collectedTuples.size());

        String firstFileName = meta.fileMetadataSink.collectedTuples.get(0).getFileName();

        meta.fileMetadataSink.clear();
        meta.blockMetadataSink.clear();

        // Window 3: waits for the next scan result before emitting.
        splitter.beginWindow(3L);
        meta.exchanger.exchange(null);
        splitter.emitTuples();
        splitter.endWindow();
        Assert.assertEquals("New file", 1L, meta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", 2L, meta.blockMetadataSink.collectedTuples.size());

        String secondFileName = meta.fileMetadataSink.collectedTuples.get(0).getFileName();
        Assert.assertTrue("Block file name 0",
            meta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(firstFileName));
        Assert.assertTrue("Block file name 1",
            meta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(secondFileName));
    }

    /**
     * Replaces the scanner with a fresh {@link MockScanner} that has no file-name pattern and
     * points at the single file {@code file1.txt}. Expects exactly one file tuple whose path is
     * the absolute path of that file.
     */
    @Test
    public void testSingleFile() throws InterruptedException, IOException {
        TestMeta meta = this.testMeta;
        FileSplitter splitter = meta.fileSplitter;
        String singleFile = meta.dataDirectory + "/file1.txt";

        // Undo the fixture's setup so the splitter can be reconfigured and set up again.
        splitter.teardown();
        splitter.scanner = new MockScanner(meta);
        splitter.scanner.regex = null;
        splitter.scanner.setFiles(singleFile);
        splitter.setup(meta.context);

        splitter.beginWindow(1L);
        // Blocks until MockScanner offers its result on the same exchanger.
        meta.exchanger.exchange(null);
        splitter.emitTuples();
        splitter.endWindow();

        Assert.assertEquals("File metadata count", 1L, meta.fileMetadataSink.collectedTuples.size());
        String emittedPath = meta.fileMetadataSink.collectedTuples.get(0).getFilePath();
        Assert.assertEquals("File metadata", new File(singleFile).getAbsolutePath(), emittedPath);
    }
}
