package gobblin.compaction.mapreduce;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import gobblin.compaction.Dataset;
import gobblin.compaction.event.CompactionSlaEventHelper;
import gobblin.compaction.mapreduce.MRCompactorJobPropCreator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactorTimeBasedJobPropCreator.class */
/**
 * A {@link MRCompactorJobPropCreator} for topics whose input directory is laid out as time-partitioned
 * sub-folders (by default {@code YYYY/MM/dd}).
 *
 * <p>For every input folder whose parsed time lies in the window
 * [{@code now - max.time.ago}, {@code now - min.time.ago}] (both ends inclusive), a {@link Dataset} is
 * created. Its input-late, output, output-late and tmp-output paths are the corresponding topic directories
 * with the same time-formatted sub-path appended.
 *
 * <p>Configuration keys read:
 * <ul>
 *   <li>{@code compaction.timebased.folder.pattern}: Joda-Time pattern of the folder layout
 *       (default {@code YYYY/MM/dd}).</li>
 *   <li>{@code compaction.timebased.max.time.ago} / {@code compaction.timebased.min.time.ago}: periods
 *       built from months ({@code m}), days ({@code d}) and hours ({@code h}) (defaults {@code 3d} /
 *       {@code 1d}).</li>
 *   <li>{@link MRCompactor#COMPACTION_TIMEZONE}: time zone used to parse and format folder times.</li>
 * </ul>
 */
