package org.apache.storm.hdfs.spout;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.HdfsUtils;
import org.apache.storm.hdfs.security.HdfsSecurityUtil;
import org.apache.storm.hdfs.spout.FileLock;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hdfs/spout/HdfsSpout.class */
public class HdfsSpout extends BaseRichSpout {
    private String hdfsUri;
    private String readerType;
    private Fields outputFields;
    private String sourceDir;
    private Path sourceDirPath;
    private String archiveDir;
    private Path archiveDirPath;
    private String badFilesDir;
    private Path badFilesDirPath;
    private String lockDir;
    private Path lockDirPath;
    private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
    private FileSystem hdfs;
    private FileReader reader;
    private SpoutOutputCollector collector;
    private Configuration hdfsConfig;
    private FileLock lock;
    private Timer commitTimer;
    private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
    private int commitFrequencySec = 10;
    private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
    private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
    private boolean clocksInSync = true;
    private String inprogress_suffix = ".inprogress";
    private String ignoreSuffix = ".ignore";
    private String outputStreamName = null;
    private ProgressTracker tracker = null;
    HashMap<MessageId, List<Object>> inflight = new HashMap<>();
    LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
    private Map conf = null;
    private String spoutId = null;
    HdfsUtils.Pair<Path, FileLock.LogEntry> lastExpiredLock = null;
    private long lastExpiredLockTime = 0;
    private long tupleCounter = 0;
    private boolean ackEnabled = false;
    private int acksSinceLastCommit = 0;
    private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
    private boolean fileReadCompletely = true;
    private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/hdfs/spout/HdfsSpout$MessageId.class */
    public static class MessageId implements Comparable<MessageId> {
        public long msgNumber;
        public String fullPath;
        public FileOffset offset;

        public MessageId(long j, Path path, FileOffset fileOffset) {
            this.msgNumber = j;
            this.fullPath = path.toString();
            this.offset = fileOffset;
        }

        public String toString() {
            return "{'" + this.fullPath + "':" + this.offset + "}";
        }

