package co.cask.cdap.data2.metadata.lineage;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionSystemClient;

/* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/DefaultLineageStoreReader.class */
/**
 * {@link LineageStoreReader} implementation that answers every query by obtaining the
 * {@link LineageDataset} inside a call to {@link Transactionals#execute} and delegating to the
 * dataset method of the same name.
 */
public class DefaultLineageStoreReader implements LineageStoreReader {
    private final DatasetFramework datasetFramework;
    private final Transactional transactional;
    private final DatasetId lineageDatasetId;

    /**
     * Creates a reader that uses {@link LineageDataset#LINEAGE_DATASET_ID} as the lineage dataset.
     *
     * @param datasetFramework framework handed to {@link LineageDataset#getLineageDataset}
     * @param transactionSystemClient client wrapped to build the {@link Transactional} used for reads
     */
    @Inject
    DefaultLineageStoreReader(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        this(datasetFramework, transactionSystemClient, LineageDataset.LINEAGE_DATASET_ID);
    }

    /**
     * Creates a reader that uses an explicitly supplied lineage dataset id.
     *
     * @param datasetFramework framework handed to {@link LineageDataset#getLineageDataset}
     * @param transactionSystemClient client wrapped to build the {@link Transactional} used for reads
     * @param datasetId id of the lineage dataset to read from
     */
    @VisibleForTesting
    public DefaultLineageStoreReader(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, DatasetId datasetId) {
        this.datasetFramework = datasetFramework;
        this.lineageDatasetId = datasetId;

        // The dataset cache is created against NamespaceId.SYSTEM with an empty argument map;
        // the two null arguments and the empty MultiThreadTransactionAware array are passed
        // through exactly as before.
        MultiThreadDatasetCache datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(datasetFramework),
            new TransactionSystemClientAdapter(transactionSystemClient),
            NamespaceId.SYSTEM,
            ImmutableMap.of(),
            null,
            null,
            new MultiThreadTransactionAware[0]);
        this.transactional = Transactions.createTransactional(datasetCache);
    }

    /**
     * Returns whatever {@link LineageDataset#getEntitiesForRun} reports for the given run.
     *
     * @param programRunId the program run to look up
     * @return the entities returned by the lineage dataset for that run
     */
    @Override
    public Set<NamespacedEntityId> getEntitiesForRun(ProgramRunId programRunId) {
        return execute(lineageDataset -> lineageDataset.getEntitiesForRun(programRunId));
    }

    /**
     * Returns the relations the lineage dataset reports for a dataset.
     *
     * @param datasetId the dataset to query relations for
     * @param j forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the start of a time range; confirm against LineageStoreReader)
     * @param j2 forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the end of a time range; confirm against LineageStoreReader)
     * @param predicate filter forwarded unchanged to the lineage dataset
     * @return the relations returned by the lineage dataset
     */
    @Override
    public Set<Relation> getRelations(DatasetId datasetId, long j, long j2, Predicate<Relation> predicate) {
        return execute(lineageDataset -> lineageDataset.getRelations(datasetId, j, j2, predicate));
    }

    /**
     * Returns the relations the lineage dataset reports for a stream.
     *
     * @param streamId the stream to query relations for
     * @param j forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the start of a time range; confirm against LineageStoreReader)
     * @param j2 forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the end of a time range; confirm against LineageStoreReader)
     * @param predicate filter forwarded unchanged to the lineage dataset
     * @return the relations returned by the lineage dataset
     */
    @Override
    public Set<Relation> getRelations(StreamId streamId, long j, long j2, Predicate<Relation> predicate) {
        return execute(lineageDataset -> lineageDataset.getRelations(streamId, j, j2, predicate));
    }

    /**
     * Returns the relations the lineage dataset reports for a program.
     *
     * @param programId the program to query relations for
     * @param j forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the start of a time range; confirm against LineageStoreReader)
     * @param j2 forwarded unchanged to the lineage dataset
     *     (NOTE(review): presumably the end of a time range; confirm against LineageStoreReader)
     * @param predicate filter forwarded unchanged to the lineage dataset
     * @return the relations returned by the lineage dataset
     */
    @Override
    public Set<Relation> getRelations(ProgramId programId, long j, long j2, Predicate<Relation> predicate) {
        return execute(lineageDataset -> lineageDataset.getRelations(programId, j, j2, predicate));
    }

    /**
     * Runs {@code function} against the lineage dataset via {@link Transactionals#execute}.
     *
     * @param function the operation to apply to the lineage dataset
     * @param <T> result type of the operation
     * @return the value produced by {@code function}
     */
    private <T> T execute(TransactionExecutor.Function<LineageDataset, T> function) {
        // Keep this lambda block-bodied with an explicit return: an expression-bodied lambda
        // consisting of a method call could also match a void-returning functional interface
        // and make the Transactionals.execute call ambiguous if such an overload exists.
        return Transactionals.execute(this.transactional, datasetContext -> {
            LineageDataset lineageDataset =
                LineageDataset.getLineageDataset(datasetContext, this.datasetFramework, this.lineageDatasetId);
            return function.apply(lineageDataset);
        });
    }
}
