package co.cask.cdap.data2.metadata.writer;

import co.cask.cdap.api.lineage.field.Operation;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageInfo;
import co.cask.cdap.data2.metadata.writer.MetadataMessage;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.codec.OperationTypeAdapter;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.TopicId;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/metadata/writer/MessagingLineageWriter.class */
public class MessagingLineageWriter implements LineageWriter, FieldLineageWriter {
    private static final Logger LOG = LoggerFactory.getLogger(MessagingLineageWriter.class);
    private static final Gson GSON = new GsonBuilder().enableComplexMapKeySerialization().registerTypeAdapter(Operation.class, new OperationTypeAdapter()).create();
    private final TopicId topic;
    private final MessagingService messagingService;
    private final RetryStrategy retryStrategy;

    @Inject
    MessagingLineageWriter(CConfiguration cConfiguration, MessagingService messagingService) {
        this.topic = NamespaceId.SYSTEM.topic(cConfiguration.get("metadata.messaging.topic"));
        this.messagingService = messagingService;
        this.retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.metadata.");
    }

    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        publishLineage(programRunId, new DataAccessLineage(accessType, datasetId, namespacedEntityId));
    }

    @Override // co.cask.cdap.data2.metadata.writer.LineageWriter
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType, @Nullable NamespacedEntityId namespacedEntityId) {
        publishLineage(programRunId, new DataAccessLineage(accessType, streamId, namespacedEntityId));
    }

    @Override // co.cask.cdap.data2.metadata.writer.FieldLineageWriter
    public void write(ProgramRunId programRunId, FieldLineageInfo fieldLineageInfo) {
        publish(new MetadataMessage(MetadataMessage.Type.FIELD_LINEAGE, programRunId, GSON.toJsonTree(fieldLineageInfo)));
    }

    private void publishLineage(ProgramRunId programRunId, DataAccessLineage dataAccessLineage) {
        publish(new MetadataMessage(MetadataMessage.Type.LINEAGE, programRunId, GSON.toJsonTree(dataAccessLineage)));
    }

    private void publish(MetadataMessage metadataMessage) {
        StoreRequest build = StoreRequestBuilder.of(this.topic).addPayload(GSON.toJson(metadataMessage)).build();
        try {
            Retries.callWithRetries(() -> {
                return this.messagingService.publish(build);
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        } catch (Exception e) {
            LOG.trace("Failed to publish metadata message: {}", metadataMessage);
            throw new RuntimeException(String.format("Failed to publish metadata message of type '%s' for program run '%s'.", metadataMessage.getType(), metadataMessage.getEntityId()), e);
        }
    }
}
