package gobblin.util.logs;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractScheduledService;
import gobblin.configuration.ConfigurationKeys;
import gobblin.util.DatasetFilterUtils;
import gobblin.util.FileListUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.concurrent.ScheduledTask;
import gobblin.util.concurrent.TaskScheduler;
import gobblin.util.concurrent.TaskSchedulerFactory;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-utility-0.11.0.jar:gobblin/util/logs/LogCopier.class */
public class LogCopier extends AbstractScheduledService {
    private static final long DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL = 120;
    private static final long DEFAULT_LOG_COPY_INTERVAL_SECONDS = 60;
    private static final long DEFAULT_MAX_MINUTES_PER_LOG_FILE = Long.MAX_VALUE;
    private static final long DEFAULT_MAX_BYTES_PER_LOG_FILE = Long.MAX_VALUE;
    private static final int DEFAULT_LINES_WRITTEN_BEFORE_FLUSH = 100;
    private final FileSystem srcFs;
    private final FileSystem destFs;
    private final Path srcLogDir;
    private final Path destLogDir;
    private final long sourceLogFileMonitorInterval;
    private final long copyInterval;
    private final long maxMinutesPerLogFile;
    private final long maxBytesPerLogFile;
    private final TimeUnit timeUnit;
    private final Set<String> logFileExtensions;
    private final Optional<List<Pattern>> includingRegexPatterns;
    private final Optional<List<Pattern>> excludingRegexPatterns;
    private final Optional<String> logFileNamePrefix;
    private final int linesWrittenBeforeFlush;
    private final TaskScheduler<Path, LogCopyTask> scheduler;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) LogCopier.class);
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

    /* loaded from: input_file:WEB-INF/lib/gobblin-utility-0.11.0.jar:gobblin/util/logs/LogCopier$Builder.class */
    public static class Builder {
        private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
        private FileSystem srcFs;
        private Path srcLogDir;
        private FileSystem destFs;
        private Path destLogDir;
        private Set<String> logFileExtensions;
        private List<Pattern> includingRegexPatterns;
        private List<Pattern> excludingRegexPatterns;
        private String logFileNamePrefix;
        private long sourceLogFileMonitorInterval = LogCopier.DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL;
        private long copyInterval = 60;
        private long maxMinutesPerLogFile = Long.MAX_VALUE;
        private long maxBytesPerLogFile = Long.MAX_VALUE;
        private TimeUnit timeUnit = LogCopier.DEFAULT_TIME_UNIT;
        private int linesWrittenBeforeFlush = 100;
        private String schedulerName = null;

        public Builder useSourceLogFileMonitorInterval(long j) {
            Preconditions.checkArgument(j > 0, "Source log file monitor interval must be positive");
            this.sourceLogFileMonitorInterval = j;
            return this;
        }

        public Builder useCopyInterval(long j) {
            Preconditions.checkArgument(j > 0, "Copy interval must be positive");
            this.copyInterval = j;
            return this;
        }

        public Builder useMaxMinutesPerLogFile(long j) {
            Preconditions.checkArgument(j > 0, "Max minutes per log file must be positive");
            this.maxMinutesPerLogFile = j;
            return this;
        }

        public Builder useMaxBytesPerLogFile(long j) {
            Preconditions.checkArgument(j > 0, "Max bytes per log file must be positive");
            this.maxBytesPerLogFile = j;
            return this;
        }

        public Builder useTimeUnit(TimeUnit timeUnit) {
            Preconditions.checkNotNull(timeUnit);
            this.timeUnit = timeUnit;
            return this;
        }

        public Builder acceptsLogFileExtensions(Set<String> set) {
            Preconditions.checkNotNull(set);
            this.logFileExtensions = ImmutableSet.copyOf((Collection) set);
            return this;
        }

        public Builder useIncludingRegexPatterns(String str) {
            Preconditions.checkNotNull(str);
            this.includingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList(str));
            return this;
        }

        public Builder useExcludingRegexPatterns(String str) {
            Preconditions.checkNotNull(str);
            this.excludingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList(str));
            return this;
        }

        public Builder useSrcFileSystem(FileSystem fileSystem) {
            Preconditions.checkNotNull(fileSystem);
            this.srcFs = fileSystem;
            return this;
        }

        public Builder useDestFileSystem(FileSystem fileSystem) {
            Preconditions.checkNotNull(fileSystem);
            this.destFs = fileSystem;
            return this;
        }

        public Builder readFrom(Path path) {
            Preconditions.checkNotNull(path);
            this.srcLogDir = path;
            return this;
        }

        public Builder writeTo(Path path) {
            Preconditions.checkNotNull(path);
            this.destLogDir = path;
            return this;
        }

        public Builder useLogFileNamePrefix(String str) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "Invalid log file name prefix: " + str);
            this.logFileNamePrefix = str;
            return this;
        }

        public Builder useLinesWrittenBeforeFlush(int i) {
            Preconditions.checkArgument(i > 0, "The value specifying the lines to write before flush must be positive");
            this.linesWrittenBeforeFlush = i;
            return this;
        }

        public Builder useScheduler(String str) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "Invalid scheduler name: " + str);
            this.schedulerName = str;
            return this;
        }

        public LogCopier build() {
            return new LogCopier(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-utility-0.11.0.jar:gobblin/util/logs/LogCopier$LogCopyTask.class */
    public class LogCopyTask implements ScheduledTask<Path> {
        private final Path srcLogFile;
        private final Path destLogFile;
        private long currentPos = 0;
        private final Stopwatch watch = Stopwatch.createStarted();

        public LogCopyTask(Path path, Path path2) {
            this.srcLogFile = path;
            this.destLogFile = path2;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // gobblin.util.concurrent.ScheduledTask
        public Path getKey() {
            return this.srcLogFile;
        }

        @Override // gobblin.util.concurrent.ScheduledTask
        public void runOneIteration() {
            try {
                createNewLogFileIfNeeded();
                LogCopier.LOGGER.debug(String.format("Copying changes from %s to %s", this.srcLogFile, this.destLogFile));
                copyChangesOfLogFile(LogCopier.this.srcFs.makeQualified(this.srcLogFile), LogCopier.this.destFs.makeQualified(this.destLogFile));
            } catch (IOException e) {
                LogCopier.LOGGER.error(String.format("Failed while copying logs from %s to %s", this.srcLogFile, this.destLogFile), (Throwable) e);
            }
        }

        private void createNewLogFileIfNeeded() throws IOException {
            if (LogCopier.this.destFs.exists(this.destLogFile)) {
                if (this.watch.elapsed(TimeUnit.MINUTES) > LogCopier.this.maxMinutesPerLogFile || LogCopier.this.destFs.getFileStatus(this.destLogFile).getLen() > LogCopier.this.maxBytesPerLogFile) {
                    HadoopUtils.renamePath(LogCopier.this.destFs, this.destLogFile, new Path(this.destLogFile.toString() + "." + System.currentTimeMillis()));
                    this.watch.reset();
                    this.watch.start();
                }
            }
        }

        private void copyChangesOfLogFile(Path path, Path path2) throws IOException {
            String readLine;
            if (!LogCopier.this.srcFs.exists(path)) {
                LogCopier.LOGGER.warn("Source log file not found: " + path);
                return;
            }
            FSDataInputStream fSDataInputStream = null;
            try {
                Closer create = Closer.create();
                Throwable th = null;
                try {
                    try {
                        fSDataInputStream = (FSDataInputStream) create.register(LogCopier.this.srcFs.open(path));
                        LogCopier.LOGGER.debug(String.format("Reading log file %s from position %d", path, Long.valueOf(this.currentPos)));
                        fSDataInputStream.seek(this.currentPos);
                        BufferedReader bufferedReader = (BufferedReader) create.register(new BufferedReader(new InputStreamReader(fSDataInputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                        BufferedWriter bufferedWriter = (BufferedWriter) create.register(new BufferedWriter(new OutputStreamWriter(LogCopier.this.destFs.exists(path2) ? LogCopier.this.destFs.append(path2) : LogCopier.this.destFs.create(path2), ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                        int i = 0;
                        while (!Thread.currentThread().isInterrupted() && (readLine = bufferedReader.readLine()) != null) {
                            if (shouldCopyLine(readLine)) {
                                bufferedWriter.write(readLine);
                                bufferedWriter.newLine();
                                i++;
                                if (i % LogCopier.this.linesWrittenBeforeFlush == 0) {
                                    bufferedWriter.flush();
                                }
                            }
                        }
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                create.close();
                            }
                        }
                        if (fSDataInputStream != null) {
                            this.currentPos = fSDataInputStream.getPos();
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Throwable th4) {
                if (fSDataInputStream != null) {
                    this.currentPos = fSDataInputStream.getPos();
                }
                throw th4;
            }
        }

        private boolean shouldCopyLine(String str) {
            return !(LogCopier.this.excludingRegexPatterns.isPresent() && DatasetFilterUtils.stringInPatterns(str, (List) LogCopier.this.excludingRegexPatterns.get())) && (!LogCopier.this.includingRegexPatterns.isPresent() || DatasetFilterUtils.stringInPatterns(str, (List) LogCopier.this.includingRegexPatterns.get()));
        }
    }

    private LogCopier(Builder builder) {
        this.srcFs = builder.srcFs;
        this.destFs = builder.destFs;
        this.srcLogDir = this.srcFs.makeQualified(builder.srcLogDir);
        this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
        this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
        this.copyInterval = builder.copyInterval;
        this.maxMinutesPerLogFile = builder.maxMinutesPerLogFile;
        this.maxBytesPerLogFile = builder.maxBytesPerLogFile;
        this.timeUnit = builder.timeUnit;
        this.logFileExtensions = builder.logFileExtensions;
        this.includingRegexPatterns = Optional.fromNullable(builder.includingRegexPatterns);
        this.excludingRegexPatterns = Optional.fromNullable(builder.excludingRegexPatterns);
        this.logFileNamePrefix = Optional.fromNullable(builder.logFileNamePrefix);
        this.linesWrittenBeforeFlush = builder.linesWrittenBeforeFlush;
        this.scheduler = TaskSchedulerFactory.get(builder.schedulerName, (Optional<String>) Optional.absent());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.google.common.util.concurrent.AbstractScheduledService
    public void shutDown() throws Exception {
        this.scheduler.close();
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void runOneIteration() throws IOException {
        checkSrcLogFiles();
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, this.sourceLogFileMonitorInterval, this.timeUnit);
    }

    private void checkSrcLogFiles() throws IOException {
        List<FileStatus> listFilesRecursively = FileListUtils.listFilesRecursively(this.srcFs, this.srcLogDir, new PathFilter() { // from class: gobblin.util.logs.LogCopier.1
            @Override // org.apache.hadoop.fs.PathFilter
            public boolean accept(Path path) {
                return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
            }
        });
        if (listFilesRecursively.isEmpty()) {
            LOGGER.warn("No log file found under directory " + this.srcLogDir);
            return;
        }
        HashSet<Path> newHashSet = Sets.newHashSet();
        Iterator<FileStatus> it = listFilesRecursively.iterator();
        while (it.hasNext()) {
            newHashSet.add(it.next().getPath());
        }
        HashSet newHashSet2 = Sets.newHashSet(getSourceFiles());
        newHashSet2.removeAll(newHashSet);
        newHashSet.removeAll(getSourceFiles());
        for (Path path : newHashSet) {
            this.scheduler.schedule(new LogCopyTask(path, new Path(this.destLogDir, this.logFileNamePrefix.isPresent() ? this.logFileNamePrefix.get() + "." + path.getName() : path.getName())), this.copyInterval, this.timeUnit);
        }
        Iterator it2 = newHashSet2.iterator();
        while (it2.hasNext()) {
            Optional<LogCopyTask> scheduledTask = this.scheduler.getScheduledTask((Path) it2.next());
            if (scheduledTask.isPresent()) {
                this.scheduler.cancel(scheduledTask.get());
            }
        }
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private ImmutableList<Path> getSourceFiles() {
        return ImmutableList.copyOf(Iterables.transform(this.scheduler.getScheduledTasks(), new Function<LogCopyTask, Path>() { // from class: gobblin.util.logs.LogCopier.2
            @Override // com.google.common.base.Function
            public Path apply(LogCopyTask logCopyTask) {
                return logCopyTask.getKey();
            }
        }));
    }
}
