package org.apache.storm.hdfs.spout;

import clojure.lang.Atom;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.common.HdfsUtils;
import org.apache.storm.hdfs.spout.HdfsSpout;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/storm/hdfs/spout/TestHdfsSpout.class */
public class TestHdfsSpout {

    /** JUnit rule providing a temporary folder; {@code setup()} creates {@code baseFolder} inside it. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    /** Root folder of the current test; assigned in {@code setup()} and deleted from {@code fs} in {@code shutDown()}. */
    public File baseFolder;
    /** Directory handed to the spout as its source dir (see {@code makeSpout}); created in {@code setup()}. */
    private Path source;
    /** Directory handed to the spout as its archive dir (see {@code makeSpout}); created in {@code setup()}. */
    private Path archive;
    /** Directory handed to the spout as its bad-files dir (see {@code makeSpout}); created in {@code setup()}. */
    private Path badfiles;
    /** Builder used by {@code setupClass()} to create {@code hdfsCluster}. */
    static MiniDFSCluster.Builder builder;
    /** Mini DFS cluster shared by all tests in this class; built in {@code setupClass()}, shut down in {@code teardownClass()}. */
    static MiniDFSCluster hdfsCluster;
    /** File system handle obtained from {@code hdfsCluster} in {@code setupClass()}. */
    static DistributedFileSystem fs;
    /** "hdfs://localhost:&lt;name node port&gt;/" string assembled in {@code setupClass()}. */
    static String hdfsURI;
    /** Hadoop configuration used when reading sequence files back in {@code getSeqFileContents}. */
    static Configuration conf = new Configuration();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/hdfs/spout/TestHdfsSpout$MockCollector.class */
    public static class MockCollector extends SpoutOutputCollector {
        public ArrayList<String> lines;
        public ArrayList<HdfsUtils.Pair<HdfsSpout.MessageId, List<Object>>> items;

        public MockCollector() {
            super((ISpoutOutputCollector) null);
            this.lines = new ArrayList<>();
            this.items = new ArrayList<>();
        }

        public List<Integer> emit(String str, List<Object> list, Object obj) {
            this.lines.add(list.toString());
            this.items.add(HdfsUtils.Pair.of(obj, list));
            return null;
        }

        public void emitDirect(int i, String str, List<Object> list, Object obj) {
            throw new UnsupportedOperationException("NOT Implemented");
        }

        public void reportError(Throwable th) {
            throw new UnsupportedOperationException("NOT Implemented");
        }

        public long getPendingCount() {
            return 0L;
        }
    }

    /* loaded from: input_file:org/apache/storm/hdfs/spout/TestHdfsSpout$MockTextFailingReader.class */
    static class MockTextFailingReader extends TextFileReader {
        public static final String[] defaultFields = {"line"};
        int readAttempts;

        public MockTextFailingReader(FileSystem fileSystem, Path path, Map map) throws IOException {
            super(fileSystem, path, map);
            this.readAttempts = 0;
        }