public class MRCompactorTimeBasedJobPropCreator extends MRCompactorJobPropCreator {
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorTimeBasedJobPropCreator.class);
    private static final String COMPACTION_TIMEBASED_PREFIX = "compaction.timebased.";
    private static final String COMPACTION_TIMEBASED_FOLDER_PATTERN = "compaction.timebased.folder.pattern";
    private static final String DEFAULT_COMPACTION_TIMEBASED_FOLDER_PATTERN = "YYYY/MM/dd";
    private static final String COMPACTION_TIMEBASED_MAX_TIME_AGO = "compaction.timebased.max.time.ago";
    private static final String DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO = "3d";
    private static final String COMPACTION_TIMEBASED_MIN_TIME_AGO = "compaction.timebased.min.time.ago";
    private static final String DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO = "1d";
    /** Job property holding the folder time (epoch millis) of the dataset's input path. */
    public static final String COMPACTION_TIMEBASED_INPUT_PATH_TIME = "compaction.timebased.input.path.time";

    /** Joda-Time pattern describing the folder layout under the topic input directory. */
    private final String folderTimePattern;
    /** Time zone used to parse folder names and to compute "now". */
    private final DateTimeZone timeZone;
    /** Formatter for {@link #folderTimePattern} in {@link #timeZone}. */
    private final DateTimeFormatter timeFormatter;

    /** Builder for {@link MRCompactorTimeBasedJobPropCreator}. */
    static class Builder extends MRCompactorJobPropCreator.Builder<Builder> {
        Builder() {
        }

        @Override // gobblin.compaction.mapreduce.MRCompactorJobPropCreator.Builder
        public MRCompactorTimeBasedJobPropCreator build() {
            return new MRCompactorTimeBasedJobPropCreator(this);
        }
    }

    /**
     * Creates the job prop creator.
     *
     * <p>Reads the folder pattern and time zone from the state initialized by the superclass constructor.
     */
    MRCompactorTimeBasedJobPropCreator(Builder builder) {
        super(builder);
        this.folderTimePattern = getFolderPattern();
        this.timeZone = DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
        this.timeFormatter = DateTimeFormat.forPattern(this.folderTimePattern).withZone(this.timeZone);
    }

    /**
     * Creates one {@link Dataset} per time-partitioned input folder of this topic that lies within the
     * allowed time window.
     *
     * <p>Folders whose path (relative to the topic input directory) cannot be parsed with the configured
     * folder pattern are logged and skipped. Folders outside the window are also logged and skipped.
     *
     * @return the datasets to compact; empty if the topic input directory does not exist or no folder
     *     qualifies
     * @throws IOException if accessing the file system fails
     * @throws IllegalArgumentException if the configured min/max time-ago periods cannot be parsed
     */
    @Override // gobblin.compaction.mapreduce.MRCompactorJobPropCreator
    public List<Dataset> createJobProps() throws IOException {
        List<Dataset> datasets = Lists.newArrayList();
        if (!this.fs.exists(this.topicInputDir)) {
            LOG.warn("Input folder " + this.topicInputDir + " does not exist. Skipping topic " + this.topic);
            return datasets;
        }

        // Resolve the allowed window once. Every folder is then judged against the same "now", and a
        // malformed period config fails fast instead of being reported per folder as an invalid folder.
        DateTime now = new DateTime(this.timeZone);
        PeriodFormatter periodFormatter = getPeriodFormatter();
        DateTime earliestAllowedFolderTime = getEarliestAllowedFolderTime(now, periodFormatter);
        DateTime latestAllowedFolderTime = getLatestAllowedFolderTime(now, periodFormatter);

        FileStatus[] folderStatuses = this.fs.globStatus(new Path(this.topicInputDir, getFolderStructure()));
        if (folderStatuses == null) {
            // globStatus may return null instead of an empty array when nothing matches.
            return datasets;
        }

        for (FileStatus folderStatus : folderStatuses) {
            Path jobInputPath = folderStatus.getPath();
            DateTime folderTime;
            try {
                folderTime = getFolderTime(jobInputPath);
            } catch (RuntimeException e) {
                // Only folder-name parsing is best-effort; other failures must not be masked as invalid folders.
                LOG.warn(jobInputPath + " is not a valid folder. Will be skipped.", e);
                continue;
            }
            if (folderWithinAllowedPeriod(jobInputPath, folderTime, earliestAllowedFolderTime, latestAllowedFolderTime)) {
                addDatasetForFolder(datasets, jobInputPath, folderTime);
            }
        }
        return datasets;
    }

    /**
     * Builds the {@link Dataset} for one input folder and asks the superclass to create its job props.
     *
     * <p>If props were created, the folder time is recorded under
     * {@link #COMPACTION_TIMEBASED_INPUT_PATH_TIME} and the dataset is appended to {@code datasets}.
     */
    private void addDatasetForFolder(List<Dataset> datasets, Path jobInputPath, DateTime folderTime)
            throws IOException {
        Path jobInputLatePath = appendFolderTime(this.topicInputLateDir, folderTime);
        Path jobOutputPath = appendFolderTime(this.topicOutputDir, folderTime);
        Path jobOutputLatePath = appendFolderTime(this.topicOutputLateDir, folderTime);
        Path jobOutputTmpPath = appendFolderTime(this.topicTmpOutputDir, folderTime);

        // When recompactFromOutputPaths is set, the existing output / output-late folders are used as input.
        Dataset dataset = new Dataset.Builder()
                .withTopic(this.topic)
                .withPriority(this.priority)
                .withInputPath(this.recompactFromOutputPaths ? jobOutputPath : jobInputPath)
                .withInputLatePath(this.recompactFromOutputPaths ? jobOutputLatePath : jobInputLatePath)
                .withOutputPath(jobOutputPath)
                .withOutputLatePath(jobOutputLatePath)
                .withOutputTmpPath(jobOutputTmpPath)
                .build();

        Optional<Dataset> datasetWithJobProps = createJobProps(dataset, folderTime.toString(this.timeFormatter));
        if (!datasetWithJobProps.isPresent()) {
            return;
        }
        Dataset created = datasetWithJobProps.get();
        created.jobProps().setProp(COMPACTION_TIMEBASED_INPUT_PATH_TIME, Long.valueOf(folderTime.getMillis()));
        datasets.add(created);

        if (this.recompactFromOutputPaths || !MRCompactor.datasetAlreadyCompacted(this.fs, dataset)) {
            // NOTE(review): plusDays(1) is hard-coded regardless of folderTimePattern. It presumably assumes
            // daily folders; for finer-grained patterns this overstates the upstream time. Confirm.
            CompactionSlaEventHelper.setUpstreamTimeStamp(this.state, folderTime.plusDays(1).getMillis());
        }
    }

    /**
     * Turns the folder pattern into a glob by replacing every run of letters/digits with {@code *}.
     *
     * <p>For example, {@code YYYY/MM/dd} becomes a three-level wildcard path.
     */
    private String getFolderStructure() {
        return this.folderTimePattern.replaceAll("[a-zA-Z0-9]+", "*");
    }

    /** Reads and logs the configured folder pattern, falling back to {@code YYYY/MM/dd}. */
    private String getFolderPattern() {
        String prop = this.state.getProp(COMPACTION_TIMEBASED_FOLDER_PATTERN, DEFAULT_COMPACTION_TIMEBASED_FOLDER_PATTERN);
        LOG.info("Compaction folder pattern: " + prop);
        return prop;
    }

    /**
     * Parses the folder time from the part of {@code path} that follows the topic input directory.
     *
     * @throws IllegalArgumentException if {@code path} does not contain the topic input directory, or the
     *     remainder does not match the folder pattern
     */
    private DateTime getFolderTime(Path path) {
        String fullPath = path.toString();
        String inputDir = this.topicInputDir.toString();
        // indexOf rather than startsWith: glob results may carry a scheme/authority prefix.
        int inputDirStart = fullPath.indexOf(inputDir);
        if (inputDirStart < 0) {
            throw new IllegalArgumentException(fullPath + " is not under input folder " + inputDir);
        }
        String relativePath = StringUtils.removeStart(fullPath.substring(inputDirStart + inputDir.length()), "/");
        return this.timeFormatter.parseDateTime(relativePath);
    }

    /**
     * Returns whether {@code dateTime} lies in [{@code earliestAllowedFolderTime},
     * {@code latestAllowedFolderTime}] (inclusive), logging the reason when it does not.
     */
    private boolean folderWithinAllowedPeriod(Path path, DateTime dateTime, DateTime earliestAllowedFolderTime,
            DateTime latestAllowedFolderTime) {
        if (dateTime.isBefore(earliestAllowedFolderTime)) {
            LOG.info(String.format("Folder time for %s is %s, earlier than the earliest allowed folder time, %s. Skipping", path, dateTime, earliestAllowedFolderTime));
            return false;
        }
        if (dateTime.isAfter(latestAllowedFolderTime)) {
            LOG.info(String.format("Folder time for %s is %s, later than the latest allowed folder time, %s. Skipping", path, dateTime, latestAllowedFolderTime));
            return false;
        }
        return true;
    }

    /** Period format for the time-ago settings: months ({@code m}), days ({@code d}), hours ({@code h}). */
    private PeriodFormatter getPeriodFormatter() {
        return new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
    }

    /** Returns {@code now} minus the configured max-time-ago period (default {@code 3d}). */
    private DateTime getEarliestAllowedFolderTime(DateTime dateTime, PeriodFormatter periodFormatter) {
        return dateTime.minus(periodFormatter.parsePeriod(this.state.getProp(COMPACTION_TIMEBASED_MAX_TIME_AGO, DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO)));
    }

    /** Returns {@code now} minus the configured min-time-ago period (default {@code 1d}). */
    private DateTime getLatestAllowedFolderTime(DateTime dateTime, PeriodFormatter periodFormatter) {
        return dateTime.minus(periodFormatter.parsePeriod(this.state.getProp(COMPACTION_TIMEBASED_MIN_TIME_AGO, DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO)));
    }

    /** Appends the formatted folder time (e.g. {@code 2015/01/02}) to {@code path}. */
    private Path appendFolderTime(Path path, DateTime dateTime) {
        return new Path(path, dateTime.toString(this.timeFormatter));
    }
}
