package co.cask.cdap.data2.metadata.lineage;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.lib.table.MDSKey;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.FlowletId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/LineageDataset.class */
public class LineageDataset extends AbstractDataset {
    /** Type name constant for this dataset. */
    public static final String TYPE = "lineageDataset";

    /** Id of the lineage dataset instance in the system namespace. */
    public static final DatasetId LINEAGE_DATASET_ID = NamespaceId.SYSTEM.dataset("lineage");

    private static final Logger LOG = LoggerFactory.getLogger(LineageDataset.class);

    // Single-character markers written into row keys to tag the kind of entity that follows.
    private static final char DATASET_MARKER = 'd';
    private static final char PROGRAM_MARKER = 'p';
    private static final char STREAM_MARKER = 's';
    private static final char TOPIC_MARKER = 't';

    // Markers for the optional trailing component of a row key.
    private static final char FLOWLET_MARKER = 'f';
    private static final char NONE_MARKER = '0';

    // Column under which the access time of a row is stored.
    // NOTE(review): the qualifier is initialised from TOPIC_MARKER ('t'); this looks like a
    // coincidence of values rather than a real dependency on the topic marker - confirm.
    private static final byte[] ACCESS_TIME_COLS_BYTE = {TOPIC_MARKER};

    /** Table that holds all access records; set once by the constructor. */
    private Table accessRegistryTable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/LineageDataset$RowKey.class */
    /**
     * Value object for the parts of a row key that {@code parseRow} extracts: the program,
     * the data entity it is related to, and the run id.
     */
    public static final class RowKey {
        private final ProgramId program;
        private final NamespacedEntityId data;
        private final RunId runId;

        RowKey(ProgramId programId, NamespacedEntityId namespacedEntityId, RunId runId) {
            this.program = programId;
            this.data = namespacedEntityId;
            this.runId = runId;
        }

        /** @return the program part of the key */
        public ProgramId getProgram() {
            return program;
        }

        /** @return the data entity part of the key */
        public NamespacedEntityId getData() {
            return data;
        }

        /** @return the run id part of the key */
        public RunId getRunId() {
            return runId;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof RowKey) {
                RowKey other = (RowKey) obj;
                return Objects.equals(program, other.program)
                    && Objects.equals(data, other.data)
                    && Objects.equals(runId, other.runId);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(program, data, runId);
        }

        @Override
        public String toString() {
            return new StringBuilder("RowKey{program=")
                .append(program)
                .append(", data=")
                .append(data)
                .append(", runId=")
                .append(runId)
                .append('}')
                .toString();
        }
    }

