package co.cask.cdap.data2.metadata.lineage;

import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionExecutorFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/LineageStore.class */
/**
 * Store for lineage information that implements both {@link LineageStoreReader} and
 * {@link LineageStoreWriter}.
 *
 * <p>Every operation obtains a {@link LineageDataset} (creating it if necessary via
 * {@link DatasetsUtil#getOrCreateDataset}), wraps it in a {@link TransactionExecutor} and
 * delegates the call to the dataset through that executor.
 */
public class LineageStore implements LineageStoreReader, LineageStoreWriter {
    /** Default location of the lineage dataset: the dataset named "lineage" in the SYSTEM namespace. */
    private static final DatasetId LINEAGE_DATASET_ID = NamespaceId.SYSTEM.dataset("lineage");
    private final TransactionExecutorFactory executorFactory;
    private final DatasetFramework datasetFramework;
    private final DatasetId lineageDatasetId;

    /**
     * Creates a store backed by the default lineage dataset id.
     *
     * @param transactionExecutorFactory factory used to build the executor for each operation
     * @param datasetFramework framework used to get or create the lineage dataset
     */
    @Inject
    public LineageStore(TransactionExecutorFactory transactionExecutorFactory, DatasetFramework datasetFramework) {
        this(transactionExecutorFactory, datasetFramework, LINEAGE_DATASET_ID);
    }

    /**
     * Creates a store backed by an explicitly chosen lineage dataset id.
     *
     * @param transactionExecutorFactory factory used to build the executor for each operation
     * @param datasetFramework framework used to get or create the lineage dataset
     * @param datasetId id of the dataset instance that holds the lineage data
     */
    @VisibleForTesting
    public LineageStore(TransactionExecutorFactory transactionExecutorFactory, DatasetFramework datasetFramework, DatasetId datasetId) {
        this.executorFactory = transactionExecutorFactory;
        this.datasetFramework = datasetFramework;
        this.lineageDatasetId = datasetId;
    }