        @Override // java.lang.Comparable
        public int compareTo(MessageId messageId) {
            if (this.msgNumber < messageId.msgNumber) {
                return -1;
            }
            return this.msgNumber > messageId.msgNumber ? 1 : 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/hdfs/spout/HdfsSpout$RenameException.class */
    public static class RenameException extends IOException {
        public final Path oldFile;
        public final Path newFile;

        public RenameException(Path path, Path path2) {
            super("Rename of " + path + " to " + path2 + " failed");
            this.oldFile = path;
            this.newFile = path2;
        }

        public RenameException(Path path, Path path2, IOException iOException) {
            super("Rename of " + path + " to " + path2 + " failed", iOException);
            this.oldFile = path;
            this.newFile = path2;
        }
    }

    public HdfsSpout setHdfsUri(String str) {
        this.hdfsUri = str;
        return this;
    }

    public HdfsSpout setReaderType(String str) {
        this.readerType = str;
        return this;
    }

    public HdfsSpout setSourceDir(String str) {
        this.sourceDir = str;
        return this;
    }

    public HdfsSpout setArchiveDir(String str) {
        this.archiveDir = str;
        return this;
    }

    public HdfsSpout setBadFilesDir(String str) {
        this.badFilesDir = str;
        return this;
    }

    public HdfsSpout setLockDir(String str) {
        this.lockDir = str;
        return this;
    }

    public HdfsSpout setCommitFrequencyCount(int i) {
        this.commitFrequencyCount = i;
        return this;
    }

    public HdfsSpout setCommitFrequencySec(int i) {
        this.commitFrequencySec = i;
        return this;
    }

    public HdfsSpout setMaxOutstanding(int i) {
        this.maxOutstanding = i;
        return this;
    }

    public HdfsSpout setLockTimeoutSec(int i) {
        this.lockTimeoutSec = i;
        return this;
    }

    public HdfsSpout setClocksInSync(boolean z) {
        this.clocksInSync = z;
        return this;
    }

    public HdfsSpout setIgnoreSuffix(String str) {
        this.ignoreSuffix = str;
        return this;
    }

    public HdfsSpout withOutputFields(String... strArr) {
        this.outputFields = new Fields(strArr);
        return this;
    }

    public HdfsSpout withConfigKey(String str) {
        this.configKey = str;
        return this;
    }

    public HdfsSpout withOutputStream(String str) {
        this.outputStreamName = str;
        return this;
    }

    public Path getLockDirPath() {
        return this.lockDirPath;
    }

    public SpoutOutputCollector getCollector() {
        return this.collector;
    }

    public void nextTuple() {
        boolean z;
        LOG.trace("Next Tuple {}", this.spoutId);
        if (!this.retryList.isEmpty()) {
            LOG.debug("Sending tuple from retry list");
            HdfsUtils.Pair<MessageId, List<Object>> remove = this.retryList.remove();
            emitData(remove.getValue(), remove.getKey());
            return;
        }
        if (this.ackEnabled && this.tracker.size() >= this.maxOutstanding) {
            LOG.warn("Waiting for more ACKs before generating new tuples. Progress tracker size has reached limit {}, SpoutID {}", Integer.valueOf(this.maxOutstanding), this.spoutId);
            return;
        }
        while (true) {
            try {
                z = false;
                if (this.reader == null) {
                    this.reader = pickNextFile();
                    if (this.reader == null) {
                        LOG.debug("Currently no new files to process under : " + this.sourceDirPath);
                        return;
                    } else {
                        this.fileReadCompletely = false;
                        z = true;
                    }
                }
            } catch (IOException e) {
                LOG.error("I/O Error processing at file location " + getFileProgress(this.reader), e);
                return;
            } catch (ParseException e2) {
                LOG.error("Parsing error when processing at file location " + getFileProgress(this.reader) + ". Skipping remainder of file.", e2);
                markFileAsBad(this.reader.getFilePath());
            }
            if (this.fileReadCompletely) {
                return;
            }
            List<Object> next = this.reader.next();
            if (next != null) {
                this.fileReadCompletely = false;
                this.tupleCounter++;
                emitData(next, new MessageId(this.tupleCounter, this.reader.getFilePath(), this.reader.getFileOffset()));
                if (this.ackEnabled) {
                    commitProgress(this.tracker.getCommitPosition());
                    return;
                } else {
                    this.acksSinceLastCommit++;
                    commitProgress(this.reader.getFileOffset());
                    return;
                }
            }
            this.fileReadCompletely = true;
            if (!this.ackEnabled || z) {
                markFileAsDone(this.reader.getFilePath());
            }
        }
    }

    private void commitProgress(FileOffset fileOffset) {
        if (fileOffset == null || this.lock == null || !canCommitNow()) {
            return;
        }
        try {
            String obj = fileOffset.toString();
            this.lock.heartbeat(obj);
            LOG.debug("{} Committed progress. {}", this.spoutId, obj);
            this.acksSinceLastCommit = 0;
            this.commitTimeElapsed.set(false);
            setupCommitElapseTimer();
        } catch (IOException e) {
            LOG.error("Unable to commit progress Will retry later. Spout ID = " + this.spoutId, e);
        }
    }

    private void setupCommitElapseTimer() {
        if (this.commitFrequencySec <= 0) {
            return;
        }
        this.commitTimer.schedule(new TimerTask() { // from class: org.apache.storm.hdfs.spout.HdfsSpout.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                HdfsSpout.this.commitTimeElapsed.set(true);
            }
        }, this.commitFrequencySec * 1000);
    }

    private static String getFileProgress(FileReader fileReader) {
        return fileReader.getFilePath() + " " + fileReader.getFileOffset();
    }

    private void markFileAsDone(Path path) {
        try {
            LOG.info("Completed processing {}. Spout Id = {}", renameCompletedFile(this.reader.getFilePath()), this.spoutId);
        } catch (IOException e) {
            LOG.error("Unable to archive completed file" + path + " Spout ID " + this.spoutId, e);
        }
        closeReaderAndResetTrackers();
    }

