package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.class */
public class AbstractFileInputOperatorTest {

    @Rule
    public TestMeta testMeta = new TestMeta();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest$DirectoryScannerNew.class */
    public static class DirectoryScannerNew extends AbstractFileInputOperator.DirectoryScanner {
        private DirectoryScannerNew() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        public LinkedHashSet<Path> scan(FileSystem fileSystem, Path path, Set<String> set) {
            LinkedHashSet<Path> scan = super.scan(fileSystem, path, set);
            TreeSet treeSet = new TreeSet();
            treeSet.addAll(scan);
            scan.clear();
            Iterator it = treeSet.iterator();
            while (it.hasNext()) {
                scan.add(it.next());
            }
            return scan;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest$IdempotencyTestDriver.class */
    /**
     * Callback contract parameterized by the {@link Operator} type it works with.
     *
     * <p>NOTE(review): no implementation or caller is visible in this part of the file; the
     * per-method notes below are inferred from the signatures only and should be confirmed.
     */
    public interface IdempotencyTestDriver<T extends Operator> {
        // Presumably writes test file number i with content str; may fail with IOException. TODO confirm.
        void writeFile(int i, String str) throws IOException;

        // Presumably attaches the given sink to operator t. TODO confirm against implementers.
        void setSink(T t, Sink<?> sink);

        // Returns a directory name; presumably the directory the operator reads from. TODO confirm.
        String getDirectory();

        // Returns an operator context; presumably the one handed to the operator's setup. TODO confirm.
        Context.OperatorContext getContext();
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest$LineOperator.class */
    public static class LineOperator extends LineByLineFileInputOperator {
        Set<String> dirPaths = Sets.newHashSet();

        protected void visitDirectory(Path path) {
            this.dirPaths.add(Path.getPathWithoutSchemeAndAuthority(path).toString());
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest$MyScanner.class */
    static class MyScanner extends AbstractFileInputOperator.DirectoryScanner {
        MyScanner() {
        }

        protected int getPartition(String str) {
            String[] split = str.split("/");
            try {
                return Integer.parseInt(split[split.length - 1].split("_")[0]);
            } catch (NumberFormatException e) {
                return super.getPartition(str);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        public String dir = null;
        Context.OperatorContext context;

        protected void starting(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
            this.dir = "target/" + description.getClassName() + "/" + description.getMethodName();
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.dir);
            this.context = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
        }

        protected void finished(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
        }
    }

    /** Runs the nested-directory scenario with recursive scanning switched on. */
    @Test
    public void testSinglePartitonRecursive() throws Exception {
        final boolean recursive = true;
        checkSubDir(recursive);
    }

    /** Runs the nested-directory scenario with recursive scanning switched off. */
    @Test
    public void testSinglePartiton() throws Exception {
        final boolean recursive = false;
        checkSubDir(recursive);
    }

    /**
     * Writes two files into nested directories ({@code /depth_0/file0} and
     * {@code /depth_0/depth_1/file1}, two lines each), runs a line-by-line operator over the test
     * directory for three windows and checks what was emitted.
     *
     * @param z value passed to the scanner's {@code setRecursive}; when {@code true} all four
     *          lines are expected, when {@code false} no tuples are expected
     * @throws Exception if preparing the files or running the operator fails
     */
    private void checkSubDir(boolean z) throws Exception {
        final boolean recursive = z;
        Path testRoot = new Path(new File(this.testMeta.dir).getAbsolutePath());
        FileContext.getLocalFSFileContext().delete(testRoot, true);

        Set<String> writtenLines = new HashSet<String>();
        String relativeDir = "";
        for (int depth = 0; depth < 2; depth++) {
            // Each iteration goes one directory level deeper.
            relativeDir += String.format("/depth_%d", Integer.valueOf(depth));
            Set<String> fileLines = new HashSet<String>();
            for (int line = 0; line < 2; line++) {
                fileLines.add("f" + depth + "l" + line);
            }
            writtenLines.addAll(fileLines);
            File target = new File(this.testMeta.dir + relativeDir, "file" + depth);
            FileUtils.write(target, StringUtils.join(fileLines, '\n'));
        }

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);
        oper.setDirectory(this.testMeta.dir);
        oper.getScanner().setFilePatternRegexp("((?!target).)*file[\\d]");
        oper.getScanner().setRecursive(recursive);

        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 3; windowId++) {
            oper.beginWindow(windowId);
            oper.emitTuples();
            oper.endWindow();
        }
        oper.teardown();

        Set<String> expectedLines = recursive ? writtenLines : new HashSet<String>();
        int expectedCount = recursive ? 4 : 0;
        Assert.assertEquals("number tuples", expectedCount, sink.collectedTuples.size());
        Assert.assertEquals("lines", expectedLines, new HashSet<Object>(sink.collectedTuples));
    }

    /**
     * Checks which directories a {@link LineOperator} records through {@code visitDirectory}.
     *
     * <p>Layout under the test directory: {@code a}, {@code b} and {@code b/c} are created as
     * empty directories and {@code d} holds {@code file0} with five lines. After three windows
     * the recorded set must contain exactly the canonical paths of the test directory and of
     * those four sub-directories.
     *
     * @throws Exception if preparing the directories or running the operator fails
     */
    @Test
    public void testEmptyDirectory() throws Exception {
        File root = new File(this.testMeta.dir);
        FileContext.getLocalFSFileContext().delete(new Path(root.getAbsolutePath()), true);

        Set<String> expectedDirs = new HashSet<String>();
        expectedDirs.add(root.getCanonicalPath());
        for (String emptyDir : new String[] {"/a", "/b", "/b/c"}) {
            File dir = new File(this.testMeta.dir + emptyDir);
            expectedDirs.add(dir.getCanonicalPath());
            FileUtils.forceMkdir(dir);
        }

        Set<String> lines = new HashSet<String>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        File dataDir = new File(this.testMeta.dir + "/d");
        expectedDirs.add(dataDir.getCanonicalPath());
        FileUtils.write(new File(dataDir, "file0"), StringUtils.join(lines, '\n'));

        LineOperator oper = new LineOperator();
        oper.setDirectory(root.getAbsolutePath());
        oper.setScanIntervalMillis(0);
        oper.output.setSink(new CollectorTestSink<Object>());

        oper.setup(this.testMeta.context);
        try {
            for (long windowId = 0; windowId < 3; windowId++) {
                oper.beginWindow(windowId);
                oper.emitTuples();
                oper.endWindow();
            }
            Assert.assertEquals("Size", 5L, oper.dirPaths.size());
            // assertEquals reports both sets on failure, unlike assertTrue(a.equals(b)).
            Assert.assertEquals("Checking Sets", expectedDirs, oper.dirPaths);
        } finally {
            // Release what setup() acquired, as checkSubDir does for its operator.
            oper.teardown();
        }
    }

    /**
     * Splits a directory scanner into two partitions and scans a directory holding four empty
     * files named {@code partition000} to {@code partition003}. Each partition must report
     * three paths and the union over both partitions must hold five distinct paths.
     *
     * <p>NOTE(review): the counts exceed a plain split of four files; presumably the scanned
     * directory itself is part of every result. Confirm against the scanner implementation.
     *
     * @throws Exception if preparing the files or scanning fails
     */
    @Test
    public void testScannerPartitioning() throws Exception {
        AbstractFileInputOperator.DirectoryScanner scanner = new AbstractFileInputOperator.DirectoryScanner();
        scanner.setFilePatternRegexp(".*partition([\\d]*)");

        Path inputDir = new Path(new File(this.testMeta.dir).getAbsolutePath());
        FileContext.getLocalFSFileContext().delete(inputDir, true);
        int fileIndex = 0;
        while (fileIndex < 4) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + fileIndex), "");
            fileIndex++;
        }

        FileSystem fs = FileSystem.get(
                FileContext.getLocalFSFileContext().getDefaultFileSystem().getUri(), new Configuration());

        Set<Path> allFound = new HashSet<Path>();
        for (AbstractFileInputOperator.DirectoryScanner part : scanner.partition(2)) {
            LinkedHashSet<Path> found = part.scan(fs, inputDir, new HashSet<String>());
            Assert.assertEquals("", 3L, found.size());
            allFound.addAll(found);
        }
        Assert.assertEquals("Found all files " + allFound, 5L, allFound.size());
    }

