package gobblin.source;

import com.google.common.base.Throwables;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.SourceState;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.source.PartitionAwareFileRetriever;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.filebased.FileBasedHelperException;
import gobblin.source.extractor.filebased.FileBasedSource;
import gobblin.source.extractor.hadoop.AvroFsHelper;
import gobblin.source.workunit.Extract;
import gobblin.source.workunit.MultiWorkUnitWeightedQueue;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.DatePartitionType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-core-0.11.0.jar:gobblin/source/PartitionedFileSourceBase.class */
/**
 * Base class for file-based sources whose input files are discovered through a
 * {@link PartitionAwareFileRetriever}.
 *
 * <p>Each call to {@link #getWorkunits(SourceState)} does two things:
 * <ul>
 *   <li>It re-queues the files of previous work units returned by
 *       {@code getPreviousWorkUnitsForRetry}.</li>
 *   <li>If the per-job file limit has not been reached, it asks the retriever for new files starting
 *       from the computed low watermark, creating one work unit per file.</li>
 * </ul>
 * Work units are added to a {@link MultiWorkUnitWeightedQueue} sized by
 * {@code date.partitioned.source.max.workunits.per.job}, using the file length as the weight.
 */
public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedSource<SCHEMA, DATA> {
    public static final String DATE_PARTITIONED_SOURCE_PREFIX = "date.partitioned.source";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_PREFIX = "date.partitioned.source.partition.prefix";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX = "date.partitioned.source.partition.suffix";
    static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN = "date.partitioned.source.partition.pattern";
    public static final String DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY = "date.partitioned.source.partition.granularity";
    private static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = "date.partitioned.source.min.watermark.value";
    private static final String DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = "date.partitioned.source.max.files.per.job";
    private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = "date.partitioned.source.max.workunits.per.job";
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB = 2000;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = 500;
    private static final int DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE = 0;
    public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY = DatePartitionType.HOUR;
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedFileSourceBase.class);

    // Per-run state; (re)populated by init(SourceState).
    private SourceState sourceState;
    private FileSystem fs;
    private long lowWaterMark;
    private int maxFilesPerJob;
    private int maxWorkUnitsPerJob;
    // Number of files scheduled so far in this run (retried + new).
    private int fileCount;
    private Extract.TableType tableType;
    private Path sourceDir;
    private final PartitionAwareFileRetriever retriever;

    /**
     * Creates the source with the retriever used to discover files and parse watermarks.
     *
     * <p>NOTE(review): the decompiler reported this constructor was originally protected; it is kept
     * public so existing callers keep compiling.
     *
     * @param partitionAwareFileRetriever retriever that lists files to process
     */
    public PartitionedFileSourceBase(PartitionAwareFileRetriever partitionAwareFileRetriever) {
        this.retriever = partitionAwareFileRetriever;
    }

    /**
     * Initializes per-run state from the job configuration. This covers the retriever, the file system
     * helper, the low watermark, the per-job limits, the extract table type and the source data
     * directory.
     *
     * <p>NOTE(review): the decompiler reported this method was originally protected; it is kept public
     * so existing callers keep compiling.
     *
     * @param sourceState the job's source state
     * @throws RuntimeException wrapping the {@link FileBasedHelperException} if the file system helper
     *     cannot be initialized
     */
    public void init(SourceState sourceState) {
        this.retriever.init(sourceState);
        try {
            initFileSystemHelper(sourceState);
        } catch (FileBasedHelperException e) {
            // Fail fast: nothing below can work without a connected file system helper.
            throw new RuntimeException("Failed to initialize file system helper", e);
        }
        this.fs = ((AvroFsHelper) this.fsHelper).getFileSystem();
        this.sourceState = sourceState;
        this.lowWaterMark = getLowWaterMark(sourceState.getPreviousWorkUnitStates(),
            sourceState.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
                String.valueOf(DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE)));
        this.maxFilesPerJob = sourceState.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB,
            DEFAULT_DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB);
        this.maxWorkUnitsPerJob = sourceState.getPropAsInt(DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB,
            DEFAULT_DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB);
        // Locale.ROOT keeps the enum lookup independent of the JVM's default locale.
        this.tableType = Extract.TableType.valueOf(
            sourceState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase(Locale.ROOT));
        this.fileCount = 0;
        this.sourceDir = new Path(sourceState.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
    }

    /**
     * Creates and connects an {@link AvroFsHelper} for the given state.
     *
     * @param state configuration used to build the helper
     * @throws FileBasedHelperException if connecting fails
     */
    @Override
    public void initFileSystemHelper(State state) throws FileBasedHelperException {
        this.fsHelper = new AvroFsHelper(state);
        this.fsHelper.connect();
    }

    @Override
    public abstract Extractor<SCHEMA, DATA> getExtractor(WorkUnitState workUnitState) throws IOException;

    /**
     * Builds the work units for this run.
     *
     * <p>Previous work units returned by {@code getPreviousWorkUnitsForRetry} are always re-queued.
     * New files are only requested while the running file count is below the per-job file limit.
     *
     * @param sourceState the job's source state
     * @return the queued work units
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        DateTimeFormatter fullDateTime = DateTimeFormat.fullDateTime();
        init(sourceState);
        LOG.info("Will pull data from {} until {} files have been processed, or until there is no more data to consume",
            fullDateTime.print(this.lowWaterMark), this.maxFilesPerJob);
        LOG.info("Creating workunits");
        MultiWorkUnitWeightedQueue queue = new MultiWorkUnitWeightedQueue(this.maxWorkUnitsPerJob);
        addFailedWorkUnits(getPreviousWorkUnitsForRetry(this.sourceState), queue);
        if (this.fileCount >= this.maxFilesPerJob) {
            LOG.info("The number of work units from previous job has already reached the upper limit, no more workunits will be made");
            return queue.getQueueAsList();
        }
        addNewWorkUnits(queue);
        return queue.getQueueAsList();
    }

    /**
     * Re-queues previous work units, weighting each by the length of the file it pulls.
     *
     * @param previousWorkUnits work units to retry
     * @param queue destination queue
     * @throws RuntimeException wrapping the {@link IOException} if a file's status cannot be read
     */
    private void addFailedWorkUnits(List<WorkUnit> previousWorkUnits, MultiWorkUnitWeightedQueue queue) {
        for (WorkUnit workUnit : previousWorkUnits) {
            String filePath = workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
            try {
                queue.addWorkUnit(workUnit, this.fs.getFileStatus(new Path(filePath)).getLen());
            } catch (IOException e) {
                throw new RuntimeException("Failed to get file status for " + filePath, e);
            }
            LOG.info("Will process file from previous workunit: {}", filePath);
            this.fileCount++;
        }
    }

    /**
     * Returns the extract shared by all files with the same watermark, creating and caching one on
     * first use.
     *
     * @param fileInfo file whose watermark selects the extract
     * @param tableName table name for a newly created extract
     * @param sourceState state used to create the extract
     * @param extractsByWatermark cache of extracts keyed by watermark (ms since epoch)
     * @return the cached or newly created extract
     */
    private Extract getExtractForFile(PartitionAwareFileRetriever.FileInfo fileInfo, String tableName,
            SourceState sourceState, Map<Long, Extract> extractsByWatermark) {
        Long watermark = Long.valueOf(fileInfo.getWatermarkMsSinceEpoch());
        Extract extract = extractsByWatermark.get(watermark);
        if (extract == null) {
            extract = sourceState.createExtract(this.tableType,
                sourceState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY), tableName);
            LOG.info("Created extract: {} for path {}", extract.getExtractId(), tableName);
            extractsByWatermark.put(watermark, extract);
        }
        return extract;
    }

    /**
     * Requests new files from the retriever (up to the remaining per-job budget), sorts them and
     * creates one work unit per file.
     *
     * <p>Each work unit's low watermark, high watermark and date partition are all set to the file's
     * watermark.
     *
     * @param queue destination queue
     * @throws RuntimeException wrapping the {@link IOException} if the retriever fails to list files
     */
    private void addNewWorkUnits(MultiWorkUnitWeightedQueue queue) {
        List<PartitionAwareFileRetriever.FileInfo> filesToProcess;
        try {
            // Copy before sorting: the retriever's list may be unmodifiable or shared with the caller.
            filesToProcess = new ArrayList<PartitionAwareFileRetriever.FileInfo>(
                this.retriever.getFilesToProcess(this.lowWaterMark, this.maxFilesPerJob - this.fileCount));
        } catch (IOException e) {
            throw new RuntimeException("Failed to list files to process under " + this.sourceDir, e);
        }
        Collections.sort(filesToProcess);

        String entityName = this.sourceDir.getName();
        // Work on a copy so per-file properties do not leak into the job-level state.
        SourceState workUnitTemplate = new SourceState();
        workUnitTemplate.addAll(this.sourceState);
        workUnitTemplate.setProp(ConfigurationKeys.SOURCE_ENTITY, entityName);

        Map<Long, Extract> extractsByWatermark = new HashMap<Long, Extract>();
        for (PartitionAwareFileRetriever.FileInfo fileInfo : filesToProcess) {
            Extract extract = getExtractForFile(fileInfo, entityName, workUnitTemplate, extractsByWatermark);
            LOG.info("Will process file {}", fileInfo.getFilePath());
            Long watermark = Long.valueOf(fileInfo.getWatermarkMsSinceEpoch());
            workUnitTemplate.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, fileInfo.getFilePath());
            workUnitTemplate.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, watermark);
            workUnitTemplate.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, watermark);
            workUnitTemplate.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, watermark);
            queue.addWorkUnit(workUnitTemplate.createWorkUnit(extract), fileInfo.getFileSize());
            this.fileCount++;
        }
        LOG.info("Total number of files extracted for the current run: {}", filesToProcess.size());
    }

    /**
     * Computes the low watermark for this run.
     *
     * <p>The result starts from the configured minimum, is raised to the largest high watermark among
     * COMMITTED previous work units, and is then increased by the retriever's watermark increment.
     *
     * @param previousStates work unit states from the previous run
     * @param minWatermark configured minimum watermark, parsed by the retriever
     * @return the low watermark in the retriever's watermark units
     */
    private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String minWatermark) {
        long lowWatermarkValue = this.retriever.getWatermarkFromString(minWatermark);
        for (WorkUnitState previousState : previousStates) {
            if (previousState.getWorkingState().equals(WorkUnitState.WorkingState.COMMITTED)) {
                long previousHighWatermark = previousState.getWorkunit().getHighWaterMark();
                if (previousHighWatermark > lowWatermarkValue) {
                    lowWatermarkValue = previousHighWatermark;
                }
            }
        }
        return lowWatermarkValue + getRetriever().getWatermarkIncrementMs();
    }

    /**
     * Returns the retriever used to discover files and parse watermarks.
     *
     * @return the retriever passed at construction
     */
    protected PartitionAwareFileRetriever getRetriever() {
        return this.retriever;
    }
}