    private void markFileAsBad(Path path) {
        String path2 = path.toString();
        String name = new Path(path2.substring(0, path2.indexOf(this.inprogress_suffix))).getName();
        Path path3 = new Path(this.badFilesDirPath + "/" + name);
        LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", new Object[]{name, path3, this.tracker.getCommitPosition(), this.spoutId});
        try {
        } catch (IOException e) {
            LOG.warn("Error moving bad file: " + path + " to destination " + path3 + " SpoutId =" + this.spoutId, e);
        }
        if (!this.hdfs.rename(path, path3)) {
            throw new IOException("Move failed for bad file: " + path);
        }
        closeReaderAndResetTrackers();
    }

    private void closeReaderAndResetTrackers() {
        this.inflight.clear();
        this.tracker.offsets.clear();
        this.retryList.clear();
        this.reader.close();
        this.reader = null;
        releaseLockAndLog(this.lock, this.spoutId);
        this.lock = null;
    }

    private static void releaseLockAndLog(FileLock fileLock, String str) {
        if (fileLock != null) {
            try {
                fileLock.release();
                LOG.debug("Spout {} released FileLock. SpoutId = {}", fileLock.getLockFile(), str);
            } catch (IOException e) {
                LOG.error("Unable to delete lock file : " + fileLock.getLockFile() + " SpoutId =" + str, e);
            }
        }
    }

