package co.cask.cdap.data2.metadata.writer;

import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.LineageStoreWriter;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import com.google.inject.Inject;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/writer/BasicLineageWriter.class */
/**
 * {@link LineageWriter} that forwards dataset and stream accesses to a {@link LineageStoreWriter}.
 *
 * <p>Each distinct combination of run, accessed entity, access type and component is handed to the
 * store writer only the first time it is seen by this instance; repeated calls with an equal
 * combination return without writing. The set of seen combinations is kept in an in-memory
 * {@link ConcurrentMap}, and nothing in this class removes entries from it.
 *
 * <p>NOTE(review): a combination is recorded as seen <em>before</em> the store write is attempted,
 * so if the write throws, later identical calls are still skipped. Confirm this is intended.
 */
public class BasicLineageWriter implements LineageWriter {
    private static final Logger LOG = LoggerFactory.getLogger(BasicLineageWriter.class);

    /** Destination for access records that have not been written before. */
    private final LineageStoreWriter lineageStoreWriter;

    /** Keys of accesses already handed to the store writer; the value is a placeholder only. */
    private final ConcurrentMap<DataAccessKey, Boolean> registered = new ConcurrentHashMap<>();

    /**
     * Immutable key identifying one access: which run touched which entity, how, and
     * (optionally) from which component. Used as the de-duplication key of the enclosing writer.
     */
    public static final class DataAccessKey {
        private final ProgramRunId run;
        private final NamespacedEntityId data;
        private final AccessType accessType;
        private final NamespacedEntityId component;

        /**
         * @param programRunId the run performing the access
         * @param namespacedEntityId the entity (dataset or stream) being accessed
         * @param accessType the kind of access
         * @param namespacedEntityId2 the component performing the access, or {@code null} if none
         */
        public DataAccessKey(ProgramRunId programRunId, NamespacedEntityId namespacedEntityId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId2) {
            this.run = programRunId;
            this.data = namespacedEntityId;
            this.accessType = accessType;
            this.component = namespacedEntityId2;
        }

        /** Two keys are equal when run, data, access type and component are all equal (null-safe). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof DataAccessKey)) {
                return false;
            }
            DataAccessKey other = (DataAccessKey) obj;
            return Objects.equals(this.run, other.run)
                && Objects.equals(this.data, other.data)
                && Objects.equals(this.accessType, other.accessType)
                && Objects.equals(this.component, other.component);
        }

        /** Hash over the same four fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.run, this.data, this.accessType, this.component);
        }
    }

    /**
     * @param lineageStoreWriter the writer that receives each first-seen access
     */
    @Inject
    BasicLineageWriter(LineageStoreWriter lineageStoreWriter) {
        this.lineageStoreWriter = lineageStoreWriter;
    }

    /** Records a dataset access with no component; delegates to the four-argument overload. */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType) {
        addAccess(programRunId, datasetId, accessType, (NamespacedEntityId) null);
    }

    /**
     * Records a dataset access, stamped with the current wall-clock time, unless an equal
     * (run, dataset, access type, component) combination was already seen by this instance.
     *
     * @param programRunId the run performing the access
     * @param datasetId the dataset being accessed
     * @param accessType the kind of access
     * @param namespacedEntityId the component performing the access, or {@code null} if none
     */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        if (alreadyRegistered(programRunId, datasetId, accessType, namespacedEntityId)) {
            return;
        }
        long accessTime = System.currentTimeMillis();
        LOG.debug("Writing access for run {}, dataset {}, accessType {}, component {}, accessTime = {}",
                  programRunId, datasetId, accessType, namespacedEntityId, accessTime);
        this.lineageStoreWriter.addAccess(programRunId, datasetId, accessType, accessTime, namespacedEntityId);
    }

    /** Records a stream access with no component; delegates to the four-argument overload. */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType) {
        addAccess(programRunId, streamId, accessType, (NamespacedEntityId) null);
    }

    /**
     * Records a stream access, stamped with the current wall-clock time, unless an equal
     * (run, stream, access type, component) combination was already seen by this instance.
     *
     * @param programRunId the run performing the access
     * @param streamId the stream being accessed
     * @param accessType the kind of access
     * @param namespacedEntityId the component performing the access, or {@code null} if none
     */
    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        if (alreadyRegistered(programRunId, streamId, accessType, namespacedEntityId)) {
            return;
        }
        long accessTime = System.currentTimeMillis();
        LOG.debug("Writing access for run {}, stream {}, accessType {}, component {}, accessTime = {}",
                  programRunId, streamId, accessType, namespacedEntityId, accessTime);
        this.lineageStoreWriter.addAccess(programRunId, streamId, accessType, accessTime, namespacedEntityId);
    }

    /**
     * Marks the given access as seen and reports whether it had been seen before.
     *
     * <p>The check and the insertion are a single {@code putIfAbsent} call, so the key is
     * registered as a side effect even when this method returns {@code false}.
     *
     * @return {@code true} if an equal key was already present, {@code false} if this call added it
     */
    private boolean alreadyRegistered(ProgramRunId run, NamespacedEntityId data, AccessType accessType, @Nullable NamespacedEntityId component) {
        DataAccessKey key = new DataAccessKey(run, data, accessType, component);
        return this.registered.putIfAbsent(key, Boolean.TRUE) != null;
    }
}
