package co.cask.cdap.metadata;

import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.entity.EntityExistenceVerifier;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.Lineage;
import co.cask.cdap.data2.metadata.lineage.LineageStoreReader;
import co.cask.cdap.data2.metadata.lineage.Relation;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.metadata.MetadataRecord;
import co.cask.cdap.proto.metadata.MetadataScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metadata/LineageAdmin.class */
public class LineageAdmin {
    private static final Logger LOG = LoggerFactory.getLogger(LineageAdmin.class);
    private static final Function<Relation, ProgramId> RELATION_TO_PROGRAM_FUNCTION = new Function<Relation, ProgramId>() { // from class: co.cask.cdap.metadata.LineageAdmin.1
        public ProgramId apply(Relation relation) {
            return relation.getProgram();
        }
    };
    private static final Function<Relation, NamespacedEntityId> RELATION_TO_DATA_FUNCTION = new Function<Relation, NamespacedEntityId>() { // from class: co.cask.cdap.metadata.LineageAdmin.2
        public NamespacedEntityId apply(Relation relation) {
            return relation.getData();
        }
    };
    private static final Predicate<Relation> UNKNOWN_TYPE_FILTER = new Predicate<Relation>() { // from class: co.cask.cdap.metadata.LineageAdmin.3
        public boolean apply(Relation relation) {
            return relation.getAccess() != AccessType.UNKNOWN;
        }
    };
    private static final Function<Collection<Relation>, Collection<Relation>> COLLAPSE_UNKNOWN_TYPE_FUNCTION = new Function<Collection<Relation>, Collection<Relation>>() { // from class: co.cask.cdap.metadata.LineageAdmin.4
        public Collection<Relation> apply(Collection<Relation> collection) {
            return collection.size() <= 1 ? collection : Collections2.filter(collection, LineageAdmin.UNKNOWN_TYPE_FILTER);
        }
    };
    private final LineageStoreReader lineageStoreReader;
    private final Store store;
    private final MetadataStore metadataStore;
    private final EntityExistenceVerifier entityExistenceVerifier;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/metadata/LineageAdmin$RelationKey.class */
    public static final class RelationKey {
        private final Relation relation;
        private final int hashCode;