    protected void emitData(List<Object> list, MessageId messageId) {
        LOG.trace("Emitting - {}", messageId);
        if (this.outputStreamName == null) {
            this.collector.emit(list, messageId);
        } else {
            this.collector.emit(this.outputStreamName, list, messageId);
        }
        this.inflight.put(messageId, list);
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        Map map2;
        LOG.info("Opening HDFS Spout");
        this.conf = map;
        this.commitTimer = new Timer();
        this.tracker = new ProgressTracker();
        this.hdfsConfig = new Configuration();
        this.collector = spoutOutputCollector;
        this.hdfsConfig = new Configuration();
        this.tupleCounter = 0L;
        if (this.hdfsUri == null && map.containsKey(Configs.HDFS_URI)) {
            this.hdfsUri = map.get(Configs.HDFS_URI).toString();
        }
        if (this.hdfsUri == null) {
            throw new RuntimeException("HDFS Uri not set on spout");
        }
        try {
            this.hdfs = FileSystem.get(URI.create(this.hdfsUri), this.hdfsConfig);
            if (map.containsKey(this.configKey) && (map2 = (Map) map.get(this.configKey)) != null) {
                for (String str : map2.keySet()) {
                    LOG.info("HDFS Config override : {} = {} ", str, String.valueOf(map2.get(str)));
                    this.hdfsConfig.set(str, String.valueOf(map2.get(str)));
                }
                try {
                    HdfsSecurityUtil.login(map, this.hdfsConfig);
                } catch (IOException e) {
                    LOG.error("HDFS Login failed ", e);
                    throw new RuntimeException(e);
                }
            }
            if (this.readerType == null && map.containsKey(Configs.READER_TYPE)) {
                this.readerType = map.get(Configs.READER_TYPE).toString();
            }
            checkValidReader(this.readerType);
            if (this.sourceDir == null && map.containsKey(Configs.SOURCE_DIR)) {
                this.sourceDir = map.get(Configs.SOURCE_DIR).toString();
            }
            if (this.sourceDir == null) {
                LOG.error("hdfsspout.source.dir setting is required");
                throw new RuntimeException("hdfsspout.source.dir setting is required");
            }
            this.sourceDirPath = new Path(this.sourceDir);
            if (this.archiveDir == null && map.containsKey(Configs.ARCHIVE_DIR)) {
                this.archiveDir = map.get(Configs.ARCHIVE_DIR).toString();
            }
            if (this.archiveDir == null) {
                LOG.error("hdfsspout.archive.dir setting is required");
                throw new RuntimeException("hdfsspout.archive.dir setting is required");
            }
            this.archiveDirPath = new Path(this.archiveDir);
            validateOrMakeDir(this.hdfs, this.archiveDirPath, "Archive");
            if (this.badFilesDir == null && map.containsKey(Configs.BAD_DIR)) {
                this.badFilesDir = map.get(Configs.BAD_DIR).toString();
            }
            if (this.badFilesDir == null) {
                LOG.error("hdfsspout.badfiles.dir setting is required");
                throw new RuntimeException("hdfsspout.badfiles.dir setting is required");
            }
            this.badFilesDirPath = new Path(this.badFilesDir);
            validateOrMakeDir(this.hdfs, this.badFilesDirPath, "bad files");
            if (map.containsKey(Configs.IGNORE_SUFFIX)) {
                this.ignoreSuffix = map.get(Configs.IGNORE_SUFFIX).toString();
            }
            if (this.lockDir == null && map.containsKey(Configs.LOCK_DIR)) {
                this.lockDir = map.get(Configs.LOCK_DIR).toString();
            }
            if (this.lockDir == null) {
                this.lockDir = getDefaultLockDir(this.sourceDirPath);
            }
            this.lockDirPath = new Path(this.lockDir);
            validateOrMakeDir(this.hdfs, this.lockDirPath, "locks");
            if (map.get(Configs.LOCK_TIMEOUT) != null) {
                this.lockTimeoutSec = Integer.parseInt(map.get(Configs.LOCK_TIMEOUT).toString());
            }
            Object obj = map.get("topology.acker.executors");
            if (obj != null) {
                int parseInt = Integer.parseInt(obj.toString());
                this.ackEnabled = parseInt > 0;
                LOG.debug("ACKer count = {}", Integer.valueOf(parseInt));
            } else {
                this.ackEnabled = true;
                LOG.debug("ACK count not explicitly set on topology.");
            }
            LOG.info("ACK mode is {}", this.ackEnabled ? "enabled" : "disabled");
            if (map.get(Configs.COMMIT_FREQ_COUNT) != null) {
                this.commitFrequencyCount = Integer.parseInt(map.get(Configs.COMMIT_FREQ_COUNT).toString());
            }
            if (map.get(Configs.COMMIT_FREQ_SEC) != null) {
                this.commitFrequencySec = Integer.parseInt(map.get(Configs.COMMIT_FREQ_SEC).toString());
                if (this.commitFrequencySec <= 0) {
                    throw new RuntimeException("hdfsspout.commit.sec setting must be greater than 0");
                }
            }
            if (map.get(Configs.MAX_OUTSTANDING) != null) {
                this.maxOutstanding = Integer.parseInt(map.get(Configs.MAX_OUTSTANDING).toString());
            }
            if (map.get(Configs.CLOCKS_INSYNC) != null) {
                this.clocksInSync = Boolean.parseBoolean(map.get(Configs.CLOCKS_INSYNC).toString());
            }
            this.spoutId = topologyContext.getThisComponentId();
            setupCommitElapseTimer();
        } catch (IOException e2) {
            LOG.error("Unable to instantiate file system", e2);
            throw new RuntimeException("Unable to instantiate file system", e2);
        }
    }

    public void close() {
        this.commitTimer.cancel();
    }

    private static void validateOrMakeDir(FileSystem fileSystem, Path path, String str) {
        try {
            if (fileSystem.exists(path)) {
                if (!fileSystem.isDirectory(path)) {
                    LOG.error(str + " directory is a file, not a dir. " + path);
                    throw new RuntimeException(str + " directory is a file, not a dir. " + path);
                }
            } else if (!fileSystem.mkdirs(path)) {
                LOG.error("Unable to create " + str + " directory " + path);
                throw new RuntimeException("Unable to create " + str + " directory " + path);
            }
        } catch (IOException e) {
            LOG.error("Unable to create " + str + " directory " + path, e);
            throw new RuntimeException("Unable to create " + str + " directory " + path, e);
        }
    }

    private String getDefaultLockDir(Path path) {
        return path.toString() + "/" + Configs.DEFAULT_LOCK_DIR;
    }