    /**
     * Asks a line-by-line operator to define partitions with a parallel partition count of 2 and
     * checks the result: two partitions are returned, the operator still reports one current
     * partition, every partition wraps a different operator instance with its own scanner, and
     * each of those scanners reports three paths for the test directory.
     *
     * @throws Exception if preparing the files or scanning fails
     */
    @Test
    public void testPartitioning() throws Exception {
        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
        oper.setDirectory(new File(this.testMeta.dir).getAbsolutePath());

        Path path = new Path(new File(this.testMeta.dir).getAbsolutePath());
        FileContext.getLocalFSFileContext().delete(path, true);
        for (int file = 0; file < 4; file++) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + file), "");
        }

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> partitions =
                new ArrayList<Partitioner.Partition<AbstractFileInputOperator<String>>>();
        partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> newPartitions =
                oper.definePartitions(partitions, new StatelessPartitionerTest.PartitioningContextImpl(null, 2));
        Assert.assertEquals(2L, newPartitions.size());
        // partitioned() is not invoked in this test; the reported count is expected to stay at 1.
        Assert.assertEquals(1L, oper.getCurrentPartitions());

        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : newPartitions) {
            AbstractFileInputOperator<String> instance = partition.getPartitionedInstance();
            Assert.assertNotSame(oper, instance);
            Assert.assertNotSame(oper.getScanner(), instance.getScanner());
            // Keep the scan result in a local so both the failure message and the size check use it.
            LinkedHashSet<Path> files = instance.getScanner().scan(
                    FileSystem.getLocal(new Configuration(false)), path, new HashSet<String>());
            Assert.assertEquals("partition " + files, 3L, files.size());
        }
    }

    /**
     * Repartitioning after everything has been read.
     *
     * <p>Steps: a single operator reads four three-line files (12 tuples) over ten windows. A
     * Kryo copy of the operator taken before setup is then asked to go to two partitions, with
     * the operator that did the reading supplied as the existing partition. The two resulting
     * operators must emit nothing for the files already present, and 12 tuples once four more
     * files are written.
     *
     * @throws Exception if preparing the files or running the operators fails
     */
    @Test
    public void testPartitioningStateTransfer() throws Exception {
        final String inputDir = new File(this.testMeta.dir).getAbsolutePath();

        LineByLineFileInputOperator reader = new LineByLineFileInputOperator();
        reader.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
        reader.setDirectory(inputDir);
        reader.setScanIntervalMillis(0);
        // Copy taken before the reader is set up or has emitted anything.
        LineByLineFileInputOperator initialState = (LineByLineFileInputOperator) new Kryo().copy(reader);

        FileContext.getLocalFSFileContext().delete(new Path(inputDir), true);
        // fileIndex keeps counting across both write phases so the second batch gets new names.
        int fileIndex = 0;
        for (; fileIndex < 4; fileIndex++) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + fileIndex), "a\nb\nc\n");
        }

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        reader.output.setSink(sink);

        int windowId = 0;
        reader.setup(this.testMeta.context);
        for (int pass = 0; pass < 10; pass++, windowId++) {
            reader.beginWindow(windowId);
            reader.emitTuples();
            reader.endWindow();
        }
        Assert.assertEquals("All tuples read ", 12L, sink.collectedTuples.size());

        Assert.assertEquals(1L, initialState.getCurrentPartitions());
        initialState.setPartitionCount(2);
        StatsListener.Response response = initialState.processStats((StatsListener.BatchedOperatorStats) null);
        Assert.assertEquals(true, Boolean.valueOf(response.repartitionRequired));

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> existing =
                new ArrayList<Partitioner.Partition<AbstractFileInputOperator<String>>>();
        existing.add(new DefaultPartition<AbstractFileInputOperator<String>>(reader));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> repartitioned =
                initialState.definePartitions(existing, new StatelessPartitionerTest.PartitioningContextImpl(null, 0));
        Assert.assertEquals(2L, repartitioned.size());
        Assert.assertEquals(1L, initialState.getCurrentPartitions());

        HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>> byIndex =
                new HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : repartitioned) {
            byIndex.put(Integer.valueOf(byIndex.size()), partition);
        }
        initialState.partitioned(byIndex);
        Assert.assertEquals(2L, initialState.getCurrentPartitions());

        List<AbstractFileInputOperator<String>> readers = new ArrayList<AbstractFileInputOperator<String>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : repartitioned) {
            LineByLineFileInputOperator instance = (LineByLineFileInputOperator) partition.getPartitionedInstance();
            instance.setup(this.testMeta.context);
            instance.output.setSink(sink);
            readers.add(instance);
        }

        sink.clear();
        for (int pass = 0; pass < 10; pass++, windowId++) {
            for (AbstractFileInputOperator<String> each : readers) {
                each.beginWindow(windowId);
                each.emitTuples();
                each.endWindow();
            }
        }
        Assert.assertEquals("No new tuples read ", 0L, sink.collectedTuples.size());

        // Second batch: four additional files.
        for (; fileIndex < 8; fileIndex++) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + fileIndex), "a\nb\nc\n");
        }
        for (int pass = 0; pass < 10; pass++, windowId++) {
            for (AbstractFileInputOperator<String> each : readers) {
                each.beginWindow(windowId);
                each.emitTuples();
                each.endWindow();
            }
        }
        Assert.assertEquals("All tuples read ", 12L, sink.collectedTuples.size());
    }

    /**
     * Repartitioning while the input is only partly read.
     *
     * <p>Steps: with an emit batch size of 2, a single operator runs for five windows over four
     * three-line files and is expected to have emitted 6 tuples. A Kryo copy taken before setup
     * is then asked to go to two partitions, with the partially-read operator supplied as the
     * existing partition. The two resulting operators must emit 6 tuples over ten further
     * windows.
     *
     * @throws Exception if preparing the files or running the operators fails
     */
    @Test
    public void testPartitioningStateTransferInterrupted() throws Exception {
        final String inputDir = new File(this.testMeta.dir).getAbsolutePath();

        LineByLineFileInputOperator reader = new LineByLineFileInputOperator();
        reader.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
        reader.setDirectory(inputDir);
        reader.setScanIntervalMillis(0);
        reader.setEmitBatchSize(2);
        // Copy taken before the reader is set up or has emitted anything.
        LineByLineFileInputOperator initialState = (LineByLineFileInputOperator) new Kryo().copy(reader);

        FileContext.getLocalFSFileContext().delete(new Path(inputDir), true);
        for (int fileIndex = 0; fileIndex < 4; fileIndex++) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + fileIndex), "a\nb\nc\n");
        }

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        reader.output.setSink(sink);

        int windowId = 0;
        reader.setup(this.testMeta.context);
        // Five windows only, so part of the input is still unread afterwards.
        for (int pass = 0; pass < 5; pass++, windowId++) {
            reader.beginWindow(windowId);
            reader.emitTuples();
            reader.endWindow();
        }
        Assert.assertEquals("Partial tuples read ", 6L, sink.collectedTuples.size());

        Assert.assertEquals(1L, initialState.getCurrentPartitions());
        initialState.setPartitionCount(2);
        StatsListener.Response response = initialState.processStats((StatsListener.BatchedOperatorStats) null);
        Assert.assertEquals(true, Boolean.valueOf(response.repartitionRequired));

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> existing =
                new ArrayList<Partitioner.Partition<AbstractFileInputOperator<String>>>();
        existing.add(new DefaultPartition<AbstractFileInputOperator<String>>(reader));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> repartitioned =
                initialState.definePartitions(existing, new StatelessPartitionerTest.PartitioningContextImpl(null, 0));
        Assert.assertEquals(2L, repartitioned.size());
        Assert.assertEquals(1L, initialState.getCurrentPartitions());

        HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>> byIndex =
                new HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : repartitioned) {
            byIndex.put(Integer.valueOf(byIndex.size()), partition);
        }
        initialState.partitioned(byIndex);
        Assert.assertEquals(2L, initialState.getCurrentPartitions());

        List<AbstractFileInputOperator<String>> readers = new ArrayList<AbstractFileInputOperator<String>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : repartitioned) {
            LineByLineFileInputOperator instance = (LineByLineFileInputOperator) partition.getPartitionedInstance();
            instance.setup(this.testMeta.context);
            instance.output.setSink(sink);
            readers.add(instance);
        }

        sink.clear();
        for (int pass = 0; pass < 10; pass++, windowId++) {
            for (AbstractFileInputOperator<String> each : readers) {
                each.beginWindow(windowId);
                each.emitTuples();
                each.endWindow();
            }
        }
        Assert.assertEquals("Remaining tuples read ", 6L, sink.collectedTuples.size());
    }

    /**
     * Partial read followed by repartitioning into two operators.
     *
     * <p>Steps: four three-line files are written; one operator with an emit batch size of 2
     * runs five windows and is expected to have emitted 6 tuples. A Kryo copy made before setup
     * then defines two partitions from the partially-read operator, and the partitioned
     * operators are expected to emit 6 tuples over ten more windows.
     *
     * @throws Exception if preparing the files or running the operators fails
     */
    @Test
    public void testPartitioningStateTransferFailure() throws Exception {
        final File testDir = new File(this.testMeta.dir);

        LineByLineFileInputOperator original = new LineByLineFileInputOperator();
        original.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
        original.setDirectory(testDir.getAbsolutePath());
        original.setScanIntervalMillis(0);
        original.setEmitBatchSize(2);
        // Snapshot made before setup and before any tuple was emitted.
        LineByLineFileInputOperator snapshot = (LineByLineFileInputOperator) new Kryo().copy(original);

        FileContext.getLocalFSFileContext().delete(new Path(testDir.getAbsolutePath()), true);
        int written = 0;
        while (written < 4) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + written), "a\nb\nc\n");
            written++;
        }

        CollectorTestSink<Object> collector = new CollectorTestSink<Object>();
        original.output.setSink(collector);

        int window = 0;
        original.setup(this.testMeta.context);
        for (int round = 0; round < 5; round++) {
            original.beginWindow(window);
            original.emitTuples();
            original.endWindow();
            window++;
        }
        Assert.assertEquals("Partial tuples read ", 6L, collector.collectedTuples.size());

        Assert.assertEquals(1L, snapshot.getCurrentPartitions());
        snapshot.setPartitionCount(2);
        boolean repartition = snapshot.processStats((StatsListener.BatchedOperatorStats) null).repartitionRequired;
        Assert.assertEquals(true, Boolean.valueOf(repartition));

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> before =
                new ArrayList<Partitioner.Partition<AbstractFileInputOperator<String>>>();
        before.add(new DefaultPartition<AbstractFileInputOperator<String>>(original));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> after =
                snapshot.definePartitions(before, new StatelessPartitionerTest.PartitioningContextImpl(null, 0));
        Assert.assertEquals(2L, after.size());
        Assert.assertEquals(1L, snapshot.getCurrentPartitions());

        HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>> numbered =
                new HashMap<Integer, Partitioner.Partition<AbstractFileInputOperator<String>>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> entry : after) {
            numbered.put(Integer.valueOf(numbered.size()), entry);
        }
        snapshot.partitioned(numbered);
        Assert.assertEquals(2L, snapshot.getCurrentPartitions());

        List<AbstractFileInputOperator<String>> operators = new ArrayList<AbstractFileInputOperator<String>>();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> entry : after) {
            LineByLineFileInputOperator partitioned = (LineByLineFileInputOperator) entry.getPartitionedInstance();
            partitioned.setup(this.testMeta.context);
            partitioned.output.setSink(collector);
            operators.add(partitioned);
        }

        collector.clear();
        for (int round = 0; round < 10; round++) {
            for (AbstractFileInputOperator<String> operator : operators) {
                operator.beginWindow(window);
                operator.emitTuples();
                operator.endWindow();
            }
            window++;
        }
        Assert.assertEquals("Remaining tuples read ", 6L, collector.collectedTuples.size());
    }

    /**
     * Starts an operator whose scanner is cleared and whose {@code failedFiles} holds the
     * five-line test file with offset 1. One window must emit four tuples: the file's lines
     * from index 1 onward, in file order.
     *
     * @throws Exception if preparing the file or running the operator fails
     */
    @Test
    public void testRecoveryWithFailedFile() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        Set<String> lines = new HashSet<String>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        // Same iteration order as the join below, i.e. the order of lines in the file.
        List<String> allLines = new ArrayList<String>(lines);
        File testFile = new File(this.testMeta.dir, "file0");
        FileUtils.write(testFile, StringUtils.join(lines, '\n'));

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.scanner = null;
        oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 1L));

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);
        oper.setDirectory(this.testMeta.dir);

        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        oper.emitTuples();
        oper.endWindow();
        oper.teardown();

        List<String> expected = allLines.subList(1, allLines.size());
        Assert.assertEquals("number tuples", 4L, sink.collectedTuples.size());
        Assert.assertEquals("lines", expected, new ArrayList<Object>(sink.collectedTuples));
    }

    /**
     * Starts an operator whose scanner is cleared and whose {@code unfinishedFiles} holds the
     * five-line test file with offset 2. One window must emit three tuples: the file's lines
     * from index 2 onward, in file order.
     *
     * @throws Exception if preparing the file or running the operator fails
     */
    @Test
    public void testRecoveryWithUnfinishedFile() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        Set<String> lines = new HashSet<String>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        // Same iteration order as the join below, i.e. the order of lines in the file.
        List<String> allLines = new ArrayList<String>(lines);
        File testFile = new File(this.testMeta.dir, "file0");
        FileUtils.write(testFile, StringUtils.join(lines, '\n'));

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.scanner = null;
        oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 2L));

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);
        oper.setDirectory(this.testMeta.dir);

        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        oper.emitTuples();
        oper.endWindow();
        oper.teardown();

        List<String> expected = allLines.subList(2, allLines.size());
        Assert.assertEquals("number tuples", 3L, sink.collectedTuples.size());
        Assert.assertEquals("lines", expected, new ArrayList<Object>(sink.collectedTuples));
    }

    /**
     * Starts an operator whose scanner is cleared and whose {@code pendingFiles} holds the
     * five-line test file. One window must emit all five lines in file order.
     *
     * @throws Exception if preparing the file or running the operator fails
     */
    @Test
    public void testRecoveryWithPendingFile() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        Set<String> lines = new HashSet<String>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        // Same iteration order as the join below, i.e. the order of lines in the file.
        List<String> allLines = new ArrayList<String>(lines);
        File testFile = new File(this.testMeta.dir, "file0");
        FileUtils.write(testFile, StringUtils.join(lines, '\n'));

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.scanner = null;
        oper.pendingFiles.add(testFile.getAbsolutePath());

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);
        oper.setDirectory(this.testMeta.dir);

        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        oper.emitTuples();
        oper.endWindow();
        oper.teardown();

        Assert.assertEquals("number tuples", 5L, sink.collectedTuples.size());
        Assert.assertEquals("lines", allLines, new ArrayList<Object>(sink.collectedTuples));
    }

    /**
     * Starts an operator whose scanner is cleared, whose {@code currentFile} is the five-line
     * test file and whose {@code offset} is 1. One window must emit four tuples: the file's
     * lines from index 1 onward, in file order.
     *
     * @throws Exception if preparing the file or running the operator fails
     */
    @Test
    public void testRecoveryWithCurrentFile() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        Set<String> lines = new HashSet<String>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        // Same iteration order as the join below, i.e. the order of lines in the file.
        List<String> allLines = new ArrayList<String>(lines);
        File testFile = new File(this.testMeta.dir, "file0");
        FileUtils.write(testFile, StringUtils.join(lines, '\n'));

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.scanner = null;
        oper.currentFile = testFile.getAbsolutePath();
        oper.offset = 1L;

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);
        oper.setDirectory(this.testMeta.dir);

        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        oper.emitTuples();
        oper.endWindow();
        oper.teardown();

        List<String> expected = allLines.subList(1, allLines.size());
        Assert.assertEquals("number tuples", 4L, sink.collectedTuples.size());
        Assert.assertEquals("lines", expected, new ArrayList<Object>(sink.collectedTuples));
    }

    /**
     * Replays windows after a restart and checks that the same tuples come out.
     *
     * <p>First run: two two-line files are read over windows 0 to 2 with an
     * {@link FSWindowDataManager} whose state path is {@code <test dir>/recovery}; the emitted
     * tuples are remembered. Second run: the same operator is set up again and windows 0 to 2
     * are run without calling {@code emitTuples}; the sink must then hold four tuples equal to
     * the remembered ones.
     *
     * @throws Exception if preparing the files or running the operator fails
     */
    @Test
    public void testIdempotency() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        for (int file = 0; file < 2; file++) {
            List<String> lines = new ArrayList<String>();
            for (int line = 0; line < 2; line++) {
                lines.add("f" + file + "l" + line);
            }
            FileUtils.write(new File(this.testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
        }

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        FSWindowDataManager manager = new FSWindowDataManager();
        manager.setStatePath(this.testMeta.dir + "/recovery");
        oper.setWindowDataManager(manager);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        TestUtils.setSink(oper.output, sink);
        oper.setDirectory(this.testMeta.dir);
        oper.getScanner().setFilePatternRegexp(".*file[\\d]");

        // First run: read the files normally.
        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 3; windowId++) {
            oper.beginWindow(windowId);
            oper.emitTuples();
            oper.endWindow();
        }
        oper.teardown();
        List<Object> beforeRecovery = new ArrayList<Object>(sink.collectedTuples);
        sink.clear();

        // Second run: same window ids, no emitTuples() calls.
        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 3; windowId++) {
            oper.beginWindow(windowId);
            oper.endWindow();
        }
        Assert.assertEquals("number tuples", 4L, sink.collectedTuples.size());
        Assert.assertEquals("lines", beforeRecovery, sink.collectedTuples);
        oper.teardown();
    }

    /**
     * Replays a single window in which {@code emitTuples} had been called several times.
     *
     * <p>First run: two two-line files are read inside window 0 with three {@code emitTuples}
     * calls, using an {@link FSWindowDataManager} whose state path is
     * {@code <test dir>/recovery}; the emitted tuples are remembered. Second run: the same
     * operator is set up again and window 0 is run without calling {@code emitTuples}; the sink
     * must then hold four tuples equal to the remembered ones.
     *
     * @throws Exception if preparing the files or running the operator fails
     */
    @Test
    public void testIdempotencyWithMultipleEmitTuples() throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);

        for (int file = 0; file < 2; file++) {
            List<String> lines = new ArrayList<String>();
            for (int line = 0; line < 2; line++) {
                lines.add("f" + file + "l" + line);
            }
            FileUtils.write(new File(this.testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
        }

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        FSWindowDataManager manager = new FSWindowDataManager();
        manager.setStatePath(this.testMeta.dir + "/recovery");
        oper.setWindowDataManager(manager);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        TestUtils.setSink(oper.output, sink);
        oper.setDirectory(this.testMeta.dir);
        oper.getScanner().setFilePatternRegexp(".*file[\\d]");

        // First run: one window, several emitTuples() calls.
        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        for (int call = 0; call < 3; call++) {
            oper.emitTuples();
        }
        oper.endWindow();
        oper.teardown();
        List<Object> beforeRecovery = new ArrayList<Object>(sink.collectedTuples);
        sink.clear();

        // Second run: same window id, no emitTuples() calls.
        oper.setup(this.testMeta.context);
        oper.beginWindow(0L);
        oper.endWindow();
        Assert.assertEquals("number tuples", 4L, sink.collectedTuples.size());
        Assert.assertEquals("lines", beforeRecovery, sink.collectedTuples);
        oper.teardown();
    }

    /**
     * Reads a ten-line file with an emit batch size of 5 across windows 0..2 and checks that
     * windows 1 and 2 each deliver the next five lines. The operator is then torn down, set up
     * again, and the same windows are replayed without {@code emitTuples()}; the replay must
     * deliver the same five-line slices per window.
     */
    @Test
    public void testIdempotencyWhenFileContinued() throws Exception {
        final String dir = this.testMeta.dir;
        FileContext.getLocalFSFileContext().delete(new Path(new File(dir).getAbsolutePath()), true);

        List<String> lines = Lists.newArrayList();
        for (int n = 0; n < 10; n++) {
            lines.add("l" + n);
        }
        FileUtils.write(new File(dir, "file0"), StringUtils.join(lines, '\n'));

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        FSWindowDataManager manager = new FSWindowDataManager();
        manager.setStatePath(dir + "/recovery");
        oper.setEmitBatchSize(5);
        oper.setWindowDataManager(manager);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);

        oper.setDirectory(dir);
        oper.getScanner().setFilePatternRegexp(".*file[\\d]");

        // First run. Window 0 is not asserted; only windows 1 and 2 are checked.
        oper.setup(this.testMeta.context);
        int offset = 0;
        for (long windowId = 0; windowId < 3; windowId++) {
            oper.beginWindow(windowId);
            oper.emitTuples();
            oper.endWindow();
            if (windowId > 0) {
                Assert.assertEquals("number tuples", 5, sink.collectedTuples.size());
                Assert.assertEquals("lines", lines.subList(offset, offset + 5), sink.collectedTuples);
                offset += 5;
            }
            sink.clear();
        }
        oper.teardown();
        sink.clear();

        // Replay of the same windows, this time without calling emitTuples().
        int replayOffset = 0;
        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 3; windowId++) {
            oper.beginWindow(windowId);
            oper.endWindow();
            if (windowId > 0) {
                Assert.assertEquals("number tuples", 5, sink.collectedTuples.size());
                Assert.assertEquals("lines", lines.subList(replayOffset, replayOffset + 5), sink.collectedTuples);
                replayOffset += 5;
            }
            sink.clear();
        }
        oper.teardown();
    }

    /**
     * Processes three two-line files over windows 0..3, tears the operator down, then manually
     * re-adds file0/file1/file2 to the pending, failed and unfinished collections before setting
     * the operator up again and replaying the same windows. Afterwards the three collections are
     * checked for the entries named below.
     */
    @Test
    public void testStateWithIdempotency() throws Exception {
        final String dir = this.testMeta.dir;
        FileContext.getLocalFSFileContext().delete(new Path(new File(dir).getAbsolutePath()), true);

        Set<String> allLines = Sets.newHashSet();
        for (int fileIdx = 0; fileIdx < 3; fileIdx++) {
            Set<String> lines = Sets.newHashSet();
            for (int lineIdx = 0; lineIdx < 2; lineIdx++) {
                lines.add("f" + fileIdx + "l" + lineIdx);
            }
            allLines.addAll(lines);
            FileUtils.write(new File(dir, "file" + fileIdx), StringUtils.join(lines, '\n'));
        }

        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        FSWindowDataManager manager = new FSWindowDataManager();
        manager.setStatePath(dir + "/recovery");
        oper.setWindowDataManager(manager);

        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        oper.output.setSink(sink);

        oper.setDirectory(dir);
        oper.getScanner().setFilePatternRegexp(".*file[\\d]");

        // First run over four windows.
        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 4; windowId++) {
            oper.beginWindow(windowId);
            oper.emitTuples();
            oper.endWindow();
        }
        oper.teardown();
        sink.clear();

        // Inject stale bookkeeping entries before the replay.
        oper.pendingFiles.add(new File(dir, "file0").getAbsolutePath());
        oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(new File(dir, "file1").getAbsolutePath(), 0L));
        oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(new File(dir, "file2").getAbsolutePath(), 0L));

        // Replay the same windows without emitTuples().
        oper.setup(this.testMeta.context);
        for (long windowId = 0; windowId < 4; windowId++) {
            oper.beginWindow(windowId);
            oper.endWindow();
        }

        // NOTE(review): the entries injected above are absolute paths, while the checks below
        // compare against bare file names - confirm these assertions can actually fail.
        Assert.assertTrue("pending state", !oper.pendingFiles.contains("file0"));
        for (AbstractFileInputOperator.FailedFile failed : oper.failedFiles) {
            Assert.assertTrue("failed state", !failed.path.equals("file1"));
        }
        for (AbstractFileInputOperator.FailedFile unfinished : oper.unfinishedFiles) {
            Assert.assertTrue("unfinished state", !unfinished.path.equals("file2"));
        }
        oper.teardown();
    }

    /**
     * Runs the shared checkpoint/recovery scenario against a {@link LineByLineFileInputOperator},
     * supplying a driver that writes line-based files into the test directory and wires the
     * operator's output port to the collecting sink.
     */
    @Test
    public void testIdempotencyWithCheckPoint() throws Exception {
        testIdempotencyWithCheckPoint(new LineByLineFileInputOperator(), new CollectorTestSink<String>(), new IdempotencyTestDriver<LineByLineFileInputOperator>() {
            /** Writes {@code i} lines named {@code <str>l<index>} into a file called {@code str}. */
            @Override
            public void writeFile(int i, String str) throws IOException {
                List<String> lines = Lists.newArrayList();
                for (int line = 0; line < i; line++) {
                    lines.add(str + "l" + line);
                }
                FileUtils.write(new File(testMeta.dir, str), StringUtils.join(lines, '\n'));
            }

            // Single typed override; the compiler generates the erased bridge method itself,
            // so no hand-written raw-typed forwarding method is needed.
            @Override
            public void setSink(LineByLineFileInputOperator lineByLineFileInputOperator, Sink<?> sink) {
                TestUtils.setSink(lineByLineFileInputOperator.output, sink);
            }

            @Override
            public String getDirectory() {
                return testMeta.dir;
            }

            @Override
            public Context.OperatorContext getContext() {
                return testMeta.context;
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v63, types: [long, com.datatorrent.lib.io.fs.AbstractFileInputOperator, com.datatorrent.api.Operator] */
    public static <S extends AbstractFileInputOperator, T> void testIdempotencyWithCheckPoint(S s, CollectorTestSink<T> collectorTestSink, IdempotencyTestDriver<S> idempotencyTestDriver) throws Exception {
        FileContext.getLocalFSFileContext().delete(new Path(new File(idempotencyTestDriver.getDirectory()).getAbsolutePath()), true);
        idempotencyTestDriver.writeFile(5, "file0");
        idempotencyTestDriver.writeFile(6, "file1");
        idempotencyTestDriver.writeFile(0, "file2");
        FSWindowDataManager fSWindowDataManager = new FSWindowDataManager();
        fSWindowDataManager.setStatePath(idempotencyTestDriver.getDirectory() + "/recovery");
        s.setWindowDataManager(fSWindowDataManager);
        s.setDirectory(idempotencyTestDriver.getDirectory());
        s.getScanner().setFilePatternRegexp(".*file[\\d]");
        s.setup(idempotencyTestDriver.getContext());
        s.setEmitBatchSize(3);
        s.setScanner(new DirectoryScannerNew());
        s.beginWindow(0L);
        s.emitTuples();
        s.endWindow();
        s.beginWindow(1L);
        s.emitTuples();
        s.endWindow();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        AbstractFileInputOperator abstractFileInputOperator = (AbstractFileInputOperator) checkpoint(s, byteArrayOutputStream);
        idempotencyTestDriver.setSink(s, collectorTestSink);
        s.beginWindow(2L);
        s.emitTuples();
        s.endWindow();
        ArrayList newArrayList = Lists.newArrayList(collectorTestSink.collectedTuples);
        s.beginWindow(3L);
        s.emitTuples();
        s.endWindow();
        ArrayList newArrayList2 = Lists.newArrayList(collectorTestSink.collectedTuples);
        s.beginWindow(4L);
        s.emitTuples();
        s.endWindow();
        ArrayList newArrayList3 = Lists.newArrayList(collectorTestSink.collectedTuples);
        s.beginWindow(5L);
        s.emitTuples();
        s.endWindow();
        ArrayList newArrayList4 = Lists.newArrayList(collectorTestSink.collectedTuples);
        s.beginWindow(6L);
        s.emitTuples();
        s.endWindow();
        ArrayList newArrayList5 = Lists.newArrayList(collectorTestSink.collectedTuples);
        s.teardown();
        collectorTestSink.clear();
        ?? r0 = (AbstractFileInputOperator) restoreCheckPoint(abstractFileInputOperator, byteArrayOutputStream);
        idempotencyTestDriver.getContext().getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
        r0.setup(idempotencyTestDriver.getContext());
        idempotencyTestDriver.setSink(r0, collectorTestSink);
        long longValue = ((Long) idempotencyTestDriver.getContext().getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue() + 1;
        r0.beginWindow(longValue);
        Assert.assertTrue(((AbstractFileInputOperator) r0).currentFile == null);
        r0.emitTuples();
        r0.endWindow();
        Assert.assertEquals("lines", newArrayList, collectorTestSink.collectedTuples);
        r0.beginWindow(longValue + 1);
        r0.emitTuples();
        r0.endWindow();
        Assert.assertEquals("lines", newArrayList2, collectorTestSink.collectedTuples);
        r0.beginWindow(r0 + 1);
        r0.emitTuples();
        r0.endWindow();
        Assert.assertEquals("lines", newArrayList3, collectorTestSink.collectedTuples);
        r0.beginWindow(r0 + 1);
        Assert.assertTrue(((AbstractFileInputOperator) r0).currentFile == null);
        r0.emitTuples();
        r0.endWindow();
        Assert.assertEquals("lines", newArrayList4, collectorTestSink.collectedTuples);
        r0.beginWindow(r0 + 1);
        Assert.assertTrue(((AbstractFileInputOperator) r0).currentFile == null);
        r0.emitTuples();
        r0.endWindow();
        Assert.assertEquals("lines", newArrayList5, collectorTestSink.collectedTuples);
        Assert.assertEquals("number tuples", 8L, collectorTestSink.collectedTuples.size());
        r0.teardown();
    }

    /**
     * Serializes {@code t} with Kryo into {@code byteArrayOutputStream} and immediately reads a
     * copy back from the bytes just written, using the same Kryo instance.
     *
     * @param t the object to snapshot
     * @param byteArrayOutputStream receives the serialized form; closed via the Kryo output
     * @return the object deserialized from the written bytes
     * @throws Exception if serialization or deserialization fails
     */
    public static <T> T checkpoint(T t, ByteArrayOutputStream byteArrayOutputStream) throws Exception {
        final Kryo serializer = new Kryo();

        // Write phase.
        final Output sinkStream = new Output(byteArrayOutputStream);
        serializer.writeObject(sinkStream, t);
        sinkStream.close();

        // Read-back phase over the bytes produced above.
        final byte[] snapshot = byteArrayOutputStream.toByteArray();
        final Input sourceStream = new Input(snapshot);
        @SuppressWarnings("unchecked")
        final T copy = (T) serializer.readObject(sourceStream, t.getClass());
        sourceStream.close();
        return copy;
    }

    /**
     * Deserializes an object of {@code t}'s runtime class from the bytes currently held in
     * {@code byteArrayOutputStream}, using a fresh Kryo instance.
     *
     * @param t only its runtime class is used, to select the type to read
     * @param byteArrayOutputStream holds the previously serialized bytes
     * @return the deserialized object
     * @throws Exception if deserialization fails
     */
    public static <T> T restoreCheckPoint(T t, ByteArrayOutputStream byteArrayOutputStream) throws Exception {
        final Kryo serializer = new Kryo();
        final byte[] snapshot = byteArrayOutputStream.toByteArray();
        final Input sourceStream = new Input(snapshot);
        @SuppressWarnings("unchecked")
        final T restored = (T) serializer.readObject(sourceStream, t.getClass());
        sourceStream.close();
        return restored;
    }

    /**
     * Repartitions a single operator (operator id 7) into two partitions and checks the window
     * data managers of the new partitions: there must be two managers, exactly one of them must
     * report deleted operators, and that set must contain only operator id 7.
     */
    @Test
    public void testWindowDataManagerPartitioning() throws Exception {
        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
        oper.setDirectory(new File(this.testMeta.dir).getAbsolutePath());
        oper.setWindowDataManager(new FSWindowDataManager());
        oper.operatorId = 7;

        FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);
        for (int fileIdx = 0; fileIdx < 4; fileIdx++) {
            FileUtils.write(new File(this.testMeta.dir, "partition00" + fileIdx), "");
        }

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList();
        partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> newPartitions =
                oper.definePartitions(partitions, new StatelessPartitionerTest.PartitioningContextImpl(null, 2));
        Assert.assertEquals(2L, newPartitions.size());
        // The original operator's own partition counter is asserted to still be 1.
        Assert.assertEquals(1L, oper.getCurrentPartitions());

        List<FSWindowDataManager> managers = Lists.newLinkedList();
        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : newPartitions) {
            // Explicit cast: the list is typed to the FS implementation configured above.
            managers.add((FSWindowDataManager) partition.getPartitionedInstance().getWindowDataManager());
        }
        Assert.assertEquals("count of storage managers", 2L, managers.size());

        int managersWithDeletes = 0;
        FSWindowDataManager deleteManager = null;
        for (FSWindowDataManager manager : managers) {
            if (manager.getDeletedOperators() != null) {
                managersWithDeletes++;
                deleteManager = manager;
            }
        }
        Assert.assertEquals("count of delete managers", 1L, managersWithDeletes);
        Assert.assertNotNull("deleted operators manager", deleteManager);
        Assert.assertEquals("deleted operators", Sets.newHashSet(7), deleteManager.getDeletedOperators());
    }

    /**
     * Repartitions an operator that uses {@code MyScanner} into two partitions and checks that
     * each partition received its own operator and scanner instance, and that each partition's
     * scanner returns six entries when scanning the test directory.
     */
    @Test
    public void testWithCustomScanner() throws Exception {
        LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
        oper.setScanner(new MyScanner());
        oper.getScanner().setFilePatternRegexp(".*partition_([\\d]*)");
        oper.setDirectory(new File(this.testMeta.dir).getAbsolutePath());

        Random random = new Random();
        Path path = new Path(new File(this.testMeta.dir).getAbsolutePath());
        FileContext.getLocalFSFileContext().delete(path, true);
        // Ten empty files named <index>_partition_00<random 0..99>.
        for (int fileIdx = 0; fileIdx < 10; fileIdx++) {
            FileUtils.write(new File(this.testMeta.dir, fileIdx + "_partition_00" + random.nextInt(100)), "");
        }

        List<Partitioner.Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList();
        partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
        Collection<Partitioner.Partition<AbstractFileInputOperator<String>>> newPartitions =
                oper.definePartitions(partitions, new StatelessPartitionerTest.PartitioningContextImpl(null, 2));
        Assert.assertEquals(2L, newPartitions.size());
        Assert.assertEquals(1L, oper.getCurrentPartitions());

        for (Partitioner.Partition<AbstractFileInputOperator<String>> partition : newPartitions) {
            AbstractFileInputOperator<String> instance = partition.getPartitionedInstance();
            Assert.assertNotSame(oper, instance);
            Assert.assertNotSame(oper.getScanner(), instance.getScanner());

            // Hold the scan result in a local so the message and the size check use the same value.
            Set<String> consumed = Sets.newHashSet();
            Collection<?> scanned = instance.getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
            Assert.assertEquals("partition " + scanned, 6L, scanned.size());
        }
    }

    /**
     * With two partitions, checks that {@code MyScanner} accepts the file name "1_file" when
     * configured as partition index 1 and rejects it when configured as partition index 0.
     */
    @Test
    public void testCustomScanner() {
        final String candidate = "1_file";
        MyScanner scanner = new MyScanner();
        scanner.setPartitionCount(2);

        scanner.setPartitionIndex(1);
        boolean acceptedByPartitionOne = scanner.acceptFile(candidate);
        Assert.assertTrue("File should be accepted by this partition ", acceptedByPartitionOne);

        scanner.setPartitionIndex(0);
        boolean acceptedByPartitionZero = scanner.acceptFile(candidate);
        Assert.assertFalse("File should not be accepted by this partition ", acceptedByPartitionZero);
    }
}