        private RelationKey(Relation relation) {
            this.relation = relation;
            this.hashCode = Objects.hash(relation.getData(), relation.getProgram(), relation.getRun(), relation.getComponents());
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RelationKey relationKey = (RelationKey) obj;
            return Objects.equals(this.relation.getData(), relationKey.relation.getData()) && Objects.equals(this.relation.getProgram(), relationKey.relation.getProgram()) && Objects.equals(this.relation.getRun(), relationKey.relation.getRun()) && Objects.equals(this.relation.getComponents(), relationKey.relation.getComponents());
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:co/cask/cdap/metadata/LineageAdmin$ScanRangeWithFilter.class */
    public static class ScanRangeWithFilter {
        private final long start;
        private final long end;
        private final Predicate<Relation> filter;

        ScanRangeWithFilter(long j, long j2, Predicate<Relation> predicate) {
            this.start = j;
            this.end = j2;
            this.filter = predicate;
        }

        public long getStart() {
            return this.start;
        }

        public long getEnd() {
            return this.end;
        }

        public Predicate<Relation> getFilter() {
            return this.filter;
        }
    }

    @Inject
    LineageAdmin(LineageStoreReader lineageStoreReader, Store store, MetadataStore metadataStore, EntityExistenceVerifier entityExistenceVerifier) {
        this.lineageStoreReader = lineageStoreReader;
        this.store = store;
        this.metadataStore = metadataStore;
        this.entityExistenceVerifier = entityExistenceVerifier;
    }

    public Lineage computeLineage(DatasetId datasetId, long j, long j2, int i, String str) throws NotFoundException {
        return doComputeLineage(datasetId, j, j2, i, str);
    }

    public Lineage computeLineage(DatasetId datasetId, long j, long j2, int i) throws NotFoundException {
        return doComputeLineage(datasetId, j, j2, i, null);
    }

    public Lineage computeLineage(StreamId streamId, long j, long j2, int i, String str) throws NotFoundException {
        return doComputeLineage(streamId, j, j2, i, str);
    }

    public Lineage computeLineage(StreamId streamId, long j, long j2, int i) throws NotFoundException {
        return doComputeLineage(streamId, j, j2, i, null);
    }

    public Set<MetadataRecord> getMetadataForRun(ProgramRunId programRunId) throws NotFoundException {
        this.entityExistenceVerifier.ensureExists(programRunId);
        HashSet hashSet = new HashSet(this.lineageStoreReader.getEntitiesForRun(programRunId));
        if (hashSet.isEmpty()) {
            return ImmutableSet.of();
        }
        RunId fromString = RunIds.fromString(programRunId.getRun());
        hashSet.add(programRunId.getParent().getParent());
        return this.metadataStore.getSnapshotBeforeTime(MetadataScope.USER, hashSet, RunIds.getTime(fromString, TimeUnit.MILLISECONDS));
    }

    @Nullable
    private ProgramRunId getWorkflowProgramRunid(Relation relation, Map<ProgramRunId, RunRecordMeta> map, Map<String, ProgramRunId> map2) {
        ProgramRunId programRunId = null;
        RunRecordMeta runRecordMeta = map.get(new ProgramRunId(relation.getProgram().getNamespace(), relation.getProgram().getApplication(), relation.getProgram().getType(), relation.getProgram().getProgram(), relation.getRun().getId()));
        if (runRecordMeta != null && runRecordMeta.getProperties().containsKey("workflowrunid")) {
            programRunId = map2.get((String) runRecordMeta.getProperties().get("workflowrunid"));
        }
        return programRunId;
    }

    private Multimap<RelationKey, Relation> getRollupRelations(Multimap<RelationKey, Relation> multimap, Map<ProgramRunId, RunRecordMeta> map, Map<String, ProgramRunId> map2) throws NotFoundException {
        HashMultimap create = HashMultimap.create();
        for (Map.Entry entry : multimap.asMap().entrySet()) {
            for (Relation relation : (Collection) entry.getValue()) {
                ProgramRunId workflowProgramRunid = getWorkflowProgramRunid(relation, map, map2);
                if (workflowProgramRunid == null) {
                    create.put(entry.getKey(), relation);
                } else {
                    ProgramId programId = new ProgramId(workflowProgramRunid.getNamespace(), workflowProgramRunid.getApplication(), workflowProgramRunid.getType(), workflowProgramRunid.getProgram());
                    DatasetId data = relation.getData();
                    create.put(entry.getKey(), data instanceof DatasetId ? new Relation(data, programId, relation.getAccess(), RunIds.fromString(workflowProgramRunid.getRun())) : new Relation((StreamId) data, programId, relation.getAccess(), RunIds.fromString(workflowProgramRunid.getRun())));
                }
            }
        }
        return create;
    }

    private Set<String> getWorkflowIds(Multimap<RelationKey, Relation> multimap, Map<ProgramRunId, RunRecordMeta> map) throws NotFoundException {
        HashSet hashSet = new HashSet();
        for (Relation relation : Iterables.concat(new Iterable[]{multimap.values()})) {
            RunRecordMeta runRecordMeta = map.get(new ProgramRunId(relation.getProgram().getNamespace(), relation.getProgram().getApplication(), relation.getProgram().getType(), relation.getProgram().getProgram(), relation.getRun().getId()));
            if (runRecordMeta != null && runRecordMeta.getProperties().containsKey("workflowrunid")) {
                hashSet.add((String) runRecordMeta.getProperties().get("workflowrunid"));
            }
        }
        return hashSet;
    }

    private Multimap<RelationKey, Relation> doComputeRollupLineage(Multimap<RelationKey, Relation> multimap) throws NotFoundException {
        HashSet hashSet = new HashSet();
        for (Relation relation : Iterables.concat(new Iterable[]{multimap.values()})) {
            hashSet.add(new ProgramRunId(relation.getProgram().getNamespace(), relation.getProgram().getApplication(), relation.getProgram().getType(), relation.getProgram().getProgram(), relation.getRun().getId()));
        }
        Map<ProgramRunId, RunRecordMeta> runs = this.store.getRuns(hashSet);
        final Set<String> workflowIds = getWorkflowIds(multimap, runs);
        Map<ProgramRunId, RunRecordMeta> runs2 = this.store.getRuns(ProgramRunStatus.ALL, new Predicate<RunRecordMeta>() { // from class: co.cask.cdap.metadata.LineageAdmin.5
            public boolean apply(RunRecordMeta runRecordMeta) {
                return workflowIds.contains(runRecordMeta.getPid());
            }
        });
        Map<String, ProgramRunId> hashMap = new HashMap<>();
        for (Map.Entry<ProgramRunId, RunRecordMeta> entry : runs2.entrySet()) {
            hashMap.put(entry.getValue().getPid(), entry.getKey());
        }
        return getRollupRelations(multimap, runs, hashMap);
    }

    private Lineage doComputeLineage(NamespacedEntityId namespacedEntityId, long j, long j2, int i, @Nullable String str) throws NotFoundException {
        LOG.trace("Computing lineage for data {}, startMillis {}, endMillis {}, levels {}", new Object[]{namespacedEntityId, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
        Set<RunId> runningInRange = this.store.getRunningInRange(TimeUnit.MILLISECONDS.toSeconds(j), TimeUnit.MILLISECONDS.toSeconds(j2));
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got {} rundIds in time range ({}, {})", new Object[]{Integer.valueOf(runningInRange.size()), Long.valueOf(j), Long.valueOf(j2)});
        }
        ScanRangeWithFilter scanRange = getScanRange(runningInRange);
        LOG.trace("Using scan start = {}, scan end = {}", Long.valueOf(scanRange.getStart()), Long.valueOf(scanRange.getEnd()));
        Multimap<RelationKey, Relation> create = HashMultimap.create();
        HashSet hashSet = new HashSet();
        HashSet<NamespacedEntityId> hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        HashSet<ProgramId> hashSet4 = new HashSet();
        hashSet2.add(namespacedEntityId);
        for (int i2 = 0; i2 < i; i2++) {
            LOG.trace("Level {}", Integer.valueOf(i2));
            hashSet4.clear();
            for (NamespacedEntityId namespacedEntityId2 : hashSet2) {
                if (hashSet.add(namespacedEntityId2)) {
                    LOG.trace("Visiting dataset {}", namespacedEntityId2);
                    Iterable<Relation> programRelations = getProgramRelations(namespacedEntityId2, scanRange.getStart(), scanRange.getEnd(), scanRange.getFilter());
                    LOG.trace("Got program relations {}", programRelations);
                    for (Relation relation : programRelations) {
                        create.put(new RelationKey(relation), relation);
                    }
                    Iterables.addAll(hashSet4, Iterables.transform(programRelations, RELATION_TO_PROGRAM_FUNCTION));
                }
            }
            hashSet2.clear();
            for (ProgramId programId : hashSet4) {
                if (hashSet3.add(programId)) {
                    LOG.trace("Visiting program {}", programId);
                    Set<Relation> relations = this.lineageStoreReader.getRelations(programId, scanRange.getStart(), scanRange.getEnd(), scanRange.getFilter());
                    LOG.trace("Got data relations {}", relations);
                    for (Relation relation2 : relations) {
                        create.put(new RelationKey(relation2), relation2);
                    }
                    Iterables.addAll(hashSet2, Iterables.transform(relations, RELATION_TO_DATA_FUNCTION));
                }
            }
        }
        if (str != null && str.contains("workflow")) {
            create = doComputeRollupLineage(create);
        }
        Lineage lineage = new Lineage(Iterables.concat(Maps.transformValues(create.asMap(), COLLAPSE_UNKNOWN_TYPE_FUNCTION).values()));
        LOG.trace("Got lineage {}", lineage);
        return lineage;
    }

    private Iterable<Relation> getProgramRelations(NamespacedEntityId namespacedEntityId, long j, long j2, Predicate<Relation> predicate) {
        if (namespacedEntityId instanceof DatasetId) {
            return this.lineageStoreReader.getRelations((DatasetId) namespacedEntityId, j, j2, predicate);
        }
        if (namespacedEntityId instanceof StreamId) {
            return this.lineageStoreReader.getRelations((StreamId) namespacedEntityId, j, j2, predicate);
        }
        throw new IllegalStateException("Unknown data type " + namespacedEntityId);
    }

    @VisibleForTesting
    static ScanRangeWithFilter getScanRange(final Set<RunId> set) {
        if (set.isEmpty()) {
            return new ScanRangeWithFilter(0L, 0L, Predicates.alwaysFalse());
        }
        long j = Long.MAX_VALUE;
        long j2 = 0;
        Iterator<RunId> it = set.iterator();
        while (it.hasNext()) {
            long time = RunIds.getTime(it.next(), TimeUnit.MILLISECONDS);
            if (time < j) {
                j = time;
            }
            if (time > j2) {
                j2 = time;
            }
        }
        return new ScanRangeWithFilter(j, j2 + 1, new Predicate<Relation>() { // from class: co.cask.cdap.metadata.LineageAdmin.6
            public boolean apply(Relation relation) {
                return set.contains(relation.getRun());
            }
        });
    }
}