    private static void checkValidReader(String str) {
        if (str.equalsIgnoreCase(Configs.TEXT) || str.equalsIgnoreCase(Configs.SEQ)) {
            return;
        }
        try {
            Class.forName(str).getConstructor(FileSystem.class, Path.class, Map.class);
        } catch (ClassNotFoundException e) {
            LOG.error(str + " not found in classpath.", e);
            throw new IllegalArgumentException(str + " not found in classpath.", e);
        } catch (NoSuchMethodException e2) {
            LOG.error(str + " is missing the expected constructor for Readers.", e2);
            throw new IllegalArgumentException(str + " is missing the expected constuctor for Readers.");
        }
    }

    public void ack(Object obj) {
        LOG.trace("Ack received for msg {} on spout {}", obj, this.spoutId);
        if (this.ackEnabled) {
            MessageId messageId = (MessageId) obj;
            this.inflight.remove(messageId);
            this.acksSinceLastCommit++;
            this.tracker.recordAckedOffset(messageId.offset);
            commitProgress(this.tracker.getCommitPosition());
            if (this.fileReadCompletely && this.inflight.isEmpty()) {
                markFileAsDone(this.reader.getFilePath());
                this.reader = null;
            }
            super.ack(obj);
        }
    }

    private boolean canCommitNow() {
        if (this.commitFrequencyCount <= 0 || this.acksSinceLastCommit < this.commitFrequencyCount) {
            return this.commitTimeElapsed.get();
        }
        return true;
    }

    public void fail(Object obj) {
        LOG.trace("Fail received for msg id {} on spout {}", obj, this.spoutId);
        super.fail(obj);
        if (this.ackEnabled) {
            this.retryList.add(HdfsUtils.Pair.of(obj, this.inflight.remove(obj)));
        }
    }

    private FileReader pickNextFile() {
        try {
            this.lock = getOldestExpiredLock();
            if (this.lock != null) {
                LOG.debug("Spout {} now took over ownership of abandoned FileLock {}", this.spoutId, this.lock.getLockFile());
                Path fileForLockFile = getFileForLockFile(this.lock.getLockFile(), this.sourceDirPath);
                String str = this.lock.getLastLogEntry().fileOffset;
                LOG.info("Resuming processing of abandoned file : {}", fileForLockFile);
                return createFileReader(fileForLockFile, str);
            }
            for (Path path : HdfsUtils.listFilesByModificationTime(this.hdfs, this.sourceDirPath, 0L)) {
                if (!path.getName().endsWith(this.inprogress_suffix) && !path.getName().endsWith(this.ignoreSuffix)) {
                    this.lock = FileLock.tryLock(this.hdfs, path, this.lockDirPath, this.spoutId);
                    if (this.lock == null) {
                        LOG.debug("Unable to get FileLock for {}, so skipping it.", path);
                    } else {
                        try {
                            FileReader createFileReader = createFileReader(renameToInProgressFile(path));
                            LOG.info("Processing : {} ", path);
                            return createFileReader;
                        } catch (Exception e) {
                            LOG.error("Skipping file " + path, e);
                            releaseLockAndLog(this.lock, this.spoutId);
                        }
                    }
                }
            }
            return null;
        } catch (IOException e2) {
            LOG.error("Unable to select next file for consumption " + this.sourceDirPath, e2);
            return null;
        }
    }