    /**
     * Records an access of a dataset by a program run, with no additional entity.
     * Equivalent to the five-argument overload with a {@code null} last argument.
     *
     * @param programRunId the program run performing the access
     * @param datasetId the dataset being accessed
     * @param accessType the kind of access
     * @param j forwarded unchanged to {@link LineageDataset}; presumably the access time - TODO confirm
     */
    @Override
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, long j) {
        addAccess(programRunId, datasetId, accessType, j, (NamespacedEntityId) null);
    }

    /**
     * Records an access of a dataset by a program run.
     *
     * @param programRunId the program run performing the access
     * @param datasetId the dataset being accessed
     * @param accessType the kind of access
     * @param j forwarded unchanged to {@link LineageDataset}; presumably the access time - TODO confirm
     * @param namespacedEntityId optional extra entity forwarded to the dataset; may be {@code null}
     */
    @Override
    public void addAccess(final ProgramRunId programRunId, final DatasetId datasetId, final AccessType accessType, final long j, @Nullable final NamespacedEntityId namespacedEntityId) {
        execute(new TransactionExecutor.Procedure<LineageDataset>() {
            @Override
            public void apply(LineageDataset dataset) throws Exception {
                dataset.addAccess(programRunId, datasetId, accessType, j, namespacedEntityId);
            }
        });
    }

    /**
     * Records an access of a stream by a program run, with no additional entity.
     * Equivalent to the five-argument overload with a {@code null} last argument.
     *
     * @param programRunId the program run performing the access
     * @param streamId the stream being accessed
     * @param accessType the kind of access
     * @param j forwarded unchanged to {@link LineageDataset}; presumably the access time - TODO confirm
     */
    @Override
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, long j) {
        addAccess(programRunId, streamId, accessType, j, (NamespacedEntityId) null);
    }

    /**
     * Records an access of a stream by a program run.
     *
     * @param programRunId the program run performing the access
     * @param streamId the stream being accessed
     * @param accessType the kind of access
     * @param j forwarded unchanged to {@link LineageDataset}; presumably the access time - TODO confirm
     * @param namespacedEntityId optional extra entity forwarded to the dataset; may be {@code null}
     */
    @Override
    public void addAccess(final ProgramRunId programRunId, final StreamId streamId, final AccessType accessType, final long j, @Nullable final NamespacedEntityId namespacedEntityId) {
        execute(new TransactionExecutor.Procedure<LineageDataset>() {
            @Override
            public void apply(LineageDataset dataset) throws Exception {
                dataset.addAccess(programRunId, streamId, accessType, j, namespacedEntityId);
            }
        });
    }

    /**
     * Looks up the entities associated with a program run.
     *
     * @param programRunId the program run to look up
     * @return whatever {@link LineageDataset#getEntitiesForRun} returns for that run
     */
    @Override
    public Set<NamespacedEntityId> getEntitiesForRun(final ProgramRunId programRunId) {
        return execute(new TransactionExecutor.Function<LineageDataset, Set<NamespacedEntityId>>() {
            @Override
            public Set<NamespacedEntityId> apply(LineageDataset dataset) throws Exception {
                return dataset.getEntitiesForRun(programRunId);
            }
        });
    }

    /**
     * Fetches relations involving a dataset.
     *
     * @param datasetId the dataset whose relations are requested
     * @param j first bound passed to the dataset; presumably the range start - TODO confirm
     * @param j2 second bound passed to the dataset; presumably the range end - TODO confirm
     * @param predicate filter handed to the dataset query
     * @return the relations returned by the underlying {@link LineageDataset}
     */
    @Override
    public Set<Relation> getRelations(final DatasetId datasetId, final long j, final long j2, final Predicate<Relation> predicate) {
        return execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() {
            @Override
            public Set<Relation> apply(LineageDataset dataset) throws Exception {
                return dataset.getRelations(datasetId, j, j2, predicate);
            }
        });
    }

    /**
     * Fetches relations involving a stream.
     *
     * @param streamId the stream whose relations are requested
     * @param j first bound passed to the dataset; presumably the range start - TODO confirm
     * @param j2 second bound passed to the dataset; presumably the range end - TODO confirm
     * @param predicate filter handed to the dataset query
     * @return the relations returned by the underlying {@link LineageDataset}
     */
    @Override
    public Set<Relation> getRelations(final StreamId streamId, final long j, final long j2, final Predicate<Relation> predicate) {
        return execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() {
            @Override
            public Set<Relation> apply(LineageDataset dataset) throws Exception {
                return dataset.getRelations(streamId, j, j2, predicate);
            }
        });
    }

    /**
     * Fetches relations involving a program.
     *
     * @param programId the program whose relations are requested
     * @param j first bound passed to the dataset; presumably the range start - TODO confirm
     * @param j2 second bound passed to the dataset; presumably the range end - TODO confirm
     * @param predicate filter handed to the dataset query
     * @return the relations returned by the underlying {@link LineageDataset}
     */
    @Override
    public Set<Relation> getRelations(final ProgramId programId, final long j, final long j2, final Predicate<Relation> predicate) {
        return execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() {
            @Override
            public Set<Relation> apply(LineageDataset dataset) throws Exception {
                return dataset.getRelations(programId, j, j2, predicate);
            }
        });
    }

    /**
     * Returns the access times stored for a program run, as reported by
     * {@link LineageDataset#getAccessTimesForRun}.
     *
     * @param programRunId the program run to look up
     * @return the list produced by the underlying dataset
     */
    @VisibleForTesting
    public List<Long> getAccessTimesForRun(final ProgramRunId programRunId) {
        return execute(new TransactionExecutor.Function<LineageDataset, List<Long>>() {
            @Override
            public List<Long> apply(LineageDataset dataset) throws Exception {
                return dataset.getAccessTimesForRun(programRunId);
            }
        });
    }

    /**
     * Runs a value-returning operation against a freshly obtained lineage dataset through a
     * transaction executor.
     *
     * @param function the operation to run; receives the lineage dataset as input
     * @param <T> result type of the operation
     * @return the value produced by {@code function}
     */
    private <T> T execute(TransactionExecutor.Function<LineageDataset, T> function) {
        LineageDataset dataset = newLineageDataset();
        // The dataset is both the transaction participant and the input handed to the function.
        return Transactions.createTransactionExecutor(this.executorFactory, (TransactionAware) dataset)
            .executeUnchecked(function, dataset);
    }

    /**
     * Runs an operation with no result against a freshly obtained lineage dataset through a
     * transaction executor.
     *
     * @param procedure the operation to run; receives the lineage dataset as input
     */
    private void execute(TransactionExecutor.Procedure<LineageDataset> procedure) {
        LineageDataset dataset = newLineageDataset();
        // The dataset is both the transaction participant and the input handed to the procedure.
        Transactions.createTransactionExecutor(this.executorFactory, (TransactionAware) dataset)
            .executeUnchecked(procedure, dataset);
    }

    /**
     * Gets the lineage dataset identified by {@code lineageDatasetId}, creating it with empty
     * properties if needed. Any exception raised while doing so is rethrown through
     * {@link Throwables#propagate(Throwable)}, so callers see no checked exceptions.
     *
     * @return the lineage dataset instance
     */
    private LineageDataset newLineageDataset() {
        try {
            return DatasetsUtil.getOrCreateDataset(
                this.datasetFramework,
                this.lineageDatasetId,
                LineageDataset.class.getName(),
                DatasetProperties.EMPTY,
                (Map<String, String>) DatasetDefinition.NO_ARGUMENTS);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Registers the lineage dataset instance with the given framework.
     *
     * <p>Always uses the default lineage dataset id, independent of any id passed to the
     * testing constructor.
     *
     * @param datasetFramework framework to add the instance to
     * @throws IOException declared by {@link DatasetFramework#addInstance}
     * @throws DatasetManagementException declared by {@link DatasetFramework#addInstance}
     */
    public static void setupDatasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        datasetFramework.addInstance(LineageDataset.class.getName(), LINEAGE_DATASET_ID, DatasetProperties.EMPTY);
    }
}
