package co.cask.cdap.metadata;

import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.data2.metadata.lineage.Lineage;
import co.cask.cdap.data2.metadata.lineage.LineageStore;
import co.cask.cdap.data2.metadata.lineage.Relation;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.metadata.MetadataRecord;
import co.cask.cdap.proto.metadata.MetadataScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes lineage for datasets and streams, and looks up the metadata that was associated
 * with a program run.
 *
 * <p>Lineage is built from the {@link Relation}s recorded in the {@link LineageStore}, restricted
 * to the runs that the {@link Store} reports as running in the requested time range.
 */
public class LineageAdmin {
    private static final Logger LOG = LoggerFactory.getLogger(LineageAdmin.class);

    /** Maps a relation to the program it involves. */
    private static final Function<Relation, Id.Program> RELATION_TO_PROGRAM_FUNCTION = new Function<Relation, Id.Program>() {
        @Override
        public Id.Program apply(Relation relation) {
            return relation.getProgram();
        }
    };

    /** Maps a relation to the data entity it involves. */
    private static final Function<Relation, Id.NamespacedId> RELATION_TO_DATA_FUNCTION = new Function<Relation, Id.NamespacedId>() {
        @Override
        public Id.NamespacedId apply(Relation relation) {
            return relation.getData();
        }
    };

    private final LineageStore lineageStore;
    private final Store store;
    private final MetadataStore metadataStore;
    private final EntityValidator entityValidator;

    /**
     * A scan window over the lineage store together with a filter that is applied to every
     * relation found inside that window.
     */
    @VisibleForTesting
    public static class ScanRangeWithFilter {
        private final long start;
        private final long end;
        private final Predicate<Relation> filter;

        /**
         * @param start  first bound of the scan window
         * @param end    second bound of the scan window
         * @param filter predicate deciding which scanned relations are kept
         */
        public ScanRangeWithFilter(long start, long end, Predicate<Relation> filter) {
            this.start = start;
            this.end = end;
            this.filter = filter;
        }

        /** @return the start of the scan window */
        public long getStart() {
            return this.start;
        }

        /** @return the end of the scan window */
        public long getEnd() {
            return this.end;
        }

        /** @return the filter to apply to scanned relations */
        public Predicate<Relation> getFilter() {
            return this.filter;
        }
    }

    @Inject
    LineageAdmin(LineageStore lineageStore, Store store, MetadataStore metadataStore, EntityValidator entityValidator) {
        this.lineageStore = lineageStore;
        this.store = store;
        this.metadataStore = metadataStore;
        this.entityValidator = entityValidator;
    }

    /**
     * Computes lineage for a dataset.
     *
     * @param datasetInstance dataset to compute lineage for
     * @param startMillis     start of the time range, in milliseconds
     * @param endMillis       end of the time range, in milliseconds
     * @param levels          number of traversal levels to expand
     * @return the lineage made of all relations reached
     * @throws NotFoundException propagated from entity validation
     */
    public Lineage computeLineage(Id.DatasetInstance datasetInstance, long startMillis, long endMillis, int levels)
            throws NotFoundException {
        return doComputeLineage(datasetInstance, startMillis, endMillis, levels);
    }

    /**
     * Computes lineage for a stream.
     *
     * @param stream      stream to compute lineage for
     * @param startMillis start of the time range, in milliseconds
     * @param endMillis   end of the time range, in milliseconds
     * @param levels      number of traversal levels to expand
     * @return the lineage made of all relations reached
     * @throws NotFoundException propagated from entity validation
     */
    public Lineage computeLineage(Id.Stream stream, long startMillis, long endMillis, int levels)
            throws NotFoundException {
        return doComputeLineage(stream, startMillis, endMillis, levels);
    }

    /**
     * Returns the user-scope metadata snapshot, taken before the time encoded in the run id, for
     * the entities the lineage store associates with the run plus the run's application.
     *
     * @param run the program run
     * @return the metadata records, or an empty set when the lineage store has no entities for the run
     * @throws NotFoundException propagated from run validation
     */
    public Set<MetadataRecord> getMetadataForRun(Id.Run run) throws NotFoundException {
        this.entityValidator.ensureRunExists(run);

        // Copy into a mutable set: the application is added below.
        Set<Id.NamespacedId> runEntities = new HashSet<Id.NamespacedId>(this.lineageStore.getEntitiesForRun(run));
        if (runEntities.isEmpty()) {
            // The run exists but nothing was recorded for it.
            return ImmutableSet.of();
        }

        RunId runId = RunIds.fromString(run.getId());
        // The application is added explicitly on top of what the lineage store returned.
        runEntities.add(run.getProgram().getApplication());
        return this.metadataStore.getSnapshotBeforeTime(
                MetadataScope.USER, runEntities, RunIds.getTime(runId, TimeUnit.MILLISECONDS));
    }