    /* JADX WARN: Finally extract failed */
    private FileLock getOldestExpiredLock() throws IOException {
        DirLock tryLock = DirLock.tryLock(this.hdfs, this.lockDirPath);
        if (tryLock == null) {
            tryLock = DirLock.takeOwnershipIfStale(this.hdfs, this.lockDirPath, this.lockTimeoutSec);
            if (tryLock == null) {
                LOG.debug("Spout {} could not take over ownership of DirLock for {}", this.spoutId, this.lockDirPath);
                return null;
            }
            LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}", this.spoutId, this.lockDirPath);
        } else {
            LOG.debug("Spout {} now owns DirLock for {}", this.spoutId, this.lockDirPath);
        }
        try {
            if (this.clocksInSync) {
                FileLock acquireOldestExpiredLock = FileLock.acquireOldestExpiredLock(this.hdfs, this.lockDirPath, this.lockTimeoutSec, this.spoutId);
                tryLock.release();
                LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
                return acquireOldestExpiredLock;
            }
            if (this.lastExpiredLock == null) {
                this.lastExpiredLock = FileLock.locateOldestExpiredLock(this.hdfs, this.lockDirPath, this.lockTimeoutSec);
                this.lastExpiredLockTime = System.currentTimeMillis();
                tryLock.release();
                LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
                return null;
            }
            if (hasExpired(this.lastExpiredLockTime)) {
                tryLock.release();
                LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
                return null;
            }
            FileLock.LogEntry lastEntry = FileLock.getLastEntry(this.hdfs, this.lastExpiredLock.getKey());
            if (!lastEntry.equals(this.lastExpiredLock.getValue())) {
                this.lastExpiredLock = null;
                tryLock.release();
                LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
                return null;
            }
            FileLock takeOwnership = FileLock.takeOwnership(this.hdfs, this.lastExpiredLock.getKey(), lastEntry, this.spoutId);
            this.lastExpiredLock = null;
            tryLock.release();
            LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
            return takeOwnership;
        } catch (Throwable th) {
            tryLock.release();
            LOG.debug("Released DirLock {}, SpoutID {} ", tryLock.getLockFile(), this.spoutId);
            throw th;
        }
    }

    private boolean hasExpired(long j) {
        return System.currentTimeMillis() - j < ((long) (this.lockTimeoutSec * 1000));
    }

    private FileReader createFileReader(Path path) throws IOException {
        if (this.readerType.equalsIgnoreCase(Configs.SEQ)) {
            return new SequenceFileReader(this.hdfs, path, this.conf);
        }
        if (this.readerType.equalsIgnoreCase(Configs.TEXT)) {
            return new TextFileReader(this.hdfs, path, this.conf);
        }
        try {
            return (FileReader) Class.forName(this.readerType).getConstructor(FileSystem.class, Path.class, Map.class).newInstance(this.hdfs, path, this.conf);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException("Unable to instantiate " + this.readerType + " reader", e);
        }
    }

    private FileReader createFileReader(Path path, String str) throws IOException {
        if (this.readerType.equalsIgnoreCase(Configs.SEQ)) {
            return new SequenceFileReader(this.hdfs, path, this.conf, str);
        }
        if (this.readerType.equalsIgnoreCase(Configs.TEXT)) {
            return new TextFileReader(this.hdfs, path, this.conf, str);
        }
        try {
            return (FileReader) Class.forName(this.readerType).getConstructor(FileSystem.class, Path.class, Map.class, String.class).newInstance(this.hdfs, path, this.conf, str);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException("Unable to instantiate " + this.readerType, e);
        }
    }

    private Path renameToInProgressFile(Path path) throws IOException {
        Path path2 = new Path(path.toString() + this.inprogress_suffix);
        try {
            if (this.hdfs.rename(path, path2)) {
                return path2;
            }
            throw new RenameException(path, path2);
        } catch (IOException e) {
            throw new RenameException(path, path2, e);
        }
    }

    private Path getFileForLockFile(Path path, Path path2) throws IOException {
        String name = path.getName();
        Path path3 = new Path(path2 + "/" + name + this.inprogress_suffix);
        if (this.hdfs.exists(path3)) {
            return path3;
        }
        Path path4 = new Path(path2 + "/" + name);
        if (this.hdfs.exists(path4)) {
            return path4;
        }
        return null;
    }

    private Path renameCompletedFile(Path path) throws IOException {
        String path2 = path.toString();
        String substring = path2.substring(0, path2.indexOf(this.inprogress_suffix));
        Path path3 = new Path(this.archiveDirPath + "/" + new Path(substring).getName());
        LOG.info("Completed consuming file {}", substring);
        if (!this.hdfs.rename(path, path3)) {
            throw new IOException("Rename failed for file: " + path);
        }
        LOG.debug("Renamed file {} to {} ", path, path3);
        return path3;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (this.outputStreamName != null) {
            outputFieldsDeclarer.declareStream(this.outputStreamName, this.outputFields);
        } else {
            outputFieldsDeclarer.declare(this.outputFields);
        }
    }
}
