package gobblin.source;

import com.google.common.base.Throwables;
import gobblin.configuration.SourceState;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.DatePartitionedAvroFileExtractor;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.filebased.FileBasedHelperException;
import gobblin.source.extractor.filebased.FileBasedSource;
import gobblin.source.extractor.hadoop.AvroFsHelper;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.MultiWorkUnitWeightedQueue;
import gobblin.source.workunit.WorkUnit;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/DatePartitionedDailyAvroSource.class */
public class DatePartitionedDailyAvroSource extends FileBasedSource<Schema, GenericRecord> {
    private static final String DATE_PARTITIONED_SOURCE_PREFIX = "date.partitioned.source.";
    private static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = "date.partitioned.source.min.watermark.value";
    private static final String DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = "date.partitioned.source.max.files.per.job";
    private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = "date.partitioned.source.max.workunits.per.job";
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = 2000;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = 500;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = 0;
    private static final String DAILY_FOLDER_NAME = "daily";
    private static final String AVRO_SUFFIX = ".avro";
    private static final DateTimeFormatter DAILY_FOLDER_FORMATTER = DateTimeFormat.forPattern("yyyy/MM/dd");
    private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedDailyAvroSource.class);
    private SourceState sourceState;
    private FileSystem fs;
    private long lowWaterMark;
    private int maxFilesPerJob;
    private int maxWorkUnitsPerJob;
    private int fileCount;
    private Extract.TableType tableType;
    private Path sourceDir;

    private void init(SourceState sourceState) {
        DateTimeZone.setDefault(DateTimeZone.forID(sourceState.getProp("source.timezone", TimeBasedWriterPartitioner.DEFAULT_WRITER_PARTITION_TIMEZONE)));
        try {
            initFileSystemHelper(sourceState);
        } catch (FileBasedHelperException e) {
            Throwables.propagate(e);
        }
        this.fs = ((AvroFsHelper) this.fsHelper).getFileSystem();
        this.sourceState = sourceState;
        this.lowWaterMark = getLowWaterMark(sourceState.getPreviousWorkUnitStates(), sourceState.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, DAILY_FOLDER_FORMATTER.print(0L)));
        this.maxFilesPerJob = sourceState.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB, DEFAULT_DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB);
        this.maxWorkUnitsPerJob = sourceState.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB, DEFAULT_DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB);
        this.tableType = Extract.TableType.valueOf(sourceState.getProp("extract.table.type").toUpperCase());
        this.fileCount = 0;
        this.sourceDir = new Path(sourceState.getProp("source.filebased.data.directory"));
    }

    @Override // gobblin.source.extractor.filebased.FileBasedSource
    public void initFileSystemHelper(State state) throws FileBasedHelperException {
        this.fsHelper = new AvroFsHelper(state);
        this.fsHelper.connect();
    }

    public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState workUnitState) throws IOException {
        return new DatePartitionedAvroFileExtractor(workUnitState);
    }

    @Override // gobblin.source.extractor.filebased.FileBasedSource
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        init(sourceState);
        LOG.info("Will pull data from " + DAILY_FOLDER_FORMATTER.print(this.lowWaterMark) + " until " + this.maxFilesPerJob + " files have been processed, or until there is no more data to consume");
        LOG.info("Creating workunits");
        MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue = new MultiWorkUnitWeightedQueue(this.maxWorkUnitsPerJob);
        addFailedWorkUnits(getPreviousWorkUnitsForRetry(this.sourceState), multiWorkUnitWeightedQueue);
        if (this.fileCount >= this.maxFilesPerJob) {
            LOG.info("The number of work units from previous job has already reached the upper limit, no more workunits will be made");
            return multiWorkUnitWeightedQueue.getQueueAsList();
        }
        addNewWorkUnits(multiWorkUnitWeightedQueue);
        return multiWorkUnitWeightedQueue.getQueueAsList();
    }

    private void addFailedWorkUnits(List<WorkUnit> list, MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue) {
        for (WorkUnit workUnit : list) {
            try {
                multiWorkUnitWeightedQueue.addWorkUnit(workUnit, this.fs.getFileStatus(new Path(workUnit.getProp("source.filebased.files.to.pull"))).getLen());
            } catch (IOException e) {
                Throwables.propagate(e);
            }
            LOG.info("Will process file from previous workunit: " + workUnit.getProp("source.filebased.files.to.pull"));
            this.fileCount++;
        }
    }

    private void addNewWorkUnits(MultiWorkUnitWeightedQueue multiWorkUnitWeightedQueue) {
        DateTime dateTime = new DateTime();
        DateTime dateTime2 = new DateTime(this.lowWaterMark);
        String name = this.sourceDir.getName();
        DateTime dateTime3 = dateTime2;
        while (true) {
            DateTime dateTime4 = dateTime3;
            if (dateTime4.isAfter(dateTime) || this.fileCount >= this.maxFilesPerJob) {
                break;
            }
            Path path = new Path(this.sourceDir, "daily/" + DAILY_FOLDER_FORMATTER.print(dateTime4));
            try {
                if (this.fs.exists(path)) {
                    SourceState sourceState = new SourceState();
                    sourceState.addAll(this.sourceState);
                    sourceState.setProp("source.entity", name);
                    Extract createExtract = sourceState.createExtract(this.tableType, sourceState.getProp("extract.namespace"), name);
                    LOG.info("Created extract: " + createExtract.getExtractId() + " for path " + path);
                    for (FileStatus fileStatus : this.fs.listStatus(path, getFileFilter())) {
                        LOG.info("Will process file " + fileStatus.getPath());
                        sourceState.setProp("source.filebased.files.to.pull", fileStatus.getPath());
                        sourceState.setProp("workunit.low.water.mark", Long.valueOf(dateTime4.getMillis()));
                        sourceState.setProp("workunit.high.water.mark", Long.valueOf(dateTime4.getMillis()));
                        multiWorkUnitWeightedQueue.addWorkUnit(sourceState.createWorkUnit(createExtract), fileStatus.getLen());
                        this.fileCount++;
                    }
                } else {
                    LOG.info("Path " + path + " does not exist, skipping");
                }
            } catch (IOException e) {
                Throwables.propagate(e);
            }
            dateTime3 = dateTime4.plusDays(1);
        }
        LOG.info("Total number of files extracted for the current run: " + this.fileCount);
    }

    private long getLowWaterMark(Iterable<WorkUnitState> iterable, String str) {
        long parseMillis = DAILY_FOLDER_FORMATTER.parseMillis(str);
        for (WorkUnitState workUnitState : iterable) {
            if (workUnitState.getWorkingState().equals(WorkUnitState.WorkingState.COMMITTED)) {
                long highWaterMark = workUnitState.getWorkunit().getHighWaterMark();
                if (highWaterMark > parseMillis) {
                    parseMillis = highWaterMark;
                }
            }
        }
        return new DateTime(parseMillis).plusDays(1).getMillis();
    }

    private PathFilter getFileFilter() {
        return new PathFilter() { // from class: gobblin.source.DatePartitionedDailyAvroSource.1
            public boolean accept(Path path) {
                return path.getName().endsWith(DatePartitionedDailyAvroSource.AVRO_SUFFIX);
            }
        };
    }
}
