package org.apache.gobblin.data.management.source;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.dataset.URNIdentified;
import org.apache.gobblin.dataset.comparators.URNLexicographicalComparator;
import org.apache.gobblin.runtime.task.NoopTask;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DatasetFinderSource} that walks datasets in URN lexicographical order. When
 * {@code drilldownIntoPartitions} is set, it also walks the partitions of each {@link PartitionableDataset}.
 *
 * <p>At most {@link #MAX_WORK_UNITS_PER_RUN_KEY} real work units are emitted per run. They are followed by a
 * single no-op "global watermark" work unit. That work unit records the URN of the last dataset (and partition)
 * for which a work unit was produced.
 *
 * <p>The next run looks for that watermark among the previous work unit states and skips everything up to and
 * including it. If the watermark carries {@link #END_OF_DATASETS_KEY}, the previous run reached the end of the
 * datasets, so the next run starts again from the beginning.
 *
 * <p>{@code S} and {@code D} are passed through unchanged to {@link DatasetFinderSource}.
 */
public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSource<S, D> {
    private static final Logger log = LoggerFactory.getLogger(LoopingDatasetFinderSource.class);

    /** Configuration key: maximum number of non-watermark work units emitted per run. */
    public static final String MAX_WORK_UNITS_PER_RUN_KEY = "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
    /** Default value for {@link #MAX_WORK_UNITS_PER_RUN_KEY}. */
    public static final int MAX_WORK_UNITS_PER_RUN = 10;
    /** Separator placed between the dataset URN and the partition URN in a partition work unit's dataset URN. */
    public static final String DATASET_PARTITION_DELIMITER = "@";
    /** Watermark property: URN of the last dataset for which a work unit was generated. */
    protected static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn";
    /** Watermark property: URN of the last partition for which a work unit was generated. */
    protected static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn";
    /** Watermark property set when the run exhausted all datasets; the next run restarts from the beginning. */
    protected static final String END_OF_DATASETS_KEY = "gobblin.source.loopingDatasetFinderSource.endOfDatasets";
    /** Marks the no-op work unit that carries the watermark. */
    protected static final String GLOBAL_WATERMARK_DATASET_KEY = "gobblin.source.loopingDatasetFinderSource.globalWatermarkDataset";

    private final URNLexicographicalComparator lexicographicalComparator;
    /** When true, every emitted work unit is tagged with a {@code dataset.urn} property; set per stream request. */
    protected boolean isDatasetStateStoreEnabled;

    /**
     * Iterator that yields work units for datasets/partitions strictly after the previous run's watermark. It stops
     * once {@code maxWorkUnits} work units were produced or the datasets run out. In both cases it emits a single
     * no-op watermark work unit and then ends.
     */
    public class DeepIterator extends AbstractIterator<WorkUnit> {
        protected final Iterator<Dataset> baseIterator;
        protected final int maxWorkUnits;
        protected int generatedWorkUnits = 0;
        protected Dataset previousDataset;
        private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
        private PartitionableDataset.DatasetPartition previousPartition;
        /** Set once the terminal watermark work unit was emitted, so the iterator is guaranteed to end afterwards. */
        private boolean watermarkEmitted = false;

        /**
         * @param baseIterator datasets, expected in URN lexicographical order
         * @param previousDatasetUrnWatermark URN of the last dataset handled by the previous run, or null to start over
         * @param previousPartitionUrnWatermark URN of the last partition handled by the previous run, or null
         * @param maxWorkUnits maximum number of non-watermark work units to produce
         */
        public DeepIterator(Iterator<Dataset> baseIterator, @Nullable String previousDatasetUrnWatermark,
                @Nullable String previousPartitionUrnWatermark, int maxWorkUnits) throws IOException {
            this.maxWorkUnits = maxWorkUnits;
            // The peeking wrapper must stay the live iterator: peek() pulls an element from the backing iterator,
            // so discarding the wrapper would silently drop the first dataset larger than the watermark.
            PeekingIterator<Dataset> datasets = Iterators.peekingIterator(baseIterator);
            this.baseIterator = datasets;

            Dataset equalDataset = advanceUntilLargerThan(datasets, previousDatasetUrnWatermark);
            if (drilldownIntoPartitions && equalDataset instanceof PartitionableDataset) {
                // Resume inside the watermark dataset, after its last processed partition.
                PeekingIterator<PartitionableDataset.DatasetPartition> partitions =
                        Iterators.peekingIterator(getPartitionIterator((PartitionableDataset) equalDataset));
                this.currentPartitionIterator = partitions;
                advanceUntilLargerThan(partitions, previousPartitionUrnWatermark);
            } else {
                this.currentPartitionIterator = Collections.emptyIterator();
            }
        }

        /**
         * Consumes every element of {@code it} whose URN sorts before {@code reference}.
         *
         * @return the next element if its URN equals {@code reference} (that element is consumed), otherwise
         *         null. When null is returned, the first larger element, if any, is still available from {@code it}.
         *         Returns null immediately, without touching {@code it}, when {@code reference} is null.
         */
        @Nullable
        private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, @Nullable String reference) {
            if (reference == null) {
                return null;
            }
            while (it.hasNext()) {
                int comparison = lexicographicalComparator.compare(it.peek(), reference);
                if (comparison == 0) {
                    return it.next();
                }
                if (comparison > 0) {
                    return null;
                }
                it.next();
            }
            return null;
        }

        /**
         * Returns the partitions of {@code partitionableDataset} in lexicographical order. Returns an empty
         * iterator (after logging) if listing them fails.
         */
        private Iterator<PartitionableDataset.DatasetPartition> getPartitionIterator(PartitionableDataset partitionableDataset) {
            try {
                Iterator<PartitionableDataset.DatasetPartition> partitions = sortStreamLexicographically(
                        partitionableDataset.getPartitions(Spliterator.SORTED, lexicographicalComparator)).iterator();
                return partitions;
            } catch (IOException e) {
                log.error("Failed to get partitions for dataset " + partitionableDataset.getUrn(), e);
                return Collections.emptyIterator();
            }
        }

        /**
         * Produces the next work unit. When the budget is used up, it returns a no-op watermark work unit. When the
         * datasets are exhausted, it returns a watermark work unit flagged with {@link #END_OF_DATASETS_KEY}. After
         * either watermark, iteration ends.
         */
        @Override
        protected WorkUnit computeNext() {
            if (this.watermarkEmitted || this.generatedWorkUnits > this.maxWorkUnits) {
                return endOfData();
            }
            if (this.generatedWorkUnits == this.maxWorkUnits) {
                this.generatedWorkUnits++;
                this.watermarkEmitted = true;
                return generateNoopWorkUnit();
            }
            WorkUnit next = doComputeNext();
            if (next == null) {
                next = generateNoopWorkUnit();
                this.generatedWorkUnits = Integer.MAX_VALUE;
                // The flag, not the counter, ends iteration; the counter alone loops forever when maxWorkUnits == MAX_VALUE.
                this.watermarkEmitted = true;
                next.setProp(END_OF_DATASETS_KEY, true);
            }
            return next;
        }

        /**
         * Returns the next non-null work unit produced from the current partition iterator or the dataset iterator,
         * or null once both are exhausted. Datasets and partitions whose work unit is null are skipped.
         */
        protected WorkUnit doComputeNext() {
            while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
                if (this.currentPartitionIterator.hasNext()) {
                    PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
                    WorkUnit workUnit = workUnitForDatasetPartition(partition);
                    if (workUnit != null) {
                        addDatasetInfoToWorkUnit(workUnit, partition.getDataset());
                        addPartitionInfoToWorkUnit(workUnit, partition);
                        this.previousDataset = partition.getDataset();
                        this.previousPartition = partition;
                        this.generatedWorkUnits++;
                        return workUnit;
                    }
                    continue;
                }

                Dataset dataset = this.baseIterator.next();
                if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
                    this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
                    continue;
                }
                WorkUnit workUnit = workUnitForDataset(dataset);
                if (workUnit != null) {
                    addDatasetInfoToWorkUnit(workUnit, dataset);
                    this.previousDataset = dataset;
                    this.generatedWorkUnits++;
                    return workUnit;
                }
            }
            return null;
        }

        /** Tags {@code workUnit} with the dataset URN when the dataset state store is enabled. */
        protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
            if (isDatasetStateStoreEnabled) {
                workUnit.setProp("dataset.urn", dataset.getUrn());
            }
        }

        /** Overrides the dataset URN with "datasetUrn@partitionUrn" when the dataset state store is enabled. */
        private void addPartitionInfoToWorkUnit(WorkUnit workUnit, PartitionableDataset.DatasetPartition datasetPartition) {
            if (isDatasetStateStoreEnabled) {
                workUnit.setProp("dataset.urn", Joiner.on(DATASET_PARTITION_DELIMITER)
                        .join(datasetPartition.getDataset().getUrn(), datasetPartition.getUrn()));
            }
        }

        /** Builds the no-op work unit carrying the watermark (last dataset/partition URNs) for the next run. */
        private WorkUnit generateNoopWorkUnit() {
            WorkUnit noopWorkunit = NoopTask.noopWorkunit();
            noopWorkunit.setProp(GLOBAL_WATERMARK_DATASET_KEY, true);
            if (this.previousDataset != null) {
                noopWorkunit.setProp(DATASET_URN, this.previousDataset.getUrn());
            }
            if (drilldownIntoPartitions && this.previousPartition != null) {
                noopWorkunit.setProp(PARTITION_URN, this.previousPartition.getUrn());
            }
            if (isDatasetStateStoreEnabled) {
                noopWorkunit.setProp("dataset.urn", "__globalDatasetWatermark");
            }
            return noopWorkunit;
        }
    }

    /**
     * @param drilldownIntoPartitions whether {@link PartitionableDataset}s are expanded into per-partition work units
     */
    public LoopingDatasetFinderSource(boolean drilldownIntoPartitions) {
        super(drilldownIntoPartitions);
        this.lexicographicalComparator = new URNLexicographicalComparator();
    }

    /** Materializes {@link #getWorkunitStream(SourceState)} into a list. */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        return Lists.newArrayList(getWorkunitStream(sourceState).getMaterializedWorkUnitCollection());
    }

    /** Equivalent to {@code getWorkunitStream(sourceState, false)}. */
    @Override
    public WorkUnitStream getWorkunitStream(SourceState sourceState) {
        return getWorkunitStream(sourceState, false);
    }

    /**
     * Builds a finite stream of work units resuming after the previous run's global watermark.
     *
     * @param sourceState source state providing configuration and previous work unit states
     * @param isDatasetStateStoreEnabled whether work units are tagged with {@code dataset.urn}; when true the
     *        watermark is looked up under the {@code "__globalDatasetWatermark"} dataset
     * @throws IllegalArgumentException if the configured max work units is not positive
     * @throws RuntimeException wrapping any {@link IOException} from dataset discovery
     */
    public WorkUnitStream getWorkunitStream(SourceState sourceState, boolean isDatasetStateStoreEnabled) {
        this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
        try {
            int maxWorkUnits = sourceState.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
            Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be greater than 0!");

            List<WorkUnitState> previousWorkUnitStates = this.isDatasetStateStoreEnabled
                    ? sourceState.getPreviousWorkUnitStates("__globalDatasetWatermark")
                    : sourceState.getPreviousWorkUnitStates();
            Optional<WorkUnitState> watermarkState = previousWorkUnitStates.stream()
                    .filter(state -> state.getPropAsBoolean(GLOBAL_WATERMARK_DATASET_KEY, false))
                    .findFirst();

            Stream<Dataset> datasets = createDatasetsFinder(sourceState)
                    .getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
            datasets = sortStreamLexicographically(datasets);

            String previousDatasetUrnWatermark = null;
            String previousPartitionUrnWatermark = null;
            // A watermark flagged END_OF_DATASETS means the last run finished a full pass: start over.
            if (watermarkState.isPresent() && !watermarkState.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
                previousDatasetUrnWatermark = watermarkState.get().getProp(DATASET_URN);
                previousPartitionUrnWatermark = watermarkState.get().getProp(PARTITION_URN);
            }

            Iterator<WorkUnit> workUnits = getWorkUnitIterator(datasets.iterator(), previousDatasetUrnWatermark,
                    previousPartitionUrnWatermark, maxWorkUnits);
            return new BasicWorkUnitStream.Builder(workUnits).setFiniteStream(true).build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Factory for the work unit iterator; subclasses may return a customized {@link DeepIterator}. */
    protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasets, @Nullable String previousDatasetUrnWatermark,
            @Nullable String previousPartitionUrnWatermark, int maxWorkUnits) throws IOException {
        return new DeepIterator(datasets, previousDatasetUrnWatermark, previousPartitionUrnWatermark, maxWorkUnits);
    }

    /**
     * Returns {@code stream} ordered by the URN lexicographical comparator. It is re-sorted only if it does not
     * already report SORTED with that same comparator. Used by {@link DeepIterator} and stream construction.
     */
    public <T extends URNIdentified> Stream<T> sortStreamLexicographically(Stream<T> stream) {
        Spliterator<T> spliterator = stream.spliterator();
        // getComparator() returns null for natural ordering, so call equals() on our non-null comparator.
        boolean alreadySorted = spliterator.hasCharacteristics(Spliterator.SORTED)
                && this.lexicographicalComparator.equals(spliterator.getComparator());
        Stream<T> rewrapped = StreamSupport.stream(spliterator, false);
        return alreadySorted ? rewrapped : rewrapped.sorted(this.lexicographicalComparator);
    }
}
