package co.cask.cdap.internal.app.store.remote;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.internal.remote.RemoteOpsClient;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.writer.BasicLineageWriter;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.proto.Id;
import com.google.inject.Inject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.twill.discovery.DiscoveryServiceClient;

/* loaded from: input_file:co/cask/cdap/internal/app/store/remote/RemoteLineageWriter.class */
public class RemoteLineageWriter extends RemoteOpsClient implements LineageWriter {
    private final ConcurrentMap<BasicLineageWriter.DataAccessKey, Boolean> registered;

    @Inject
    RemoteLineageWriter(CConfiguration cConfiguration, DiscoveryServiceClient discoveryServiceClient) {
        super(cConfiguration, discoveryServiceClient, "remote.system.operation");
        this.registered = new ConcurrentHashMap();
    }

    public void addAccess(Id.Run run, Id.DatasetInstance datasetInstance, AccessType accessType) {
        addAccess(run, datasetInstance, accessType, (Id.NamespacedId) null);
    }

    public void addAccess(Id.Run run, Id.DatasetInstance datasetInstance, AccessType accessType, @Nullable Id.NamespacedId namespacedId) {
        if (alreadyRegistered(run, datasetInstance, accessType, namespacedId)) {
            return;
        }
        executeRequest("addDatasetAccess", new Object[]{run, datasetInstance, accessType, namespacedId});
    }

    public void addAccess(Id.Run run, Id.Stream stream, AccessType accessType) {
        addAccess(run, stream, accessType, (Id.NamespacedId) null);
    }

    public void addAccess(Id.Run run, Id.Stream stream, AccessType accessType, @Nullable Id.NamespacedId namespacedId) {
        if (alreadyRegistered(run, stream, accessType, namespacedId)) {
            return;
        }
        executeRequest("addStreamAccess", new Object[]{run, stream, accessType, namespacedId});
    }

    private boolean alreadyRegistered(Id.Run run, Id.NamespacedId namespacedId, AccessType accessType, @Nullable Id.NamespacedId namespacedId2) {
        return this.registered.putIfAbsent(new BasicLineageWriter.DataAccessKey(run, namespacedId, accessType, namespacedId2), true) != null;
    }
}
