package co.cask.cdap.data2.metadata.writer;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.LineageDataset;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageDataset;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageInfo;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import javax.annotation.Nullable;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/writer/BasicLineageWriter.class */
/**
 * Writes dataset/stream access records and field-level lineage information into the
 * lineage datasets. Every write runs through a {@link Transactional} that is wrapped with a
 * retry-on-conflict strategy.
 */
public class BasicLineageWriter implements LineageWriter, FieldLineageWriter {
    private static final Logger LOG = LoggerFactory.getLogger(BasicLineageWriter.class);

    /** First argument handed to {@code RetryStrategies.retryOnConflict}. */
    private static final int MAX_CONFLICT_RETRIES = 20;

    /** Second argument handed to {@code RetryStrategies.retryOnConflict}. */
    private static final long CONFLICT_RETRY_DELAY_MS = 100L;

    private final DatasetFramework datasetFramework;
    private final Transactional transactional;

    /**
     * Builds a writer whose transactions run against a dataset cache bound to the
     * {@link NamespaceId#SYSTEM} namespace.
     *
     * @param datasetFramework framework used to instantiate the lineage datasets
     * @param transactionSystemClient transaction client backing the dataset cache
     */
    @VisibleForTesting
    @Inject
    public BasicLineageWriter(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        this.datasetFramework = datasetFramework;

        MultiThreadDatasetCache datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(datasetFramework),
            new TransactionSystemClientAdapter(transactionSystemClient),
            NamespaceId.SYSTEM,
            ImmutableMap.of(),
            null,
            null,
            new MultiThreadTransactionAware[0]);

        this.transactional = Transactions.createTransactionalWithRetry(
            Transactions.createTransactional(datasetCache),
            RetryStrategies.retryOnConflict(MAX_CONFLICT_RETRIES, CONFLICT_RETRY_DELAY_MS));
    }

    /**
     * Records that a program run accessed a dataset. Nothing is recorded when the accessed
     * dataset is the lineage dataset itself.
     *
     * @param programRunId the run performing the access
     * @param datasetId the dataset being accessed
     * @param accessType the kind of access
     * @param namespacedEntityId optional component on whose behalf the access happened
     */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        // Accesses to the lineage dataset itself are deliberately not recorded
        // (presumably to avoid the writer tracking its own writes).
        if (getLineageDatasetId().equals(datasetId)) {
            return;
        }

        long accessTime = System.currentTimeMillis();
        Object[] traceArgs = {programRunId, datasetId, accessType, namespacedEntityId, accessTime};
        LOG.trace("Writing access for run {}, dataset {}, accessType {}, component {}, accessTime = {}", traceArgs);

        Transactionals.execute(this.transactional, context -> {
            LineageDataset lineageDataset =
                LineageDataset.getLineageDataset(context, this.datasetFramework, getLineageDatasetId());
            lineageDataset.addAccess(programRunId, datasetId, accessType, accessTime, namespacedEntityId);
        });
    }

    /**
     * Records that a program run accessed a stream.
     *
     * @param programRunId the run performing the access
     * @param streamId the stream being accessed
     * @param accessType the kind of access
     * @param namespacedEntityId optional component on whose behalf the access happened
     */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        long accessTime = System.currentTimeMillis();
        Object[] traceArgs = {programRunId, streamId, accessType, namespacedEntityId, accessTime};
        LOG.trace("Writing access for run {}, stream {}, accessType {}, component {}, accessTime = {}", traceArgs);

        Transactionals.execute(this.transactional, context -> {
            LineageDataset lineageDataset =
                LineageDataset.getLineageDataset(context, this.datasetFramework, getLineageDatasetId());
            lineageDataset.addAccess(programRunId, streamId, accessType, accessTime, namespacedEntityId);
        });
    }

    /**
     * Returns the id of the dataset that access records are written to. Overridable so that
     * tests can point the writer at a different dataset.
     */
    @VisibleForTesting
    protected DatasetId getLineageDatasetId() {
        return LineageDataset.LINEAGE_DATASET_ID;
    }

    /**
     * Returns the id of the dataset that field lineage is written to. Overridable so that
     * tests can point the writer at a different dataset.
     */
    @VisibleForTesting
    protected DatasetId getFieldLineageDatasetId() {
        return FieldLineageDataset.FIELD_LINEAGE_DATASET_ID;
    }

    /**
     * Stores the field lineage information produced by a program run.
     *
     * @param programRunId the run that produced the field lineage
     * @param fieldLineageInfo the field lineage information to store
     */
    @Override // co.cask.cdap.data2.metadata.writer.FieldLineageWriter
    public void write(ProgramRunId programRunId, FieldLineageInfo fieldLineageInfo) {
        Transactionals.execute(this.transactional, context -> {
            FieldLineageDataset fieldLineageDataset =
                FieldLineageDataset.getFieldLineageDataset(context, this.datasetFramework, getFieldLineageDatasetId());
            fieldLineageDataset.addFieldLineageInfo(programRunId, fieldLineageInfo);
        });
    }
}
