package org.apache.gobblin.source.extractor.filebased;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.json.JsonSchema;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/gobblin/source/extractor/filebased/FileBasedSource.class */
public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
    private static final Logger log = LoggerFactory.getLogger(FileBasedSource.class);
    protected TimestampAwareFileBasedHelper fsHelper;
    protected String splitPattern = ":::";
    protected Optional<LineageInfo> lineageInfo;

    protected void initLogger(SourceState sourceState) {
        MDC.put("sourceInfo", "[" + Strings.nullToEmpty(sourceState.getProp("source.entity")) + "]");
    }

    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        initLogger(sourceState);
        this.lineageInfo = LineageInfo.getLineageInfo(sourceState.getBroker());
        try {
            initFileSystemHelper(sourceState);
        } catch (FileBasedHelperException e) {
            Throwables.propagate(e);
        }
        log.info("Getting work units");
        String prop = sourceState.getProp("extract.namespace");
        String prop2 = sourceState.getProp("source.entity");
        String prop3 = sourceState.getProp("extract.table.name");
        if (Strings.isNullOrEmpty(prop3)) {
            prop3 = prop2;
        }
        Extract.TableType valueOf = Extract.TableType.valueOf(sourceState.getProp("extract.table.type").toUpperCase());
        ArrayList newArrayList = Lists.newArrayList(sourceState.getPreviousWorkUnitStates());
        HashSet newHashSet = Sets.newHashSet();
        if (!newArrayList.isEmpty()) {
            if (((WorkUnitState) newArrayList.get(0)).getWorkunit().contains("source.filebased.fs.snapshot")) {
                newHashSet.addAll(((WorkUnitState) newArrayList.get(0)).getWorkunit().getPropAsSet("source.filebased.fs.snapshot"));
            } else if (sourceState.getPropAsBoolean("source.filebased.fs.prior.snapshot.required", false)) {
                throw new RuntimeException(String.format("No '%s' found on state of prior job", "source.filebased.fs.snapshot"));
            }
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        List<WorkUnit> previousWorkUnitsForRetry = getPreviousWorkUnitsForRetry(sourceState);
        log.info("Total number of work units from the previous failed runs: " + previousWorkUnitsForRetry.size());
        for (WorkUnit workUnit : previousWorkUnitsForRetry) {
            newHashSet.addAll(workUnit.getPropAsSet("source.filebased.files.to.pull"));
            newArrayList2.add(workUnit);
        }
        List<String> list = getcurrentFsSnapshot(sourceState);
        ArrayList newArrayList3 = Lists.newArrayList();
        ArrayList newArrayList4 = Lists.newArrayList();
        int propAsInt = sourceState.getPropAsInt("source.filebased.maxFilesPerRun", Integer.MAX_VALUE);
        int i = 0;
        if (list.size() > propAsInt) {
            Collections.sort(list);
        }
        for (String str : list) {
            if (newHashSet.contains(str)) {
                newArrayList3.add(str);
            } else {
                int i2 = i;
                i++;
                if (i2 < propAsInt) {
                    newArrayList4.add(str.split(this.splitPattern)[0]);
                    newArrayList3.add(str);
                }
            }
        }
        Iterator it = previousWorkUnitsForRetry.iterator();
        while (it.hasNext()) {
            ((WorkUnit) it.next()).setProp("source.filebased.fs.snapshot", StringUtils.join(newArrayList3, ","));
        }
        if (!newArrayList4.isEmpty()) {
            logFilesToPull(newArrayList4);
            int size = (!sourceState.contains("source.max.number.of.partitions") || sourceState.getPropAsInt("source.max.number.of.partitions") > newArrayList4.size()) ? newArrayList4.size() : sourceState.getPropAsInt("source.max.number.of.partitions");
            if (size <= 0) {
                throw new IllegalArgumentException("The number of partitions should be positive");
            }
            int size2 = newArrayList4.size() % size == 0 ? newArrayList4.size() / size : (newArrayList4.size() / size) + 1;
            int i3 = 0;
            while (true) {
                int i4 = i3;
                if (i4 >= newArrayList4.size()) {
                    log.info("Total number of work units for the current run: " + (newArrayList2.size() - previousWorkUnitsForRetry.size()));
                    break;
                }
                SourceState sourceState2 = new SourceState();
                sourceState2.setProp("extract.extractIdTimeZone", sourceState.getProp("extract.extractIdTimeZone", "UTC"));
                sourceState2.setProp("extract.is.full", sourceState.getProp("extract.is.full", "false"));
                WorkUnit create = WorkUnit.create(new Extract(sourceState2, valueOf, prop, prop3));
                create.setProp("source.filebased.fs.snapshot", StringUtils.join(newArrayList3, ","));
                List<String> subList = newArrayList4.subList(i4, i4 + size2 > newArrayList4.size() ? newArrayList4.size() : i4 + size2);
                create.setProp("source.filebased.files.to.pull", StringUtils.join(subList, ","));
                if (sourceState.getPropAsBoolean("source.filebased.preserve.file.name", false)) {
                    if (subList.size() != 1) {
                        throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
                    }
                    create.setProp("data.publisher.final.dir", create.getProp("source.filebased.files.to.pull"));
                }
                newArrayList2.add(create);
                i3 = i4 + size2;
            }
        }
        addLineageSourceInfo((List<WorkUnit>) newArrayList2, (State) sourceState);
        return newArrayList2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addLineageSourceInfo(List<WorkUnit> list, State state) {
        list.forEach(workUnit -> {
            if (workUnit instanceof MultiWorkUnit) {
                ((MultiWorkUnit) workUnit).getWorkUnits().forEach(workUnit -> {
                    addLineageSourceInfo(workUnit, state);
                });
            } else {
                addLineageSourceInfo(workUnit, state);
            }
        });
    }

    protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
        if (!this.lineageInfo.isPresent()) {
            log.info("Lineage is not enabled");
            return;
        }
        ((LineageInfo) this.lineageInfo.get()).setSource(new DatasetDescriptor(state.getProp("source.filebased.platform", "hdfs"), URI.create(state.getProp("source.filebased.fs.uri", "file:///")), Path.getPathWithoutSchemeAndAuthority(new Path(state.getProp("source.filebased.data.directory"))).toString()), workUnit);
    }

    public List<String> getcurrentFsSnapshot(State state) {
        String lsPattern = getLsPattern(state);
        try {
            log.info("Running ls command with input " + lsPattern);
            List<String> ls = this.fsHelper.ls(lsPattern);
            for (int i = 0; i < ls.size(); i++) {
                URI uri = new URI(ls.get(i));
                String uri2 = uri.toString();
                if (!uri.isAbsolute()) {
                    uri2 = new File(state.getProp("source.filebased.data.directory"), uri.toString()).getAbsolutePath();
                }
                ls.set(i, uri2 + this.splitPattern + this.fsHelper.getFileMTime(uri2));
            }
            return ls;
        } catch (URISyntaxException | FileBasedHelperException e) {
            String format = String.format("Not able to fetch the filename/file modified time to %s. Will not pull any files", e.getMessage());
            log.error(format, e);
            throw new RuntimeException(format, e);
        }
    }

    protected String getLsPattern(State state) {
        return state.getProp("source.filebased.data.directory") + "/*" + state.getProp("source.entity") + "*";
    }

    public void shutdown(SourceState sourceState) {
        if (this.fsHelper != null) {
            log.info("Shutting down the FileSystemHelper connection");
            try {
                this.fsHelper.close();
            } catch (IOException e) {
                log.error("Unable to shutdown FileSystemHelper", e);
            }
        }
    }

    public abstract void initFileSystemHelper(State state) throws FileBasedHelperException;

    private void logFilesToPull(List<String> list) {
        int min = Math.min(2000, list.size());
        String str = JsonSchema.DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY;
        if (min < list.size()) {
            str = "and " + (list.size() - min) + " more ";
        }
        log.info(String.format("Will pull the following files %s in this run: %s", str, Arrays.toString(list.subList(0, min).toArray())));
    }
}
