package org.apache.gobblin.source;

import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.json.JsonSchema;
import org.apache.gobblin.source.PartitionAwareFileRetriever;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
import org.apache.gobblin.util.DatePartitionType;
import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.DurationFieldType;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PartitionAwareFileRetriever} for files stored in nested, date-partitioned directories laid out as
 * {@code <source dir>/[<prefix>/]<formatted partition date>[/<suffix>]}.
 *
 * <p>The partition date is formatted with an explicit pattern
 * ({@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_PARTITION_PATTERN}) when one is configured, otherwise
 * with the pattern of a {@link DatePartitionType} granularity
 * ({@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY}). Partitions are visited one
 * unit of {@link #incrementalUnit} at a time.
 */
public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriever {
    private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedNestedRetriever.class);

    /** Formats partition dates into directory names and parses watermark strings back into millis. */
    private DateTimeFormatter partitionPatternFormatter;
    /** Time unit by which successive partitions are stepped. */
    private DurationFieldType incrementalUnit;
    /** Optional path segment placed before the formatted date; skipped when empty. */
    private String sourcePartitionPrefix;
    /** Optional path segment placed after the formatted date; skipped when empty. */
    private String sourcePartitionSuffix;
    /** Root directory under which partition directories are resolved. */
    private Path sourceDir;
    private HadoopFsHelper helper;
    /** Extension (with or without a leading dot) a file name must end with to be picked up. */
    private final String expectedExtension;
    /** Partitions dated after "now minus this duration" are not scanned. */
    private Duration leadTimeDuration;
    /** When true, the file named {@link #schemaFile} is excluded from the results. */
    private boolean schemaInSourceDir;
    /** Name of the schema file to skip; only consulted when {@link #schemaInSourceDir} is true. */
    private String schemaFile;
    /** File system obtained from {@link #helper} at the start of {@link #getFilesToProcess}. */
    protected FileSystem fs;

    /**
     * @param expectedExtension file extension to accept, e.g. {@code "avro"} or {@code ".avro"}
     */
    public DatePartitionedNestedRetriever(String expectedExtension) {
        this.expectedExtension = expectedExtension;
    }

    /**
     * Reads the retriever configuration from {@code sourceState}.
     *
     * <p>Side effect: sets the JVM-wide default Joda {@link DateTimeZone} from {@code source.timezone}. This affects
     * every other user of the default zone in this process.
     *
     * @param sourceState job/source configuration
     * @throws IllegalArgumentException if {@code source.filebased.data.directory} is missing or the partition
     *     pattern is invalid
     * @throws IllegalStateException if the configured partition granularity is unknown
     */
    @Override // org.apache.gobblin.source.PartitionAwareFileRetriever
    public void init(SourceState sourceState) {
        DateTimeZone.setDefault(DateTimeZone.forID(
                sourceState.getProp("source.timezone", TimeBasedWriterPartitioner.DEFAULT_WRITER_PARTITION_TIMEZONE)));
        // Must run after setDefault: the formatters built here pin the zone that was just made the default.
        initDatePartition(sourceState);
        this.sourcePartitionPrefix = sourceState.getProp(
                PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PREFIX,
                JsonSchema.DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY);
        this.sourcePartitionSuffix = sourceState.getProp(
                PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX,
                JsonSchema.DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY);
        String dataDirectory = sourceState.getProp("source.filebased.data.directory");
        // Fail with the offending key instead of Path's generic "null string" message.
        Preconditions.checkArgument(dataDirectory != null, "Missing required property: source.filebased.data.directory");
        this.sourceDir = new Path(dataDirectory);
        this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(sourceState);
        this.helper = new HadoopFsHelper(sourceState);
        this.schemaInSourceDir = sourceState.getPropAsBoolean("schema.in.source.dir", false);
        this.schemaFile = this.schemaInSourceDir
                ? sourceState.getProp("schema.filename", "metadata.json")
                : JsonSchema.DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY;
    }

    /**
     * Lists matching files in every partition from {@code minWatermark} up to and including "now minus the
     * configured lead time". Partitions are stepped one {@link #incrementalUnit} at a time.
     *
     * <p>The {@code maxFilesToReturn} limit is only checked between partitions. All matching files of the last
     * partition visited are returned, even if that takes the total past the limit.
     *
     * @param minWatermark epoch millis of the first partition to scan
     * @param maxFilesToReturn once at least this many files are collected, no further partitions are scanned
     * @return the matching files, each tagged with the millis of the partition date it was found under
     * @throws IOException if the file system cannot be initialized, or a partition cannot be checked or listed
     */
    @Override // org.apache.gobblin.source.PartitionAwareFileRetriever
    public List<PartitionAwareFileRetriever.FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn)
            throws IOException {
        DateTime lastPartitionToScan = new DateTime().minus(this.leadTimeDuration);
        List<PartitionAwareFileRetriever.FileInfo> filesToProcess = new ArrayList<>();

        try {
            this.helper.connect();
            this.fs = this.helper.getFileSystem();
        } catch (FileBasedHelperException e) {
            throw new IOException("Error initializing FileSystem", e);
        }

        // The filter only depends on instance configuration, so build it once for all partitions.
        PathFilter fileFilter = getFileFilter();
        for (DateTime partitionDate = new DateTime(minWatermark);
                !partitionDate.isAfter(lastPartitionToScan) && filesToProcess.size() < maxFilesToReturn;
                partitionDate = partitionDate.withFieldAdded(this.incrementalUnit, 1)) {
            Path partitionPath = constructSourcePath(partitionDate);
            if (!this.fs.exists(partitionPath)) {
                continue;
            }
            for (FileStatus fileStatus : getFilteredFileStatuses(partitionPath, fileFilter)) {
                LOG.info("Will process file {}", fileStatus.getPath());
                filesToProcess.add(new PartitionAwareFileRetriever.FileInfo(
                        fileStatus.getPath().toString(), fileStatus.getLen(), partitionDate.getMillis()));
            }
        }
        return filesToProcess;
    }

    /**
     * Lists the entries of {@code path} accepted by {@code pathFilter}. Subclasses may override this to customize
     * listing, e.g. to recurse.
     *
     * @param path partition directory to list
     * @param pathFilter filter applied to each entry
     * @return the accepted entries
     * @throws IOException if listing fails
     */
    protected FileStatus[] getFilteredFileStatuses(Path path, PathFilter pathFilter) throws IOException {
        return this.fs.listStatus(path, pathFilter);
    }

    /**
     * Parses a partition-formatted date string into epoch millis using the partition formatter.
     *
     * @param str date string in the partition format
     * @return the parsed instant in epoch millis
     */
    @Override // org.apache.gobblin.source.PartitionAwareFileRetriever
    public long getWatermarkFromString(String str) {
        return this.partitionPatternFormatter.parseMillis(str);
    }

    /**
     * Returns the length of one partition step in millis. It is measured by adding one {@link #incrementalUnit}
     * to the epoch in the default zone.
     */
    @Override // org.apache.gobblin.source.PartitionAwareFileRetriever
    public long getWatermarkIncrementMs() {
        return new DateTime(0L).withFieldAdded(this.incrementalUnit, 1).getMillis();
    }

    /** Configures the partition formatter from an explicit pattern, falling back to a granularity. */
    private void initDatePartition(State state) {
        initDatePartitionFromPattern(state);
        if (this.partitionPatternFormatter == null) {
            initDatePartitionFromGranularity(state);
        }
    }

    /**
     * Configures the formatter and step unit from the explicit partition pattern, if one is set.
     *
     * @throws IllegalArgumentException if the configured pattern cannot be used
     */
    private void initDatePartitionFromPattern(State state) {
        String pattern = state.getProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN);
        if (pattern == null) {
            return;
        }
        try {
            this.partitionPatternFormatter = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.getDefault());
            this.incrementalUnit = DatePartitionType.getLowestIntervalUnit(pattern).getDurationType();
        } catch (RuntimeException e) {
            throw new IllegalArgumentException("Invalid source partition pattern: " + pattern, e);
        }
    }

    /**
     * Configures the formatter and step unit from the partition granularity, or from the default granularity
     * when none is set.
     *
     * @throws IllegalStateException if the configured granularity is not a {@link DatePartitionType} name
     */
    private void initDatePartitionFromGranularity(State state) {
        String granularity = state.getProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY);
        DatePartitionType partitionType;
        if (granularity == null) {
            partitionType = PartitionedFileSourceBase.DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY;
        } else {
            // Locale.ROOT: default-locale casing (e.g. Turkish "i" -> "\u0130") would not match the enum names.
            Optional<DatePartitionType> parsed =
                    Enums.getIfPresent(DatePartitionType.class, granularity.toUpperCase(Locale.ROOT));
            Preconditions.checkState(parsed.isPresent(), "Invalid source partition granularity: " + granularity);
            partitionType = parsed.get();
        }
        // Pin the zone like the pattern-based path does, so both configurations behave the same.
        this.partitionPatternFormatter =
                DateTimeFormat.forPattern(partitionType.getDateTimePattern()).withZone(DateTimeZone.getDefault());
        this.incrementalUnit = partitionType.getDateTimeFieldType().getDurationType();
    }

    /** Builds {@code sourceDir/[prefix/]<formatted date>[/suffix]} for the given partition date. */
    private Path constructSourcePath(DateTime dateTime) {
        StringBuilder relativePath = new StringBuilder();
        if (!this.sourcePartitionPrefix.isEmpty()) {
            relativePath.append(this.sourcePartitionPrefix).append('/');
        }
        relativePath.append(this.partitionPatternFormatter.print(dateTime));
        if (!this.sourcePartitionSuffix.isEmpty()) {
            relativePath.append('/').append(this.sourcePartitionSuffix);
        }
        return new Path(this.sourceDir, relativePath.toString());
    }

    /**
     * Returns a filter accepting names that end with {@link #expectedExtension} (a leading dot is added if missing).
     * When {@link #schemaInSourceDir} is set, the schema file itself is rejected.
     */
    private PathFilter getFileFilter() {
        final String extension = this.expectedExtension.startsWith(".")
                ? this.expectedExtension
                : "." + this.expectedExtension;
        return new PathFilter() {
            @Override
            public boolean accept(Path path) {
                String name = path.getName();
                boolean isSchemaFile = schemaInSourceDir && name.equals(schemaFile);
                return name.endsWith(extension) && !isSchemaFile;
            }
        };
    }
}