    /**
     * Computes lineage starting from {@code sourceData} by alternately expanding data entities
     * into the programs related to them and programs into the data entities related to them,
     * for at most {@code levels} rounds. Each data entity and each program is expanded once.
     *
     * @param sourceData  dataset or stream to start from
     * @param startMillis start of the time range, in milliseconds
     * @param endMillis   end of the time range, in milliseconds
     * @param levels      maximum number of rounds; a value {@code <= 0} yields an empty lineage
     * @return lineage containing every relation collected during the traversal
     * @throws NotFoundException propagated from entity validation
     */
    Lineage doComputeLineage(Id.NamespacedId sourceData, long startMillis, long endMillis, int levels)
            throws NotFoundException {
        LOG.trace("Computing lineage for data {}, startMillis {}, endMillis {}, levels {}",
                new Object[]{sourceData, startMillis, endMillis, levels});
        this.entityValidator.ensureEntityExists(sourceData);

        // The store is queried in seconds, so the millisecond bounds are converted first.
        Set<RunId> runningInRange = this.store.getRunningInRange(
                TimeUnit.MILLISECONDS.toSeconds(startMillis), TimeUnit.MILLISECONDS.toSeconds(endMillis));
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got {} rundIds in time range ({}, {})",
                    new Object[]{runningInRange.size(), startMillis, endMillis});
        }

        ScanRangeWithFilter scanRange = getScanRange(runningInRange);
        LOG.trace("Using scan start = {}, scan end = {}", scanRange.getStart(), scanRange.getEnd());

        Set<Relation> relations = new HashSet<Relation>();
        Set<Id.NamespacedId> visitedData = new HashSet<Id.NamespacedId>();
        Set<Id.NamespacedId> dataToVisit = new HashSet<Id.NamespacedId>();
        Set<Id.Program> visitedPrograms = new HashSet<Id.Program>();
        Set<Id.Program> programsToVisit = new HashSet<Id.Program>();

        dataToVisit.add(sourceData);
        for (int level = 0; level < levels; level++) {
            LOG.trace("Level {}", level);

            // Step 1: expand the current data frontier into related programs.
            programsToVisit.clear();
            for (Id.NamespacedId data : dataToVisit) {
                if (!visitedData.add(data)) {
                    continue; // already expanded in an earlier level
                }
                LOG.trace("Visiting dataset {}", data);
                Iterable<Relation> programRelations = getProgramRelations(
                        data, scanRange.getStart(), scanRange.getEnd(), scanRange.getFilter());
                LOG.trace("Got program relations {}", programRelations);
                Iterables.addAll(relations, programRelations);
                Iterables.addAll(programsToVisit, Iterables.transform(programRelations, RELATION_TO_PROGRAM_FUNCTION));
            }

            // Step 2: expand those programs into the data frontier of the next level.
            dataToVisit.clear();
            for (Id.Program program : programsToVisit) {
                if (!visitedPrograms.add(program)) {
                    continue; // already expanded in an earlier level
                }
                LOG.trace("Visiting program {}", program);
                Set<Relation> dataRelations = this.lineageStore.getRelations(
                        program, scanRange.getStart(), scanRange.getEnd(), scanRange.getFilter());
                LOG.trace("Got data relations {}", dataRelations);
                Iterables.addAll(relations, dataRelations);
                Iterables.addAll(dataToVisit, Iterables.transform(dataRelations, RELATION_TO_DATA_FUNCTION));
            }

            if (dataToVisit.isEmpty()) {
                // With an empty frontier the remaining levels cannot contribute any relation.
                break;
            }
        }

        Lineage lineage = new Lineage(relations);
        LOG.trace("Got lineage {}", lineage);
        return lineage;
    }

    /**
     * Fetches the relations recorded for a data entity, dispatching on its concrete type.
     *
     * @param data   a dataset instance or a stream
     * @param start  start of the scan window
     * @param end    end of the scan window
     * @param filter filter applied to the scanned relations
     * @return the relations returned by the lineage store
     * @throws IllegalStateException if {@code data} is neither a dataset instance nor a stream
     */
    private Iterable<Relation> getProgramRelations(Id.NamespacedId data, long start, long end,
                                                   Predicate<Relation> filter) {
        if (data instanceof Id.DatasetInstance) {
            return this.lineageStore.getRelations((Id.DatasetInstance) data, start, end, filter);
        }
        if (data instanceof Id.Stream) {
            return this.lineageStore.getRelations((Id.Stream) data, start, end, filter);
        }
        throw new IllegalStateException("Unknown data type " + data);
    }

    /**
     * Derives the scan window and relation filter for a set of run ids.
     *
     * <p>For an empty set the result is the window {@code (0, 0)} with a filter that rejects
     * everything. Otherwise the window starts at the smallest run time and ends one millisecond
     * after the largest run time (presumably because the scan end is exclusive - confirm against
     * {@link LineageStore}), and the filter keeps only relations whose run is in {@code runIds}.
     *
     * @param runIds the run ids to cover; the returned filter keeps a reference to this set
     * @return the scan range and filter
     */
    @VisibleForTesting
    static ScanRangeWithFilter getScanRange(final Set<RunId> runIds) {
        if (runIds.isEmpty()) {
            // Explicit type witness keeps this compiling without Java 8 target typing.
            return new ScanRangeWithFilter(0L, 0L, Predicates.<Relation>alwaysFalse());
        }

        long earliest = Long.MAX_VALUE;
        long latest = 0L;
        for (RunId runId : runIds) {
            long time = RunIds.getTime(runId, TimeUnit.MILLISECONDS);
            earliest = Math.min(earliest, time);
            latest = Math.max(latest, time);
        }

        return new ScanRangeWithFilter(earliest, latest + 1, new Predicate<Relation>() {
            @Override
            public boolean apply(Relation relation) {
                return runIds.contains(relation.getRun());
            }
        });
    }
}
