package co.cask.cdap.filetailer.tailer;

import co.cask.cdap.filetailer.AbstractWorker;
import co.cask.cdap.filetailer.PipeListener;
import co.cask.cdap.filetailer.config.PipeConfiguration;
import co.cask.cdap.filetailer.event.FileTailerEvent;
import co.cask.cdap.filetailer.metrics.FileTailerMetricsProcessor;
import co.cask.cdap.filetailer.queue.FileTailerQueue;
import co.cask.cdap.filetailer.state.FileTailerState;
import co.cask.cdap.filetailer.state.FileTailerStateProcessor;
import co.cask.cdap.filetailer.state.exception.FileTailerStateProcessorException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeMap;
import javax.ws.rs.NotSupportedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/filetailer/tailer/LogTailer.class */
public class LogTailer extends AbstractWorker {
    /** Mode string handed to {@link RandomAccessFile} when a log file is opened. */
    private static final String RAF_MODE = "r";
    /** Capacity used for both the raw byte read buffer and the decoded char buffer. */
    private static final int DEFAULT_BUFSIZE = 4096;
    /** Time passed to {@code Thread.sleep} while waiting for a log file or for new log data. */
    private final long sleepInterval;
    /** Directory whose files are listed when looking for the next log file to read. */
    private final File logDirectory;
    /** Configured log file name; given to the {@code LogFilter} together with the rotation pattern. */
    private final String logFileName;
    /** Charset used to decode file bytes and to compute the encoded length of entries. */
    private final Charset charset;
    /** Number of failed attempts tolerated before giving up; only enforced when greater than zero. */
    private final int failureRetryLimit;
    /** Time passed to {@code Thread.sleep} between attempts after an I/O failure. */
    private final long failureSleepInterval;
    /** Queue that receives one {@link FileTailerEvent} per log entry read. */
    private final FileTailerQueue queue;
    /** Character that terminates a log entry. */
    private final char entrySeparator;
    /** Number of bytes {@link #entrySeparator} occupies when encoded with {@link #charset}. */
    private final int separatorByteLength;
    /** Source of the previously saved tailer state, consulted once when the worker starts. */
    private final FileTailerStateProcessor fileTailerStateProcessor;
    /** Receives the encoded byte length of every entry that is read. */
    private final FileTailerMetricsProcessor metricsProcessor;
    /** Rotation pattern from the configuration; given to the {@code LogFilter} when listing files. */
    private final String rotationPattern;
    /** Decoder created from {@link #charset}; shared by all read operations of this tailer. */
    private final CharsetDecoder decoder;
    /** Reusable buffer for raw bytes read from the file channel. */
    private final ByteBuffer readBuffer;
    /** Reusable buffer holding the characters decoded from {@link #readBuffer}. */
    private final CharBuffer decoded;
    /** Value of the "read rotated files" mode taken from the source configuration. */
    private final boolean readRotatedFiles;
    /** Optional listener (may be null); its {@code onRead()} is invoked by the read loop. */
    private final PipeListener pipeListener;
    private static final Logger LOG = LoggerFactory.getLogger(LogTailer.class);
    /**
     * Orders log files by modification time, oldest first. Files with the same
     * modification time are ordered by file name length (longer names first) and
     * finally by reverse lexicographic file name.
     */
    private static final Comparator<LogFileTime> logFileComparator = new Comparator<LogFileTime>() {
        @Override
        public int compare(LogFileTime left, LogFileTime right) {
            final int byTime = left.getModificationTime().compareTo(right.getModificationTime());
            if (byTime != 0) {
                return byTime;
            }
            final String leftName = left.getFileName();
            final String rightName = right.getFileName();
            final int byLength = rightName.length() - leftName.length();
            if (byLength != 0) {
                return byLength;
            }
            return rightName.compareTo(leftName);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/cask/cdap/filetailer/tailer/LogTailer$LogFileTime.class */
    /**
     * Immutable pair of a file's last-modified timestamp and its name; used as the
     * sort key when ordering candidate log files.
     */
    public class LogFileTime {
        private final long modificationTime;
        private final String fileName;

        /**
         * @param modifiedAt last-modified timestamp of the file
         * @param name       name of the file
         */
        LogFileTime(long modifiedAt, String name) {
            modificationTime = modifiedAt;
            fileName = name;
        }

        /** @return the last-modified timestamp, boxed */
        public Long getModificationTime() {
            return modificationTime;
        }

        /** @return the file name this key was created with */
        public String getFileName() {
            return fileName;
        }
    }

    public LogTailer(PipeConfiguration pipeConfiguration, FileTailerQueue fileTailerQueue, FileTailerStateProcessor fileTailerStateProcessor, FileTailerMetricsProcessor fileTailerMetricsProcessor, PipeListener pipeListener) {
        String charsetName = pipeConfiguration.getSourceConfiguration().getCharsetName();
        if (!Charset.isSupported(charsetName)) {
            LOG.error("Charset {} is not supported", charsetName);
            throw new NotSupportedException("Charset " + charsetName + " is not supported");
        }
        this.charset = Charset.forName(charsetName);
        this.decoder = this.charset.newDecoder();
        this.readBuffer = ByteBuffer.allocate(DEFAULT_BUFSIZE);
        this.decoded = CharBuffer.allocate(DEFAULT_BUFSIZE);
        this.queue = fileTailerQueue;
        this.fileTailerStateProcessor = fileTailerStateProcessor;
        this.metricsProcessor = fileTailerMetricsProcessor;
        this.pipeListener = pipeListener;
        this.sleepInterval = pipeConfiguration.getSourceConfiguration().getSleepInterval();
        this.logDirectory = pipeConfiguration.getSourceConfiguration().getWorkDir();
        this.logFileName = pipeConfiguration.getSourceConfiguration().getFileName();
        this.entrySeparator = pipeConfiguration.getSourceConfiguration().getRecordSeparator();
        this.separatorByteLength = Character.valueOf(this.entrySeparator).toString().getBytes(this.charset).length;
        this.failureRetryLimit = pipeConfiguration.getSourceConfiguration().getFailureRetryLimit();
        this.failureSleepInterval = pipeConfiguration.getSourceConfiguration().getFailureSleepInterval();
        this.rotationPattern = pipeConfiguration.getSourceConfiguration().getRotationPattern();
        this.readRotatedFiles = pipeConfiguration.getSourceConfiguration().getReadRotatedFilesMode();
    }

    /**
     * Entry point of the tailer worker. Verifies that the log directory exists, then
     * either resumes from the saved state (when one can be loaded) or starts reading
     * the log directory from the beginning.
     *
     * <p>An interruption ends the worker; the thread's interrupt flag is restored so
     * that code further up the call stack can still observe it.
     */
    public void run() {
        try {
            checkLogDirExists(this.logDirectory);
            FileTailerState saveStateFromFile = getSaveStateFromFile();
            try {
                if (saveStateFromFile == null) {
                    LOG.info("File Tailer state was not found; start reading all logs from the directory from the beginning");
                    runWithOutRestore();
                } else {
                    LOG.info("Start recover from state file");
                    runFromSaveState(saveStateFromFile);
                }
            } catch (InterruptedException e) {
                LOG.info("Tailer daemon was interrupted");
                // Catching InterruptedException clears the flag; set it again instead of hiding the interrupt.
                Thread.currentThread().interrupt();
            }
            LOG.info("Tailer daemon stopped");
        } catch (LogDirNotFoundException e2) {
            LOG.error("Incorrect path to log directory; directory {} does not exist", this.logDirectory.getAbsolutePath());
        }
    }

    /**
     * Loads the previously saved tailer state.
     *
     * @return the saved state, or {@code null} when the state processor fails to load it
     */
    private FileTailerState getSaveStateFromFile() {
        try {
            return this.fileTailerStateProcessor.loadState();
        } catch (FileTailerStateProcessorException e) {
            LOG.info("Fail state do not exist. Start reading all directory");
            // Keep the actual cause available for diagnosis without adding noise at info level.
            LOG.debug("Saved state could not be loaded", e);
            return null;
        }
    }

    /**
     * Resumes tailing from a saved state. Locates the log file recorded in the state,
     * verifies that the entry at the saved position still has the saved hash and, if
     * so, continues reading from there.
     *
     * @param fileTailerState state to resume from
     * @throws InterruptedException if the thread is interrupted while opening or reading
     */
    private void runFromSaveState(FileTailerState fileTailerState) throws InterruptedException {
        long position = fileTailerState.getPosition();
        long lastModifyTime = fileTailerState.getLastModifyTime();
        String fileName = fileTailerState.getFileName();
        int hash = fileTailerState.getHash();
        File nextLogFile = getNextLogFile(this.logDirectory.getAbsolutePath(), Long.valueOf(lastModifyTime), true, new File(fileName));
        if (nextLogFile == null) {
            LOG.info("Saved log file not exist. Exiting");
            return;
        }
        FileChannel channel = null;
        // Once the channel is passed to startReadingFromFile, that method is responsible for closing it.
        boolean handedOff = false;
        try {
            channel = tryOpenFile(nextLogFile);
            if (channel == null) {
                // tryOpenFile returns null when the worker stopped before the file could be opened.
                return;
            }
            if (!checkLine(channel, position, hash)) {
                LOG.error("Can not find line from saved state. Exiting.. ");
            } else {
                LOG.info("Saved log entry was found. Start reading log from save state");
                handedOff = true;
                startReadingFromFile(channel, nextLogFile);
            }
        } catch (IOException e) {
            LOG.error("Unable to resume reading from file {}: {}", nextLogFile, e.getMessage(), e);
        } finally {
            if (!handedOff) {
                closeQuietly(channel);
            }
        }
    }

    /**
     * Main read loop. Repeatedly drains the given channel into the queue and then
     * decides what to do next:
     * <ul>
     *   <li>when rotated files are not being read and a listener is set, the listener's
     *       {@code onRead()} is invoked and the loop ends;</li>
     *   <li>otherwise the next log file is looked up; if there is none the thread sleeps
     *       for the configured interval, if there is one the current channel is closed
     *       and reading continues with the new file.</li>
     * </ul>
     * The channel that is current when the loop ends is always closed, whether the loop
     * ends normally, through an I/O failure (which is logged) or through an interruption.
     *
     * <p>Reconstructed from the bytecode instruction dump, because the decompiler could
     * not restructure this method.
     *
     * @param fileChannel open channel of {@code file}; closed by this method
     * @param file        log file the channel belongs to
     * @throws InterruptedException if the thread is interrupted while reading or sleeping
     */
    private void startReadingFromFile(FileChannel fileChannel, File file) throws InterruptedException {
        FileChannel currentChannel = fileChannel;
        File currentFile = file;
        long lastModified = currentFile.lastModified();
        try {
            while (isRunning()) {
                lastModified = tryReadFromFile(currentChannel, this.entrySeparator, currentFile, lastModified);
                if (!this.readRotatedFiles && this.pipeListener != null) {
                    this.pipeListener.onRead();
                    break;
                }
                File nextFile = getNextLogFile(this.logDirectory.getAbsolutePath(), Long.valueOf(lastModified), false, currentFile);
                if (nextFile == null) {
                    LOG.debug("Waiting for new log data from file {}", currentFile);
                    Thread.sleep(this.sleepInterval);
                } else {
                    LOG.debug("Reading file {}", nextFile);
                    currentFile = nextFile;
                    closeQuietly(currentChannel);
                    currentChannel = new RandomAccessFile(currentFile, RAF_MODE).getChannel();
                }
            }
        } catch (IOException e) {
            LOG.error("Tailer daemon stopped due to IO exception while reading file: {}", e.getMessage());
        } finally {
            closeQuietly(currentChannel);
        }
    }

    /**
     * Starts tailing without a saved state: waits until a first log file is available
     * in the log directory and then reads from it.
     *
     * @throws InterruptedException if the thread is interrupted while opening or reading
     */
    private void runWithOutRestore() throws InterruptedException {
        // The reference file never changes, so build it once instead of on every poll.
        final File reference = new File(this.logFileName);
        File file = null;
        while (file == null && isRunning()) {
            file = getNextLogFile(this.logDirectory.getAbsolutePath(), 0L, false, reference);
            if (file == null) {
                try {
                    Thread.sleep(this.sleepInterval);
                } catch (InterruptedException e) {
                    LOG.info("Tailer daemon was interrupted");
                    // Keep the interrupt visible to the caller after returning quietly.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        if (file == null) {
            // The worker was stopped before any log file showed up; nothing to read.
            return;
        }
        try {
            FileChannel channel = tryOpenFile(file);
            if (channel == null) {
                // tryOpenFile returns null when the worker stopped before the file could be opened.
                return;
            }
            startReadingFromFile(channel, file);
        } catch (IOException e2) {
            LOG.error("Unable to open log file {}: {}", file, e2.getMessage(), e2);
        }
    }

    /**
     * Checks whether the entry starting at byte offset {@code j} is non-empty and has
     * the hash code {@code i}. The channel is repositioned as a side effect.
     *
     * @param fileChannel channel of the file to inspect
     * @param j           byte offset at which the entry is expected to start
     * @param i           expected {@code String.hashCode()} of the entry
     * @return {@code true} if a non-empty entry with the expected hash was read
     * @throws IOException          if positioning or reading fails
     * @throws InterruptedException if the thread is interrupted while reading
     */
    private boolean checkLine(FileChannel fileChannel, long j, int i) throws IOException, InterruptedException {
        fileChannel.position(j);
        final String entry = tryReadLine(fileChannel, this.entrySeparator);
        if (entry.length() == 0) {
            return false;
        }
        return entry.hashCode() == i;
    }

    /**
     * Selects the log file to read next from the files in directory {@code str} that
     * are accepted by the log filter, using the order defined by {@code logFileComparator}.
     *
     * <ul>
     *   <li>If {@code l} is zero and at least one file exists, the first file in that
     *       order is returned.</li>
     *   <li>If {@code z} is set and a file matches the key built from {@code l} and the
     *       name of {@code file}, that file is returned.</li>
     *   <li>Otherwise the first file ordered strictly after that key is returned.</li>
     * </ul>
     *
     * @param str  absolute path of the directory to list
     * @param l    modification time of the reference file
     * @param z    whether a file matching the reference key itself may be returned
     * @param file reference file; only its name is used
     * @return the selected file, or {@code null} if there is none or the directory
     *         could not be listed
     */
    private File getNextLogFile(String str, Long l, boolean z, File file) {
        File[] candidates = new File(str).listFiles(new LogFilter(this.logFileName, this.rotationPattern));
        if (candidates == null) {
            // listFiles yields null when the path is not a directory or an I/O error occurs.
            LOG.warn("Unable to list log files in directory {}", str);
            return null;
        }
        TreeMap<LogFileTime, File> ordered = new TreeMap<LogFileTime, File>(logFileComparator);
        for (File candidate : candidates) {
            ordered.put(new LogFileTime(candidate.lastModified(), candidate.getName()), candidate);
        }
        if (l.longValue() == 0 && !ordered.isEmpty()) {
            return ordered.firstEntry().getValue();
        }
        LogFileTime reference = new LogFileTime(l.longValue(), file.getName());
        if (z) {
            // Values are never null, so a single lookup replaces containsKey followed by get.
            File exactMatch = ordered.get(reference);
            if (exactMatch != null) {
                return exactMatch;
            }
        }
        LogFileTime following = ordered.higherKey(reference);
        return following == null ? null : ordered.get(following);
    }

    /**
     * Verifies that the given path exists.
     *
     * @param file configured log directory
     * @throws LogDirNotFoundException if the path does not exist
     */
    private void checkLogDirExists(File file) throws LogDirNotFoundException {
        if (file.exists()) {
            return;
        }
        throw new LogDirNotFoundException("Configured log directory not found");
    }

    /**
     * Opens the given file for reading, retrying after failures for as long as the
     * worker is running. The number of retries is bounded only when the configured
     * failure retry limit is greater than zero.
     *
     * @param file file to open
     * @return a channel on the file, or {@code null} if the worker stopped running
     *         before the file could be opened
     * @throws IOException          if the retry limit is exceeded; the last open failure is
     *                              attached as the cause
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    private FileChannel tryOpenFile(File file) throws IOException, InterruptedException {
        int failures = 0;
        IOException lastFailure = null;
        while (isRunning()) {
            if (failures > this.failureRetryLimit && this.failureRetryLimit > 0) {
                LOG.error("fail to open file after {} attempts", Integer.valueOf(failures));
                throw new IOException("Failed to open file " + file + " after " + failures + " attempts", lastFailure);
            }
            try {
                return new RandomAccessFile(file, RAF_MODE).getChannel();
            } catch (IOException e) {
                failures++;
                lastFailure = e;
                LOG.debug("Attempt {} to open file {} failed", Integer.valueOf(failures), file, e);
                Thread.sleep(this.failureSleepInterval);
            }
        }
        return null;
    }

    /**
     * Reads the entry that starts at the channel's current position, i.e. all characters
     * up to (not including) the first occurrence of the separator {@code c}. Afterwards the
     * channel is positioned just behind the separator that is assumed to follow the entry.
     *
     * <p>A failed read is retried after sleeping for the failure sleep interval; the number
     * of retries is bounded only when the configured failure retry limit is greater than
     * zero. Retrying stops as soon as the worker is no longer running.
     *
     * <p>NOTE(review): only a single buffer of {@code DEFAULT_BUFSIZE} bytes is examined, so
     * an entry longer than that is returned truncated.
     *
     * @param fileChannel channel to read from
     * @param c           entry separator
     * @return the entry text without the separator; empty when nothing could be read
     * @throws IOException          if reading keeps failing; the last failure is attached as
     *                              the cause when the retry limit is exceeded
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    private String tryReadLine(FileChannel fileChannel, char c) throws IOException, InterruptedException {
        final long start = fileChannel.position();
        final StringBuilder line = new StringBuilder();
        int failures = 0;
        IOException lastFailure = null;
        boolean done = false;
        while (!done) {
            if (failures > this.failureRetryLimit && this.failureRetryLimit > 0) {
                LOG.error("fail to read line  after {} attempts", Integer.valueOf(failures));
                throw new IOException("Failed to read entry at position " + start + " after " + failures + " attempts", lastFailure);
            }
            try {
                // A failed read may leave the position undefined; always start from the entry offset.
                fileChannel.position(start);
                this.readBuffer.clear();
                this.decoded.clear();
                if (fileChannel.read(this.readBuffer) >= 0) {
                    this.readBuffer.flip();
                    this.decoder.decode(this.readBuffer, this.decoded, false);
                    this.decoded.flip();
                    for (int k = 0; k < this.decoded.length(); k++) {
                        char current = this.decoded.charAt(k);
                        if (current == c) {
                            break;
                        }
                        line.append(current);
                    }
                }
                // Mark success only after the read went through, so that a failure is really retried.
                done = true;
            } catch (IOException e) {
                if (!isRunning()) {
                    // Do not keep retrying on behalf of a worker that has been stopped.
                    throw e;
                }
                failures++;
                lastFailure = e;
                Thread.sleep(this.failureSleepInterval);
            }
        }
        String entry = line.toString();
        fileChannel.position(start + entry.getBytes(this.charset).length + this.separatorByteLength);
        return entry;
    }

    /**
     * Reads from the channel's current position until end of file (or until the worker
     * stops), splitting the decoded text at the separator {@code c}. For every complete
     * entry an event carrying the entry text and its state (file, byte offset of the
     * entry, hash code, file modification time) is put on the queue and the metrics
     * processor is told the encoded size of the entry.
     *
     * <p>A failed read is retried after sleeping for the failure sleep interval; the number
     * of retries is bounded only when the configured failure retry limit is greater than
     * zero.
     *
     * <p>NOTE(review): text after the last separator is discarded when end of file is
     * reached, so an entry that is only partially written at that moment is not delivered
     * in full later on. Bytes of a multi-byte character that straddles two buffer reads
     * are also dropped. Both need byte-accurate position tracking to fix safely.
     *
     * @param fileChannel channel to read from
     * @param c           entry separator
     * @param file        file the channel belongs to; used for logging, state and modification time
     * @param j           modification time to return when no entry is read
     * @return the file's modification time observed when the last entry was read, or
     *         {@code j} if no entry was read
     * @throws IOException          if the retry limit is exceeded; the last read failure is
     *                              attached as the cause
     * @throws InterruptedException if the thread is interrupted while waiting or queueing
     */
    private long tryReadFromFile(FileChannel fileChannel, char c, File file, long j) throws IOException, InterruptedException {
        int failures = 0;
        IOException lastFailure = null;
        long position = fileChannel.position();
        StringBuilder pending = new StringBuilder();
        while (isRunning()) {
            if (failures > this.failureRetryLimit && this.failureRetryLimit > 0) {
                LOG.error("fail to read line  after {} attempts", Integer.valueOf(failures));
                throw new IOException("Failed to read from file " + file + " after " + failures + " attempts", lastFailure);
            }
            int bytesRead;
            try {
                this.readBuffer.clear();
                this.decoded.clear();
                bytesRead = fileChannel.read(this.readBuffer);
            } catch (IOException e) {
                // The read itself is the operation that can fail; wait and try again.
                failures++;
                lastFailure = e;
                Thread.sleep(this.failureSleepInterval);
                continue;
            }
            if (bytesRead < 0) {
                break;
            }
            this.readBuffer.flip();
            this.decoder.decode(this.readBuffer, this.decoded, false);
            this.decoded.flip();
            for (int k = 0; k < this.decoded.length(); k++) {
                char current = this.decoded.charAt(k);
                if (current != c) {
                    pending.append(current);
                    continue;
                }
                String entry = pending.toString();
                int entryHash = entry.hashCode();
                // Encode once; the size is needed for both the metric and the offset bookkeeping.
                int encodedLength = entry.getBytes(this.charset).length;
                LOG.debug("From log file {} read entry: {}", file, entry);
                j = file.lastModified();
                this.queue.put(new FileTailerEvent(new FileTailerState(file.toString(), position, entryHash, j), entry, this.charset));
                this.metricsProcessor.onReadEventMetric(encodedLength);
                // Advance to the byte offset at which the next entry starts.
                position += encodedLength + this.separatorByteLength;
                pending.setLength(0);
            }
        }
        return j;
    }

    /**
     * Closes the given channel if it is not {@code null}. A failure to close is logged
     * as a warning and otherwise ignored.
     *
     * @param fileChannel channel to close; may be {@code null}
     */
    private void closeQuietly(FileChannel fileChannel) {
        if (fileChannel == null) {
            return;
        }
        try {
            fileChannel.close();
        } catch (IOException closeFailure) {
            LOG.warn("Exception during closing: {}", closeFailure.getMessage(), closeFailure);
        }
    }
}