    /**
     * Adds the lineage dataset instance ({@link #LINEAGE_DATASET_ID}) to the given framework,
     * using this class's name as the dataset type and empty properties.
     *
     * @param datasetFramework framework to register the instance with
     * @throws IOException declared by this method; origin is the framework call
     * @throws DatasetManagementException declared by this method; origin is the framework call
     */
    public static void setupDatasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        final String datasetTypeName = LineageDataset.class.getName();
        datasetFramework.addInstance(datasetTypeName, LINEAGE_DATASET_ID, DatasetProperties.EMPTY);
    }

    /**
     * Obtains the lineage dataset stored under {@link #LINEAGE_DATASET_ID}.
     * Delegates to the three-argument overload.
     *
     * @param datasetContext context passed through to the dataset lookup
     * @param datasetFramework framework passed through to the dataset lookup
     * @return the lineage dataset
     */
    public static LineageDataset getLineageDataset(DatasetContext datasetContext, DatasetFramework datasetFramework) {
        return getLineageDataset(datasetContext, datasetFramework, LINEAGE_DATASET_ID);
    }

    /**
     * Obtains (via {@code DatasetsUtil.getOrCreateDataset}) the lineage dataset with the given id.
     * Any exception raised along the way is rethrown through {@code Throwables.propagate}.
     *
     * @param datasetContext context passed through to the dataset lookup
     * @param datasetFramework framework passed through to the dataset lookup
     * @param datasetId id of the dataset instance to obtain
     * @return the lineage dataset
     */
    @VisibleForTesting
    public static LineageDataset getLineageDataset(DatasetContext datasetContext, DatasetFramework datasetFramework, DatasetId datasetId) {
        final String datasetTypeName = LineageDataset.class.getName();
        try {
            Dataset dataset = DatasetsUtil.getOrCreateDataset(
                datasetContext, datasetFramework, datasetId, datasetTypeName, DatasetProperties.EMPTY);
            return (LineageDataset) dataset;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates the dataset on top of the given table.
     *
     * @param str instance name, handed to the superclass constructor
     * @param table table handed to the superclass and used to store all access records
     */
    public LineageDataset(String str, Table table) {
        super(str, table, new Dataset[0]);
        this.accessRegistryTable = table;
    }

    /**
     * Records a dataset access by a program run without an associated component.
     *
     * @param programRunId run that accessed the dataset
     * @param datasetId dataset that was accessed
     * @param accessType kind of access
     * @param j access time stored with the record
     */
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, long j) {
        // No component: the five-argument overload accepts null for it.
        addAccess(programRunId, datasetId, accessType, j, null);
    }

    /**
     * Records a dataset access by a program run. Two rows are written with the access time as
     * value: one keyed dataset-first and one keyed program-first.
     *
     * @param programRunId run that accessed the dataset
     * @param datasetId dataset that was accessed
     * @param accessType kind of access
     * @param j access time stored with the record
     * @param namespacedEntityId component that performed the access, or {@code null}
     */
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, long j, @Nullable NamespacedEntityId namespacedEntityId) {
        LOG.trace("Recording access run={}, dataset={}, accessType={}, accessTime={}, component={}",
                  programRunId, datasetId, accessType, j, namespacedEntityId);

        byte[] datasetFirstKey = getDatasetKey(datasetId, programRunId, accessType, namespacedEntityId);
        accessRegistryTable.put(datasetFirstKey, ACCESS_TIME_COLS_BYTE, Bytes.toBytes(j));

        byte[] programFirstKey = getProgramKey(programRunId, datasetId, accessType, namespacedEntityId);
        accessRegistryTable.put(programFirstKey, ACCESS_TIME_COLS_BYTE, Bytes.toBytes(j));
    }

    /**
     * Records a stream access by a program run without an associated component.
     *
     * @param programRunId run that accessed the stream
     * @param streamId stream that was accessed
     * @param accessType kind of access
     * @param j access time stored with the record
     */
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, long j) {
        // No component: the five-argument overload accepts null for it.
        addAccess(programRunId, streamId, accessType, j, null);
    }

    /**
     * Records a stream access by a program run. Two rows are written with the access time as
     * value: one keyed stream-first and one keyed program-first.
     *
     * @param programRunId run that accessed the stream
     * @param streamId stream that was accessed
     * @param accessType kind of access
     * @param j access time stored with the record
     * @param namespacedEntityId component that performed the access, or {@code null}
     */
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, long j, @Nullable NamespacedEntityId namespacedEntityId) {
        LOG.trace("Recording access run={}, stream={}, accessType={}, accessTime={}, component={}",
                  programRunId, streamId, accessType, j, namespacedEntityId);

        byte[] streamFirstKey = getStreamKey(streamId, programRunId, accessType, namespacedEntityId);
        accessRegistryTable.put(streamFirstKey, ACCESS_TIME_COLS_BYTE, Bytes.toBytes(j));

        byte[] programFirstKey = getProgramKey(programRunId, streamId, accessType, namespacedEntityId);
        accessRegistryTable.put(programFirstKey, ACCESS_TIME_COLS_BYTE, Bytes.toBytes(j));
    }

    /**
     * Collects the program and data entities recorded for the given run.
     *
     * <p>The scan covers every row whose key starts with the program and the run's inverted
     * start time; rows are then filtered by run id, since that prefix alone does not
     * include the run id.
     *
     * @param programRunId run to look up
     * @return the programs and data entities found in matching rows; empty if there are none
     */
    public Set<NamespacedEntityId> getEntitiesForRun(ProgramRunId programRunId) {
        ImmutableSet.Builder<NamespacedEntityId> entities = ImmutableSet.builder();
        byte[] startKey = getRunScanStartKey(programRunId);
        String runName = programRunId.getEntityName();
        // try-with-resources closes the scanner on every path and keeps the primary
        // exception (a failing close() is attached as suppressed instead of replacing it).
        try (Scanner scanner = this.accessRegistryTable.scan(startKey, Bytes.stopKeyForPrefix(startKey))) {
            Row row;
            while ((row = scanner.next()) != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got row key = {}", Bytes.toString(row.getRow()));
                }
                RowKey rowKey = parseRow(row);
                if (runName.equals(rowKey.getRunId().getId())) {
                    entities.add(rowKey.getProgram());
                    entities.add(rowKey.getData());
                }
            }
        }
        return entities.build();
    }

    /**
     * Returns the relations recorded for a dataset within a time range that pass the filter.
     *
     * @param datasetId dataset whose relations are wanted
     * @param j lower time bound; used to build the scan end key because keys store inverted time
     * @param j2 upper time bound; used to build the scan start key because keys store inverted time
     * @param predicate filter applied to every relation read
     * @return the accepted relations
     */
    public Set<Relation> getRelations(DatasetId datasetId, long j, long j2, Predicate<Relation> predicate) {
        byte[] scanStart = getDatasetScanStartKey(datasetId, j2);
        byte[] scanEnd = getDatasetScanEndKey(datasetId, j);
        return scanRelations(scanStart, scanEnd, predicate);
    }

    /**
     * Returns the relations recorded for a stream within a time range that pass the filter.
     *
     * @param streamId stream whose relations are wanted
     * @param j lower time bound; used to build the scan end key because keys store inverted time
     * @param j2 upper time bound; used to build the scan start key because keys store inverted time
     * @param predicate filter applied to every relation read
     * @return the accepted relations
     */
    public Set<Relation> getRelations(StreamId streamId, long j, long j2, Predicate<Relation> predicate) {
        byte[] scanStart = getStreamScanStartKey(streamId, j2);
        byte[] scanEnd = getStreamScanEndKey(streamId, j);
        return scanRelations(scanStart, scanEnd, predicate);
    }

    /**
     * Returns the relations recorded for a program within a time range that pass the filter.
     *
     * @param programId program whose relations are wanted
     * @param j lower time bound; used to build the scan end key because keys store inverted time
     * @param j2 upper time bound; used to build the scan start key because keys store inverted time
     * @param predicate filter applied to every relation read
     * @return the accepted relations
     */
    public Set<Relation> getRelations(ProgramId programId, long j, long j2, Predicate<Relation> predicate) {
        byte[] scanStart = getProgramScanStartKey(programId, j2);
        byte[] scanEnd = getProgramScanEndKey(programId, j);
        return scanRelations(scanStart, scanEnd, predicate);
    }

    /**
     * Reads the access times stored for the given run, in scan order.
     *
     * <p>Rows are located by the program / inverted-start-time prefix and then filtered
     * by run id, since that prefix alone does not include the run id.
     *
     * @param programRunId run to look up
     * @return access times of all matching rows; empty if there are none
     */
    @VisibleForTesting
    public List<Long> getAccessTimesForRun(ProgramRunId programRunId) {
        ImmutableList.Builder<Long> accessTimes = ImmutableList.builder();
        byte[] startKey = getRunScanStartKey(programRunId);
        String runName = programRunId.getEntityName();
        // try-with-resources closes the scanner on every path and keeps the primary
        // exception (a failing close() is attached as suppressed instead of replacing it).
        try (Scanner scanner = this.accessRegistryTable.scan(startKey, Bytes.stopKeyForPrefix(startKey))) {
            Row row;
            while ((row = scanner.next()) != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got row key = {}", Bytes.toString(row.getRow()));
                }
                if (runName.equals(parseRow(row).getRunId().getId())) {
                    accessTimes.add(Bytes.toLong(row.get(ACCESS_TIME_COLS_BYTE)));
                }
            }
        }
        return accessTimes.build();
    }

    /**
     * Scans the table between the two keys, converts each row to a {@link Relation} and
     * keeps the ones accepted by the filter.
     *
     * @param bArr scan start key
     * @param bArr2 scan end key
     * @param predicate filter applied to every relation read
     * @return the accepted relations; empty if none
     */
    private Set<Relation> scanRelations(byte[] bArr, byte[] bArr2, Predicate<Relation> predicate) {
        ImmutableSet.Builder<Relation> relations = ImmutableSet.builder();
        // try-with-resources closes the scanner on every path and keeps the primary
        // exception (a failing close() is attached as suppressed instead of replacing it).
        try (Scanner scanner = this.accessRegistryTable.scan(bArr, bArr2)) {
            Row row;
            while ((row = scanner.next()) != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got row key = {}", Bytes.toString(row.getRow()));
                }
                Relation relation = toRelation(row);
                if (predicate.test(relation)) {
                    relations.add(relation);
                }
            }
        }
        return relations.build();
    }

    /**
     * Builds the dataset-first row key: the dataset part followed by the common data-key
     * suffix (inverted run start time, program, run id, access type, component).
     */
    private byte[] getDatasetKey(DatasetId datasetId, ProgramRunId programRunId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addDataset(keyBuilder, datasetId);
        addDataKey(keyBuilder, programRunId, accessType, namespacedEntityId);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Builds the stream-first row key: the stream part followed by the common data-key
     * suffix (inverted run start time, program, run id, access type, component).
     */
    private byte[] getStreamKey(StreamId streamId, ProgramRunId programRunId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addStream(keyBuilder, streamId);
        addDataKey(keyBuilder, programRunId, accessType, namespacedEntityId);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /** Builds a key made of the topic marker, the topic's namespace and the topic name. */
    private byte[] getTopicKey(TopicId topicId) {
        MDSKey.Builder keyBuilder = new MDSKey.Builder();
        keyBuilder.add(TOPIC_MARKER);
        keyBuilder.add(topicId.getNamespace());
        keyBuilder.add(topicId.getTopic());
        return keyBuilder.build().getKey();
    }

    /**
     * Appends the suffix shared by dataset-first and stream-first keys, in this order:
     * inverted run start time, program, run id, access type, component.
     */
    private void addDataKey(MDSKey.Builder builder, ProgramRunId programRunId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        long invertedStart = getInvertedStartTime(programRunId);
        builder.add(invertedStart);
        addProgram(builder, programRunId.getParent());
        String runName = programRunId.getEntityName();
        builder.add(runName);
        int accessCode = (int) accessType.getType();
        builder.add(accessCode);
        addComponent(builder, namespacedEntityId);
    }

    /**
     * Builds the program-first row key for a dataset access, in this order:
     * program, inverted run start time, dataset, run id, access type, component.
     */
    private byte[] getProgramKey(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        final long invertedStart = getInvertedStartTime(programRunId);
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();

        addProgram(keyBuilder, programRunId.getParent());
        keyBuilder.add(invertedStart);
        addDataset(keyBuilder, datasetId);
        keyBuilder.add(programRunId.getEntityName());
        int accessCode = (int) accessType.getType();
        keyBuilder.add(accessCode);
        addComponent(keyBuilder, namespacedEntityId);

        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Builds the program-first row key for a stream access, in this order:
     * program, inverted run start time, stream, run id, access type, component.
     */
    private byte[] getProgramKey(ProgramRunId programRunId, StreamId streamId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        final long invertedStart = getInvertedStartTime(programRunId);
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();

        addProgram(keyBuilder, programRunId.getParent());
        keyBuilder.add(invertedStart);
        addStream(keyBuilder, streamId);
        keyBuilder.add(programRunId.getEntityName());
        int accessCode = (int) accessType.getType();
        keyBuilder.add(accessCode);
        addComponent(keyBuilder, namespacedEntityId);

        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Extracts program, data entity and run id from a row key.
     *
     * <p>Handles both key layouts: data-first (dataset or stream marker leads) and
     * program-first (program marker leads). In each, the leading entity is followed by a
     * long that is skipped, then the second entity, then the run id.
     *
     * @throws IllegalStateException if the leading marker is not dataset, stream or program
     */
    private RowKey parseRow(Row row) {
        MDSKey.Splitter keyParts = new MDSKey(row.getRow()).split();
        char leadMarker = (char) keyParts.getInt();
        LOG.trace("Got marker {}", leadMarker);

        if (leadMarker == DATASET_MARKER || leadMarker == STREAM_MARKER) {
            NamespacedEntityId data = toEntityId(keyParts, leadMarker);
            keyParts.skipLong();
            char secondMarker = (char) keyParts.getInt();
            ProgramId program = (ProgramId) toEntityId(keyParts, secondMarker);
            RunId runId = RunIds.fromString(keyParts.getString());
            return new RowKey(program, data, runId);
        }

        if (leadMarker == PROGRAM_MARKER) {
            ProgramId program = (ProgramId) toEntityId(keyParts, leadMarker);
            keyParts.skipLong();
            char secondMarker = (char) keyParts.getInt();
            NamespacedEntityId data = toEntityId(keyParts, secondMarker);
            RunId runId = RunIds.fromString(keyParts.getString());
            return new RowKey(program, data, runId);
        }

        throw new IllegalStateException("Invalid row with marker " + leadMarker);
    }

    /** Builds a scan boundary key: the dataset part followed by the inverted form of {@code j}. */
    private byte[] getDatasetScanKey(DatasetId datasetId, long j) {
        final long inverted = invertTime(j);
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addDataset(keyBuilder, datasetId);
        keyBuilder.add(inverted);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Builds the scan start key for a dataset from the upper time bound {@code j}.
     *
     * <p>The key is derived from {@code j + 1}. When {@code j} is {@code Long.MAX_VALUE} the
     * increment would wrap to {@code Long.MIN_VALUE}, and inverting that wraps again to
     * {@code -1}, which is not a valid inverted time; in that case {@code j} is used as is,
     * matching {@code getProgramScanStartKey}.
     */
    private byte[] getDatasetScanStartKey(DatasetId datasetId, long j) {
        return getDatasetScanKey(datasetId, j == Long.MAX_VALUE ? j : j + 1);
    }

    /** Builds the scan end key for a dataset from the lower time bound {@code j}, using {@code j - 1}. */
    private byte[] getDatasetScanEndKey(DatasetId datasetId, long j) {
        final long boundary = j - 1;
        return getDatasetScanKey(datasetId, boundary);
    }

    /** Builds a scan boundary key: the stream part followed by the inverted form of {@code j}. */
    private byte[] getStreamScanKey(StreamId streamId, long j) {
        final long inverted = invertTime(j);
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addStream(keyBuilder, streamId);
        keyBuilder.add(inverted);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Builds the scan start key for a stream from the upper time bound {@code j}.
     *
     * <p>The key is derived from {@code j + 1}. When {@code j} is {@code Long.MAX_VALUE} the
     * increment would wrap to {@code Long.MIN_VALUE}, and inverting that wraps again to
     * {@code -1}, which is not a valid inverted time; in that case {@code j} is used as is,
     * matching {@code getProgramScanStartKey}.
     */
    private byte[] getStreamScanStartKey(StreamId streamId, long j) {
        return getStreamScanKey(streamId, j == Long.MAX_VALUE ? j : j + 1);
    }

    /** Builds the scan end key for a stream from the lower time bound {@code j}, using {@code j - 1}. */
    private byte[] getStreamScanEndKey(StreamId streamId, long j) {
        final long boundary = j - 1;
        return getStreamScanKey(streamId, boundary);
    }

    /** Builds a scan boundary key: the program part followed by the inverted form of {@code j}. */
    private byte[] getProgramScanKey(ProgramId programId, long j) {
        final long inverted = invertTime(j);
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addProgram(keyBuilder, programId);
        keyBuilder.add(inverted);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /**
     * Builds the scan start key for a program from the upper time bound {@code j}.
     * Uses {@code j + 1}, except at {@code Long.MAX_VALUE} where {@code j} is used unchanged
     * so the increment cannot overflow.
     */
    private byte[] getProgramScanStartKey(ProgramId programId, long j) {
        long boundary = j;
        if (j != Long.MAX_VALUE) {
            boundary = j + 1;
        }
        return getProgramScanKey(programId, boundary);
    }

    /** Builds the scan end key for a program from the lower time bound {@code j}, using {@code j - 1}. */
    private byte[] getProgramScanEndKey(ProgramId programId, long j) {
        final long boundary = j - 1;
        return getProgramScanKey(programId, boundary);
    }

    /**
     * Builds the key prefix shared by all program-first rows of a run: the run's program
     * followed by the run's inverted start time.
     */
    private byte[] getRunScanStartKey(ProgramRunId programRunId) {
        final MDSKey.Builder keyBuilder = new MDSKey.Builder();
        addProgram(keyBuilder, programRunId.getParent());
        long invertedStart = getInvertedStartTime(programRunId);
        keyBuilder.add(invertedStart);
        MDSKey key = keyBuilder.build();
        return key.getKey();
    }

    /** Appends the dataset marker, the dataset's namespace and its name to the key. */
    private void addDataset(MDSKey.Builder builder, DatasetId datasetId) {
        // DATASET_MARKER is 'd', i.e. the int value 100.
        builder.add(DATASET_MARKER)
            .add(datasetId.getNamespace())
            .add(datasetId.getEntityName());
    }

    /** Appends the stream marker, the stream's namespace and its name to the key. */
    private void addStream(MDSKey.Builder builder, StreamId streamId) {
        builder.add(STREAM_MARKER)
            .add(streamId.getNamespace())
            .add(streamId.getEntityName());
    }

    /**
     * Appends the program marker followed by namespace, parent entity name, program type
     * category name and program name to the key.
     */
    private void addProgram(MDSKey.Builder builder, ProgramId programId) {
        builder.add(PROGRAM_MARKER)
            .add(programId.getNamespace())
            .add(programId.getParent().getEntityName())
            .add(programId.getType().getCategoryName())
            .add(programId.getEntityName());
    }

    /**
     * Appends the component part of a key: the flowlet marker and flowlet name for a
     * {@link FlowletId}, otherwise (including {@code null}) just the "none" marker.
     */
    private void addComponent(MDSKey.Builder builder, EntityId entityId) {
        if (!(entityId instanceof FlowletId)) {
            builder.add(NONE_MARKER);
            return;
        }
        builder.add(FLOWLET_MARKER).add(entityId.getEntityName());
    }

    /**
     * Reads the entity that follows an already-consumed marker from the key.
     *
     * @param splitter key reader positioned just after the marker
     * @param c the marker that was read
     * @return a dataset, program or stream id depending on the marker
     * @throws IllegalStateException if the marker is none of dataset, program or stream
     */
    private NamespacedEntityId toEntityId(MDSKey.Splitter splitter, char c) {
        if (c == DATASET_MARKER) {
            String namespace = splitter.getString();
            String dataset = splitter.getString();
            return new DatasetId(namespace, dataset);
        }
        if (c == PROGRAM_MARKER) {
            String namespace = splitter.getString();
            String application = splitter.getString();
            ProgramType programType = ProgramType.valueOfCategoryName(splitter.getString());
            String program = splitter.getString();
            return new ProgramId(namespace, application, programType, program);
        }
        if (c == STREAM_MARKER) {
            String namespace = splitter.getString();
            String stream = splitter.getString();
            return new StreamId(namespace, stream);
        }
        throw new IllegalStateException("Invalid row with marker " + c);
    }

    /**
     * Reads the component part of a key.
     *
     * @param splitter key reader positioned at the component marker
     * @param programId program used to create the flowlet id
     * @return the flowlet of {@code programId} named in the key, or {@code null} when the
     *     key carries the "none" marker
     * @throws IllegalStateException if the component marker is unknown
     */
    @Nullable
    private NamespacedEntityId toComponent(MDSKey.Splitter splitter, ProgramId programId) {
        char componentMarker = (char) splitter.getInt();
        if (componentMarker == NONE_MARKER) {
            return null;
        }
        if (componentMarker == FLOWLET_MARKER) {
            String flowletName = splitter.getString();
            return programId.flowlet(flowletName);
        }
        throw new IllegalStateException("Invalid row with component marker " + componentMarker);
    }

    /**
     * Inverts a time value by subtracting it from {@code Long.MAX_VALUE}, so that larger
     * times map to smaller results.
     */
    private long invertTime(long j) {
        return Long.MAX_VALUE - j;
    }

    /**
     * Returns the inverted form of the time, in milliseconds, that {@code RunIds.getTime}
     * extracts from the run id.
     */
    private long getInvertedStartTime(ProgramRunId programRunId) {
        RunId runId = RunIds.fromString(programRunId.getEntityName());
        long runTimeMillis = RunIds.getTime(runId, TimeUnit.MILLISECONDS);
        return invertTime(runTimeMillis);
    }

    /**
     * Converts a row into a {@link Relation}.
     *
     * <p>The key is read in this order: first entity (marker + fields), a long that is
     * skipped, second entity (marker + fields), run id, access type, component. The two
     * entities are indexed by their marker so the same code handles data-first and
     * program-first keys.
     *
     * @param row row whose key is decoded
     * @return a stream relation if the key holds a stream, otherwise a dataset relation
     */
    private Relation toRelation(Row row) {
        HashMap<Character, NamespacedEntityId> entitiesByMarker = new HashMap<>(4);
        MDSKey.Splitter keyParts = new MDSKey(row.getRow()).split();

        char firstMarker = (char) keyParts.getInt();
        LOG.trace("Got marker {}", firstMarker);
        NamespacedEntityId firstEntity = toEntityId(keyParts, firstMarker);
        LOG.trace("Got id1 {}", firstEntity);
        entitiesByMarker.put(firstMarker, firstEntity);

        // The long between the two entities is not needed for the relation.
        keyParts.skipLong();

        char secondMarker = (char) keyParts.getInt();
        LOG.trace("Got marker {}", secondMarker);
        NamespacedEntityId secondEntity = toEntityId(keyParts, secondMarker);
        // Log the second entity here (the first one was logged by mistake before).
        LOG.trace("Got id2 {}", secondEntity);
        entitiesByMarker.put(secondMarker, secondEntity);

        RunId runId = RunIds.fromString(keyParts.getString());
        LOG.trace("Got runId {}", runId);
        AccessType accessType = AccessType.fromType((char) keyParts.getInt());
        LOG.trace("Got access type {}", accessType);

        DatasetId datasetId = (DatasetId) entitiesByMarker.get(DATASET_MARKER);
        LOG.trace("Got datasetInstance {}", datasetId);
        StreamId streamId = (StreamId) entitiesByMarker.get(STREAM_MARKER);
        LOG.trace("Got stream {}", streamId);
        ProgramId programId = (ProgramId) entitiesByMarker.get(PROGRAM_MARKER);
        LOG.trace("Got program {}", programId);

        NamespacedEntityId component = toComponent(keyParts, programId);
        LOG.trace("Got component {}", component);
        Set<NamespacedEntityId> components =
            component == null ? ImmutableSet.<NamespacedEntityId>of() : ImmutableSet.of(component);

        if (streamId == null) {
            return new Relation(datasetId, programId, accessType, runId, components);
        }
        return new Relation(streamId, programId, accessType, runId, components);
    }
}
