package co.cask.cdap.data.stream.service.heartbeat;

import co.cask.cdap.notifications.feeds.NotificationFeed;
import co.cask.cdap.notifications.service.NotificationException;
import co.cask.cdap.notifications.service.NotificationService;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;

/* loaded from: input_file:co/cask/cdap/data/stream/service/heartbeat/NotificationHeartbeatPublisher.class */
/**
 * {@link HeartbeatPublisher} that sends stream writer heartbeats as notifications through a
 * {@link NotificationService}.
 *
 * <p>Heartbeats are published to a fixed feed: namespace {@code default}, category
 * {@code streamInternal}, name {@code heartbeat}. Starting and stopping this service starts and
 * stops the injected notification service as well.
 */
public class NotificationHeartbeatPublisher extends AbstractIdleService implements HeartbeatPublisher {
    /** Service used both to publish heartbeats and as the resource managed by this service's lifecycle. */
    private final NotificationService notificationService;

    /** Feed every heartbeat is published to; built once per publisher instance. */
    private final NotificationFeed heartbeatFeed = new NotificationFeed.Builder()
            .setNamespace("default")
            .setCategory("streamInternal")
            .setName("heartbeat")
            .build();

    /**
     * Creates a publisher backed by the given notification service.
     *
     * @param notificationService service used to publish heartbeats; it is started and stopped
     *     together with this publisher
     */
    @Inject
    public NotificationHeartbeatPublisher(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * Starts the underlying notification service via {@code startAndWait()}.
     *
     * @throws Exception if the notification service fails to start
     */
    @Override
    protected void startUp() throws Exception {
        this.notificationService.startAndWait();
    }

    /**
     * Stops the underlying notification service via {@code stopAndWait()}.
     *
     * @throws Exception if the notification service fails to stop
     */
    @Override
    protected void shutDown() throws Exception {
        this.notificationService.stopAndWait();
    }

    /**
     * Publishes the given heartbeat on the heartbeat notification feed.
     *
     * @param streamWriterHeartbeat heartbeat to publish
     * @return the future returned by {@link NotificationService#publish} for this heartbeat
     * @throws IllegalArgumentException if the notification service rejects the publish call with a
     *     {@link NotificationException}; the original exception is kept as the cause
     */
    @Override // co.cask.cdap.data.stream.service.heartbeat.HeartbeatPublisher
    public ListenableFuture<StreamWriterHeartbeat> sendHeartbeat(StreamWriterHeartbeat streamWriterHeartbeat) {
        try {
            return this.notificationService.publish(this.heartbeatFeed, streamWriterHeartbeat);
        } catch (NotificationException e) {
            // NOTE(review): the message assumes a missing feed is the only reason publish() can
            // throw NotificationException - confirm against NotificationService's contract.
            throw new IllegalArgumentException("Streams' heartbeat notification feed has not been created", e);
        }
    }
}