        public List<Object> next() throws IOException, ParseException {
            this.readAttempts++;
            if (this.readAttempts == 3 || this.readAttempts == 4) {
                throw new IOException("mock test exception");
            }
            if (this.readAttempts > 5) {
                throw new ParseException("mock test exception", (Throwable) null);
            }
            return super.next();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/hdfs/spout/TestHdfsSpout$MockTopologyContext.class */
    public static class MockTopologyContext extends TopologyContext {
        private final int componentId;

        public MockTopologyContext(int i) {
            super((StormTopology) null, (Map) null, (Map) null, (Map) null, (Map) null, (String) null, (String) null, (String) null, (Integer) null, (Integer) null, (List) null, (Map) null, (Map) null, (Map) null, (Map) null, (Atom) null);
            this.componentId = i;
        }

        public String getThisComponentId() {
            return Integer.toString(this.componentId);
        }
    }

    /**
     * Starts one mini DFS cluster for the whole test class and caches its
     * file system handle and an "hdfs://localhost:port/" URI string.
     */
    @BeforeClass
    public static void setupClass() throws IOException {
        final Configuration clusterConf = new Configuration();
        builder = new MiniDFSCluster.Builder(clusterConf);
        hdfsCluster = builder.build();
        fs = hdfsCluster.getFileSystem();
        final int nameNodePort = hdfsCluster.getNameNodePort();
        hdfsURI = "hdfs://localhost:" + nameNodePort + "/";
    }

    /**
     * Closes the shared file system handle and shuts the mini DFS cluster down.
     * The cluster is shut down even when closing the file system fails, and
     * either step is skipped if {@code setupClass()} never got far enough to
     * assign the corresponding field.
     *
     * @throws IOException if closing the file system fails
     */
    @AfterClass
    public static void teardownClass() throws IOException {
        try {
            if (fs != null) {
                fs.close();
            }
        } finally {
            // Always release the cluster, otherwise its daemons outlive the test class.
            if (hdfsCluster != null) {
                hdfsCluster.shutdown();
            }
        }
    }

    /**
     * Creates a fresh "hdfsspout" temp folder and the source, archive and bad-file
     * directories beneath the same path on the test file system.
     */
    @Before
    public void setup() throws Exception {
        this.baseFolder = this.tempFolder.newFolder("hdfsspout");
        final String root = this.baseFolder.toString();

        this.source = new Path(root + "/source");
        fs.mkdirs(this.source);

        this.archive = new Path(root + "/archive");
        fs.mkdirs(this.archive);

        this.badfiles = new Path(root + "/bad");
        fs.mkdirs(this.badfiles);
    }

    /** Recursively deletes the per-test base folder from the test file system. */
    @After
    public void shutDown() throws IOException {
        final Path testRoot = new Path(this.baseFolder.toString());
        fs.delete(testRoot, true);
    }

    /**
     * Consumes two 5-line text files with acking disabled (11 {@code nextTuple} calls)
     * and checks the emitted lines against the two files read back from the archive directory.
     */
    @Test
    public void testSimpleText_noACK() throws IOException {
        final String sourceDir = this.source.toString();
        createTextFile(new Path(sourceDir + "/file1.txt"), 5);
        createTextFile(new Path(sourceDir + "/file2.txt"), 5);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(1);
        spout.setCommitFrequencySec(1);
        openSpout(spout, 0, getCommonConfigs());

        runSpout(spout, "r11");

        MockCollector collector = (MockCollector) spout.getCollector();
        final String archiveDir = this.archive.toString();
        Path archived1 = new Path(archiveDir + "/file1.txt");
        Path archived2 = new Path(archiveDir + "/file2.txt");
        checkCollectorOutput_txt(collector, archived1, archived2);
    }

    /**
     * Consumes two 5-line text files with one acker configured. After six reads and
     * acks of tuples 0-4 the output is compared with the archived first file; after six more
     * reads and acks of tuples 5-9 it is compared with both archived files.
     */
    @Test
    public void testSimpleText_ACK() throws IOException {
        final String sourceDir = this.source.toString();
        createTextFile(new Path(sourceDir + "/file1.txt"), 5);
        createTextFile(new Path(sourceDir + "/file2.txt"), 5);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(1);
        spout.setCommitFrequencySec(1);

        Map topoConf = getCommonConfigs();
        topoConf.put("topology.acker.executors", "1");
        openSpout(spout, 0, topoConf);

        // first file: read it through, then ack all five tuples
        runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
        Path archived1 = new Path(this.archive.toString() + "/file1.txt");
        checkCollectorOutput_txt((MockCollector) spout.getCollector(), archived1);

        // second file: same procedure with the next five tuple indexes
        runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9");
        Path archived2 = new Path(this.archive.toString() + "/file2.txt");
        checkCollectorOutput_txt((MockCollector) spout.getCollector(), archived1, archived2);
    }

    /**
     * Two spouts share one 6-line text file. The first reads two lines, then its
     * underlying lock file is closed and the test waits twice the lock timeout. The second spout
     * is then expected to emit three lines, leave a non-empty lock file behind, and after
     * two more reads to have emitted four lines, removed the lock file and finished the file.
     */
    @Test
    public void testResumeAbandoned_Text_NoAck() throws Exception {
        createTextFile(new Path(this.source.toString() + "/file1.txt"), 6);
        final int lockExpirySec = 1;

        HdfsSpout firstSpout = makeSpout("text", TextFileReader.defaultFields);
        firstSpout.setCommitFrequencyCount(1);
        firstSpout.setCommitFrequencySec(1000);
        firstSpout.setLockTimeoutSec(lockExpirySec);

        HdfsSpout secondSpout = makeSpout("text", TextFileReader.defaultFields);
        secondSpout.setCommitFrequencyCount(1);
        secondSpout.setCommitFrequencySec(1000);
        secondSpout.setLockTimeoutSec(lockExpirySec);

        Map topoConf = getCommonConfigs();
        openSpout(firstSpout, 0, topoConf);
        openSpout(secondSpout, 1, topoConf);

        // first spout consumes a couple of lines, then its lock file is closed underneath it
        Assert.assertEquals(2L, runSpout(firstSpout, "r2").size());
        FileLock abandonedLock = getField(firstSpout, "lock");
        TestFileLock.closeUnderlyingLockFile(abandonedLock);
        Thread.sleep(lockExpirySec * 2 * 1000);
        Assert.assertTrue(fs.exists(abandonedLock.getLockFile()));

        // second spout reads a few lines; the lock file must still exist and have content
        Assert.assertEquals(3L, runSpout(secondSpout, "r3").size());
        Assert.assertTrue(fs.exists(abandonedLock.getLockFile()));
        List<String> lockContents = readTextFile(fs, abandonedLock.getLockFile().toString());
        Assert.assertFalse(lockContents.isEmpty());

        // second spout finishes the file
        Assert.assertEquals(4L, runSpout(secondSpout, "r2").size());
        Assert.assertFalse(fs.exists(abandonedLock.getLockFile()));
        FileReader activeReader = getField(secondSpout, "reader");
        Assert.assertNull(activeReader);
        Assert.assertTrue(getBoolField(secondSpout, "fileReadCompletely"));
    }

    /**
     * Sequence-file variant of the abandoned-lock scenario: the first spout reads
     * two records of a 6-record file, its underlying lock file is closed and the test waits twice
     * the lock timeout; the second spout is then expected to emit three records with a
     * non-empty lock file present, and after three more reads to have emitted four records,
     * removed the lock file and finished the file.
     */
    @Test
    public void testResumeAbandoned_Seq_NoAck() throws Exception {
        createSeqFile(fs, new Path(this.source.toString() + "/file1.seq"), 6);
        final int lockExpirySec = 1;

        HdfsSpout firstSpout = makeSpout("seq", SequenceFileReader.defaultFields);
        firstSpout.setCommitFrequencyCount(1);
        firstSpout.setCommitFrequencySec(1000);
        firstSpout.setLockTimeoutSec(lockExpirySec);

        HdfsSpout secondSpout = makeSpout("seq", SequenceFileReader.defaultFields);
        secondSpout.setCommitFrequencyCount(1);
        secondSpout.setCommitFrequencySec(1000);
        secondSpout.setLockTimeoutSec(lockExpirySec);

        Map topoConf = getCommonConfigs();
        openSpout(firstSpout, 0, topoConf);
        openSpout(secondSpout, 1, topoConf);

        // first spout consumes a couple of records, then its lock file is closed underneath it
        Assert.assertEquals(2L, runSpout(firstSpout, "r2").size());
        FileLock abandonedLock = getField(firstSpout, "lock");
        TestFileLock.closeUnderlyingLockFile(abandonedLock);
        Thread.sleep(lockExpirySec * 2 * 1000);
        Assert.assertTrue(fs.exists(abandonedLock.getLockFile()));

        // second spout reads a few records; the lock file must still exist and have content
        Assert.assertEquals(3L, runSpout(secondSpout, "r3").size());
        Assert.assertTrue(fs.exists(abandonedLock.getLockFile()));
        List<String> lockContents = getTextFileContents(fs, abandonedLock.getLockFile());
        Assert.assertFalse(lockContents.isEmpty());

        // second spout finishes the file
        Assert.assertEquals(4L, runSpout(secondSpout, "r3").size());
        Assert.assertFalse(fs.exists(abandonedLock.getLockFile()));
        FileReader activeReader = getField(secondSpout, "reader");
        Assert.assertNull(activeReader);
        Assert.assertTrue(getBoolField(secondSpout, "fileReadCompletely"));
    }

    /**
     * Asserts that the first field of every tuple recorded by the collector equals,
     * in order, the concatenated lines of the given text files.
     *
     * @param mockCollector collector whose recorded items are checked
     * @param pathArr       text files supplying the expected lines, in order
     * @throws IOException if a file cannot be read
     */
    private void checkCollectorOutput_txt(MockCollector mockCollector, Path... pathArr) throws IOException {
        List<String> expected = new ArrayList<>();
        for (Path file : pathArr) {
            expected.addAll(getTextFileContents(fs, file));
        }

        List<String> actual = new ArrayList<>();
        for (HdfsUtils.Pair<HdfsSpout.MessageId, List<Object>> emitted : mockCollector.items) {
            List<Object> tuple = emitted.getValue();
            actual.add(tuple.get(0).toString());
        }

        Assert.assertEquals(expected, actual);
    }

    /**
     * Reads a UTF-8 text file line by line.
     * The underlying stream is closed on every exit path, including when a read fails.
     *
     * @param fileSystem file system to open the file on
     * @param path       file to read
     * @return the file's lines in order (empty list for an empty file)
     * @throws IOException if the file cannot be opened or read
     */
    private List<String> getTextFileContents(FileSystem fileSystem, Path path) throws IOException {
        List<String> lines = new ArrayList<>();
        try (InputStream in = fileSystem.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    /**
     * Asserts that the collector's recorded lines equal, in order, the string form
     * of every record in the given sequence files.
     * Uses {@code assertEquals} so a mismatch reports both lists instead of a bare failure.
     *
     * @param mockCollector collector whose {@code lines} are checked
     * @param pathArr       sequence files supplying the expected records, in order
     * @throws IOException if a file cannot be read
     */
    private void checkCollectorOutput_seq(MockCollector mockCollector, Path... pathArr) throws IOException {
        List<String> expected = new ArrayList<>();
        for (Path file : pathArr) {
            expected.addAll(getSeqFileContents(fs, file));
        }
        Assert.assertEquals(expected, mockCollector.lines);
    }

    /**
     * Reads every key/value record of the given sequence files and renders each
     * record as the {@code toString()} of a two-element list {@code [key, value]}.
     *
     * @param fileSystem file system whose URI is prefixed to each path
     * @param pathArr    sequence files to read, in order
     * @return rendered records of all files, in read order
     * @throws IOException if a file cannot be opened or read
     */
    private List<String> getSeqFileContents(FileSystem fileSystem, Path... pathArr) throws IOException {
        List<String> records = new ArrayList<>();
        for (Path file : pathArr) {
            Path qualified = new Path(fileSystem.getUri().toString() + file.toString());
            SequenceFile.Reader seqReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(qualified));
            try {
                // key/value holders are instantiated from the classes recorded in the file header
                Writable key = (Writable) ReflectionUtils.newInstance(seqReader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(seqReader.getValueClass(), conf);
                while (seqReader.next(key, value)) {
                    records.add(Arrays.asList(key, value).toString());
                }
            } finally {
                seqReader.close();
            }
        }
        return records;
    }

    /**
     * Lists the files directly inside a directory (non-recursive) on the test file system.
     *
     * @param path directory to list
     * @return each file's path with scheme and authority stripped, in iteration order
     * @throws IOException if listing fails
     */
    private List<String> listDir(Path path) throws IOException {
        List<String> entries = new ArrayList<>();
        for (RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false); files.hasNext(); ) {
            Path file = files.next().getPath();
            entries.add(Path.getPathWithoutSchemeAndAuthority(file).toString());
        }
        return entries;
    }

    /**
     * With one acker configured, checks the spout's {@code reader} and
     * {@code fileReadCompletely} fields after each step while a 5-line file is read and
     * acked, and again for a second 5-line file created afterwards.
     * Boolean state is asserted with {@code assertTrue}/{@code assertFalse} rather than
     * {@code assertEquals} on boxed values.
     */
    @Test
    public void testMultipleFileConsumption_Ack() throws Exception {
        createTextFile(new Path(this.source.toString() + "/file1.txt"), 5);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(1);
        spout.setCommitFrequencySec(1);

        Map topoConf = getCommonConfigs();
        topoConf.put("topology.acker.executors", "1");
        openSpout(spout, 0, topoConf);

        // three reads: file still open and not yet fully read
        runSpout(spout, "r3");
        Assert.assertNotNull((FileReader) getField(spout, "reader"));
        Assert.assertFalse(getBoolField(spout, "fileReadCompletely"));

        // three more reads: fully read, reader still held
        runSpout(spout, "r3");
        Assert.assertNotNull((FileReader) getField(spout, "reader"));
        Assert.assertTrue(getBoolField(spout, "fileReadCompletely"));

        // partial acks: reader still held
        runSpout(spout, "a0", "a1", "a2");
        Assert.assertNotNull((FileReader) getField(spout, "reader"));
        Assert.assertTrue(getBoolField(spout, "fileReadCompletely"));

        // remaining acks: reader released
        runSpout(spout, "a3", "a4");
        Assert.assertNull((FileReader) getField(spout, "reader"));
        Assert.assertTrue(getBoolField(spout, "fileReadCompletely"));

        // a second file appears and is picked up by the next read
        createTextFile(new Path(this.source.toString() + "/file2.txt"), 5);
        runSpout(spout, "r1");
        Assert.assertNotNull(getField(spout, "reader"));
        Assert.assertFalse(getBoolField(spout, "fileReadCompletely"));

        runSpout(spout, "a5");
        Assert.assertNotNull(getField(spout, "reader"));
        Assert.assertFalse(getBoolField(spout, "fileReadCompletely"));

        // read the rest and ack everything: reader released again
        runSpout(spout, "r5", "a6", "a7", "a8", "a9");
        Assert.assertNull(getField(spout, "reader"));
        Assert.assertTrue(getBoolField(spout, "fileReadCompletely"));
    }

    /**
     * Reads a field declared on {@code HdfsSpout} via reflection, bypassing access checks.
     *
     * @param hdfsSpout instance to read from
     * @param str       name of the declared field
     * @return the field's value, cast (unchecked) to the type expected by the caller
     * @throws NoSuchFieldException   if {@code HdfsSpout} declares no such field
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked")
    private static <T> T getField(HdfsSpout hdfsSpout, String str) throws NoSuchFieldException, IllegalAccessException {
        Field target = HdfsSpout.class.getDeclaredField(str);
        target.setAccessible(true);
        Object value = target.get(hdfsSpout);
        return (T) value;
    }

    /**
     * Reads a {@code boolean} field declared on {@code HdfsSpout} via reflection,
     * bypassing access checks.
     *
     * @param hdfsSpout instance to read from
     * @param str       name of the declared field
     * @return the field's current value
     * @throws NoSuchFieldException   if {@code HdfsSpout} declares no such field
     * @throws IllegalAccessException if the field cannot be read
     */
    private static boolean getBoolField(HdfsSpout hdfsSpout, String str) throws NoSuchFieldException, IllegalAccessException {
        Field target = HdfsSpout.class.getDeclaredField(str);
        target.setAccessible(true);
        boolean value = target.getBoolean(hdfsSpout);
        return value;
    }

    /**
     * Consumes two 5-record sequence files and checks that ten tuples are emitted,
     * that the archive directory holds two files, and that the emitted lines match the
     * archived files. This test replaces the per-test source and archive directories with
     * fixed paths under {@code /tmp/hdfsspout}.
     */
    @Test
    public void testSimpleSequenceFile() throws IOException {
        this.source = new Path("/tmp/hdfsspout/source");
        fs.mkdirs(this.source);
        this.archive = new Path("/tmp/hdfsspout/archive");
        fs.mkdirs(this.archive);

        Path input1 = new Path(this.source + "/file1.seq");
        Path input2 = new Path(this.source + "/file2.seq");
        createSeqFile(fs, input1, 5);
        createSeqFile(fs, input2, 5);

        HdfsSpout spout = makeSpout("seq", SequenceFileReader.defaultFields);
        openSpout(spout, 0, getCommonConfigs());

        List<String> emitted = runSpout(spout, "r11");
        Assert.assertEquals(10L, emitted.size());

        List<String> archivedFiles = listDir(this.archive);
        Assert.assertEquals(2L, archivedFiles.size());

        MockCollector collector = (MockCollector) spout.getCollector();
        Path archived1 = new Path(this.archive + "/file1.seq");
        Path archived2 = new Path(this.archive + "/file2.seq");
        checkCollectorOutput_seq(collector, archived1, archived2);
    }

    /**
     * Runs the spout with {@code MockTextFailingReader} over two text files and
     * expects only the first three lines of each file to be emitted and both files to end up
     * in the bad-files directory.
     * Assertions are written in JUnit's (expected, actual) order so failure messages read correctly.
     */
    @Test
    public void testReadFailures() throws Exception {
        Path file1 = new Path(this.source.toString() + "/file1.txt");
        Path file2 = new Path(this.source.toString() + "/file2.txt");
        createTextFile(file1, 6);
        createTextFile(file2, 7);
        Assert.assertEquals(2L, listDir(this.source).size());

        HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields);
        openSpout(spout, 0, getCommonConfigs());

        List<String> emitted = runSpout(spout, "r11");
        String[] expected = {"[line 0]", "[line 1]", "[line 2]", "[line 0]", "[line 1]", "[line 2]"};
        Assert.assertArrayEquals(expected, emitted.toArray());

        Assert.assertEquals(6L, ((MockCollector) spout.getCollector()).lines.size());
        Assert.assertEquals(2L, HdfsUtils.listFilesByModificationTime(fs, this.badfiles, 0L).size());
    }

    /**
     * Checks lock-file lifecycle with a commit frequency of one tuple: while a file
     * is being read exactly one lock file exists and holds one more line than the tuples
     * emitted from that file; once the file is finished the lock directory is empty. The same
     * is verified for a second file created afterwards.
     * Assertions are written in JUnit's (expected, actual) order so failure messages read correctly.
     */
    @Test
    public void testLocking() throws Exception {
        createTextFile(new Path(this.source.toString() + "/file1.txt"), 10);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(1);
        spout.setCommitFrequencySec(1000);
        openSpout(spout, 0, getCommonConfigs());

        // halfway through the first file: one lock file with (emitted + 1) lines
        List<String> emitted = runSpout(spout, "r5");
        Assert.assertEquals(5L, emitted.size());
        List<String> lockFiles = listDir(spout.getLockDirPath());
        Assert.assertEquals(1L, lockFiles.size());
        Assert.assertEquals(emitted.size() + 1, readTextFile(fs, lockFiles.get(0)).size());

        // finish the first file: lock released
        runSpout(spout, "r6");
        Assert.assertEquals(0L, listDir(spout.getLockDirPath()).size());

        // second file: the collector keeps accumulating, so 10 + 5 tuples in total
        createTextFile(new Path(this.source.toString() + "/file2.txt"), 10);
        Assert.assertEquals(15L, runSpout(spout, "r5").size());
        List<String> lockFiles2 = listDir(spout.getLockDirPath());
        Assert.assertEquals(1L, lockFiles2.size());
        Assert.assertEquals(6L, readTextFile(fs, lockFiles2.get(0)).size());

        // finish the second file: lock released again
        runSpout(spout, "r6");
        Assert.assertEquals(0L, listDir(spout.getLockDirPath()).size());
    }

    /**
     * With a commit frequency of every two tuples, expects the lock file to hold
     * three lines after five reads and four lines after a sixth read.
     * Assertions are written in JUnit's (expected, actual) order so failure messages read correctly.
     */
    @Test
    public void testLockLoggingFreqCount() throws Exception {
        createTextFile(new Path(this.source.toString() + "/file1.txt"), 10);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(2);
        spout.setCommitFrequencySec(1000);
        openSpout(spout, 0, getCommonConfigs());

        runSpout(spout, "r5");
        String lockFile = listDir(spout.getLockDirPath()).get(0);
        Assert.assertEquals(3L, readTextFile(fs, lockFile).size());

        runSpout(spout, "r1");
        Assert.assertEquals(4L, readTextFile(fs, lockFile).size());
    }

    /**
     * With count-based commits set to 0 and a 2-second commit interval, expects the
     * lock file to hold one line after five reads and two lines after a 3-second pause
     * followed by one more read.
     * Assertions are written in JUnit's (expected, actual) order so failure messages read correctly.
     */
    @Test
    public void testLockLoggingFreqSec() throws Exception {
        createTextFile(new Path(this.source.toString() + "/file1.txt"), 10);

        HdfsSpout spout = makeSpout("text", TextFileReader.defaultFields);
        spout.setCommitFrequencyCount(0);
        spout.setCommitFrequencySec(2);
        openSpout(spout, 0, getCommonConfigs());

        runSpout(spout, "r5");
        String lockFile = listDir(spout.getLockDirPath()).get(0);
        Assert.assertEquals(1L, readTextFile(fs, lockFile).size());

        // wait past the 2-second commit interval before the next read
        Thread.sleep(3000L);
        runSpout(spout, "r1");
        Assert.assertEquals(2L, readTextFile(fs, lockFile).size());
    }

    /**
     * Reads a text file line by line, decoding it as UTF-8.
     * The underlying stream is closed on every exit path; previously it was never closed.
     *
     * @param fileSystem file system to open the file on
     * @param str        path of the file to read
     * @return the file's lines in order (empty list for an empty file)
     * @throws IOException if the file cannot be opened or read
     */
    private static List<String> readTextFile(FileSystem fileSystem, String str) throws IOException {
        List<String> lines = new ArrayList<>();
        try (InputStream in = fileSystem.open(new Path(str));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    /**
     * Builds the configuration map shared by the tests.
     *
     * @return a new mutable map containing only {@code "topology.acker.executors" -> "0"}
     */
    private Map getCommonConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("topology.acker.executors", "0");
        return settings;
    }

    /**
     * Creates an unopened spout pointed at the mini cluster and at this test's
     * source, archive and bad-file directories.
     *
     * @param str    reader type passed to {@code setReaderType}
     * @param strArr output field names passed to {@code withOutputFields}
     * @return the configured spout
     */
    private HdfsSpout makeSpout(String str, String[] strArr) {
        return new HdfsSpout()
            .withOutputFields(strArr)
            .setReaderType(str)
            .setHdfsUri(hdfsCluster.getURI().toString())
            .setSourceDir(this.source.toString())
            .setArchiveDir(this.archive.toString())
            .setBadFilesDir(this.badfiles.toString());
    }

    /**
     * Opens the spout with a fresh mock topology context and a fresh mock collector.
     *
     * @param hdfsSpout spout to open
     * @param i         id given to the mock topology context
     * @param map       configuration passed to {@code open}
     */
    private void openSpout(HdfsSpout hdfsSpout, int i, Map map) {
        MockTopologyContext context = new MockTopologyContext(i);
        MockCollector collector = new MockCollector();
        hdfsSpout.open(map, context, collector);
    }

    /**
     * Drives the spout with a sequence of short commands:
     * <ul>
     *   <li>{@code "r"} or {@code "rN"}: call {@code nextTuple()} once, or N times</li>
     *   <li>{@code "aN"}: ack the message id of the N-th recorded item</li>
     *   <li>{@code "fN"}: fail the message id of the N-th recorded item</li>
     * </ul>
     * Commands starting with any other character (and empty commands) are ignored.
     *
     * @param hdfsSpout spout to drive; its collector must be a {@code MockCollector}
     * @param strArr    commands to execute, in order
     * @return the collector's accumulated {@code lines} list
     */
    private List<String> runSpout(HdfsSpout hdfsSpout, String... strArr) {
        MockCollector collector = (MockCollector) hdfsSpout.getCollector();
        for (String command : strArr) {
            if (command.isEmpty()) {
                continue;
            }
            switch (command.charAt(0)) {
                case 'r': {
                    int repeat = 1;
                    if (command.length() > 1) {
                        repeat = Integer.parseInt(command.substring(1));
                    }
                    for (int remaining = repeat; remaining > 0; remaining--) {
                        hdfsSpout.nextTuple();
                    }
                    break;
                }
                case 'a': {
                    int index = Integer.parseInt(command.substring(1));
                    hdfsSpout.ack(collector.items.get(index).getKey());
                    break;
                }
                case 'f': {
                    int index = Integer.parseInt(command.substring(1));
                    hdfsSpout.fail(collector.items.get(index).getKey());
                    break;
                }
                default:
                    break;
            }
        }
        return collector.lines;
    }

    /**
     * Creates a text file on the test file system containing the lines
     * {@code "line 0"} .. {@code "line (i-1)"}, each terminated by the platform line separator.
     * The output stream is closed even if a write fails.
     *
     * @param path file to create
     * @param i    number of lines to write
     * @throws IOException if the file cannot be created or written
     */
    private void createTextFile(Path path, int i) throws IOException {
        try (FSDataOutputStream out = fs.create(path)) {
            for (int lineNo = 0; lineNo < i; lineNo++) {
                out.writeBytes("line " + lineNo + System.lineSeparator());
            }
        }
    }

    /**
     * Creates a sequence file with {@code i} records of the form
     * ({@code IntWritable k}, {@code Text "line k"}) for k = 0 .. i-1, replacing any existing
     * file at the same path.
     * Failures are propagated to the caller instead of being printed and swallowed, so a
     * test cannot continue against a missing or partial input file; the writer is closed on
     * every exit path.
     *
     * @param fileSystem file system to create the file on
     * @param path       file to create
     * @param i          number of records to write
     * @throws IOException if the file cannot be deleted, created or written
     */
    private static void createSeqFile(FileSystem fileSystem, Path path, int i) throws IOException {
        Configuration configuration = new Configuration();
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, false);
        }
        try (SequenceFile.Writer writer =
                 SequenceFile.createWriter(fileSystem, configuration, path, IntWritable.class, Text.class)) {
            for (int recordNo = 0; recordNo < i; recordNo++) {
                writer.append(new IntWritable(recordNo), new Text("line " + recordNo));
            }
        }
    }
}
