package co.cask.cdap.data2.registry;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.metadata.writer.MetadataMessage;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.collect.Iterables;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/* loaded from: input_file:co/cask/cdap/data2/registry/MessagingUsageWriter.class */
public class MessagingUsageWriter implements UsageWriter {
    private static final Gson GSON = new Gson();
    private final TopicId topic;
    private final MessagingService messagingService;
    private final RetryStrategy retryStrategy;

    @Inject
    MessagingUsageWriter(CConfiguration cConfiguration, MessagingService messagingService) {
        this.topic = NamespaceId.SYSTEM.topic(cConfiguration.get("metadata.messaging.topic"));
        this.messagingService = messagingService;
        this.retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.metadata.");
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void registerAll(Iterable<? extends EntityId> iterable, StreamId streamId) {
        try {
            doRegisterAll(iterable, streamId);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish usage for " + streamId + " with owners " + Iterables.toString(iterable), e);
        }
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void registerAll(Iterable<? extends EntityId> iterable, DatasetId datasetId) {
        try {
            doRegisterAll(iterable, datasetId);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish usage for " + datasetId + " with owners " + Iterables.toString(iterable), e);
        }
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void register(EntityId entityId, StreamId streamId) {
        if (entityId instanceof ProgramId) {
            register((ProgramId) entityId, streamId);
        }
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void register(EntityId entityId, DatasetId datasetId) {
        if (entityId instanceof ProgramId) {
            register((ProgramId) entityId, datasetId);
        }
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void register(ProgramId programId, DatasetId datasetId) {
        StoreRequest build = StoreRequestBuilder.of(this.topic).addPayload(GSON.toJson(new MetadataMessage(MetadataMessage.Type.USAGE, programId, GSON.toJsonTree(new DatasetUsage(datasetId))))).build();
        try {
            Retries.callWithRetries(() -> {
                return this.messagingService.publish(build);
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish usage for " + datasetId + " for program " + programId, e);
        }
    }

    @Override // co.cask.cdap.data2.registry.UsageWriter
    public void register(ProgramId programId, StreamId streamId) {
        StoreRequest build = StoreRequestBuilder.of(this.topic).addPayload(GSON.toJson(new MetadataMessage(MetadataMessage.Type.USAGE, programId, GSON.toJsonTree(new DatasetUsage(streamId))))).build();
        try {
            Retries.callWithRetries(() -> {
                return this.messagingService.publish(build);
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish usage for " + streamId + " for program " + programId, e);
        }
    }

    private void doRegisterAll(Iterable<? extends EntityId> iterable, EntityId entityId) throws Exception {
        StoreRequestBuilder of = StoreRequestBuilder.of(this.topic);
        Stream stream = StreamSupport.stream(iterable.spliterator(), false);
        Class<ProgramId> cls = ProgramId.class;
        ProgramId.class.getClass();
        Stream filter = stream.filter((v1) -> {
            return r2.isInstance(v1);
        });
        Class<ProgramId> cls2 = ProgramId.class;
        ProgramId.class.getClass();
        Stream map = filter.map((v1) -> {
            return r2.cast(v1);
        }).map(programId -> {
            return new MetadataMessage(MetadataMessage.Type.USAGE, programId, GSON.toJsonTree(new DatasetUsage(entityId)));
        });
        Gson gson = GSON;
        gson.getClass();
        StoreRequest build = of.addPayloads(map.map((v1) -> {
            return r2.toJson(v1);
        }).map(str -> {
            return str.getBytes(StandardCharsets.UTF_8);
        }).iterator()).build();
        Retries.callWithRetries(() -> {
            return this.messagingService.publish(build);
        }, this.retryStrategy, Retries.ALWAYS_TRUE);
    }
}
