package co.cask.cdap.notifications.service.kafka;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionSystemClientService;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.notifications.feeds.NotificationFeedNotFoundException;
import co.cask.cdap.notifications.service.AbstractNotificationService;
import co.cask.cdap.notifications.service.NotificationException;
import co.cask.cdap.notifications.service.NotificationHandler;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NotificationFeedId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/notifications/service/kafka/MessagingNotificationService.class */
/**
 * Notification service that publishes notifications to, and polls them from, a single
 * messaging topic in the system namespace. The topic name is read from the
 * {@code notification.topic} configuration key.
 *
 * <p>Publishing is performed on a dedicated single-threaded executor. Polling is started
 * on the first call to {@link #subscribe} and runs on a dedicated single-threaded scheduled
 * executor; the polling task reschedules itself after every run.
 */
public class MessagingNotificationService extends AbstractNotificationService {
    private static final Logger LOG = LoggerFactory.getLogger(MessagingNotificationService.class);

    private final MessagingService messagingService;
    private final TopicId notificationTopic;
    /** Flipped to {@code true} by the first subscriber; guards against starting the poller twice. */
    private final AtomicBoolean needFetch;
    private ListeningExecutorService publishingExecutor;
    private ScheduledExecutorService subscribeExecutor;

    @Inject
    MessagingNotificationService(CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClientService transactionSystemClientService, NotificationFeedManager notificationFeedManager, MessagingService messagingService) {
        super(datasetFramework, transactionSystemClientService, notificationFeedManager);
        this.messagingService = messagingService;
        this.needFetch = new AtomicBoolean(false);
        this.notificationTopic = NamespaceId.SYSTEM.topic(cConfiguration.get("notification.topic"));
    }

    /**
     * Starts the parent service, then creates the publishing and subscribing executors,
     * each backed by a single daemon thread.
     */
    @Override // co.cask.cdap.notifications.service.AbstractNotificationService
    public void startUp() throws Exception {
        super.startUp();
        this.publishingExecutor = MoreExecutors.listeningDecorator(
            Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("notification-publisher")));
        this.subscribeExecutor = Executors.newSingleThreadScheduledExecutor(
            Threads.createDaemonThreadFactory("notification-subscriber"));
    }

    /**
     * Stops both executors immediately, then shuts down the parent service.
     */
    @Override // co.cask.cdap.notifications.service.AbstractNotificationService
    public void shutDown() throws Exception {
        this.publishingExecutor.shutdownNow();
        this.subscribeExecutor.shutdownNow();
        super.shutDown();
    }

    /**
     * Asynchronously publishes a notification to the notification topic.
     *
     * @param notificationFeedId feed the notification belongs to
     * @param n the notification object; serialized to JSON using {@code type}
     * @param type the type used to serialize {@code n}
     * @return a future that completes with {@code n} once the message has been published, or
     *     fails with a {@link NotificationException} wrapping the underlying error
     */
    public <N> ListenableFuture<N> publish(final NotificationFeedId notificationFeedId, final N n, final Type type) throws NotificationException {
        LOG.trace("Publishing on notification feed [{}]: {}", notificationFeedId, n);
        return this.publishingExecutor.submit(new Callable<N>() {
            @Override // java.util.concurrent.Callable
            public N call() throws Exception {
                try {
                    NotificationMessage message = new NotificationMessage(
                        notificationFeedId, MessagingNotificationService.GSON.toJsonTree(n, type));
                    String payload = MessagingNotificationService.GSON.toJson(message);
                    MessagingNotificationService.this.messagingService.publish(
                        StoreRequestBuilder.of(MessagingNotificationService.this.notificationTopic)
                            .addPayloads(new String[]{payload})
                            .build());
                    return n;
                } catch (Exception e) {
                    throw new NotificationException(e);
                }
            }
        });
    }

    /**
     * Registers the handler with the parent service and, on the very first subscription,
     * starts the background task that polls the notification topic.
     *
     * @return the {@link Cancellable} returned by the parent's {@code subscribe}
     */
    @Override // co.cask.cdap.notifications.service.AbstractNotificationService
    public <N> Cancellable subscribe(NotificationFeedId notificationFeedId, NotificationHandler<N> notificationHandler, Executor executor) throws NotificationFeedNotFoundException, NotificationFeedException {
        Cancellable cancellable = super.subscribe(notificationFeedId, notificationHandler, executor);
        if (!this.needFetch.compareAndSet(false, true)) {
            // The polling task has already been started by an earlier subscriber.
            return cancellable;
        }
        this.subscribeExecutor.execute(new Runnable() {
            private final long startTime = System.currentTimeMillis();
            private final RetryStrategy scheduleStrategy = RetryStrategies.exponentialDelay(100, 3000, TimeUnit.MILLISECONDS);
            /** Id of the last message consumed; {@code null} until the first message is seen. */
            private byte[] messageId;
            /** Consecutive runs that consumed no message (empty fetch or failure). */
            private int emptyFetchCount;

            @Override // java.lang.Runnable
            public void run() {
                // Count the attempt before doing any work so that a failure anywhere below
                // is treated like an empty fetch and backs off instead of retrying immediately.
                this.emptyFetchCount++;
                try {
                    MessageFetcher fetcher = MessagingNotificationService.this.messagingService
                        .prepareFetch(MessagingNotificationService.this.notificationTopic);
                    if (this.messageId == null) {
                        fetcher.setStartTime(this.startTime);
                    } else {
                        // Resume after the last consumed message (exclusive).
                        fetcher.setStartMessage(this.messageId, false);
                    }
                    // try-with-resources guarantees the iterator is closed even when
                    // iteration or decoding throws.
                    try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
                        while (messages.hasNext()) {
                            this.emptyFetchCount = 0;
                            RawMessage rawMessage = messages.next();
                            NotificationMessage notificationMessage = MessagingNotificationService.GSON.fromJson(
                                new String(rawMessage.getPayload(), StandardCharsets.UTF_8), NotificationMessage.class);
                            try {
                                LOG.trace("Decoded notification: {}", notificationMessage);
                                MessagingNotificationService.this.notificationReceived(
                                    notificationMessage.getFeedId(), notificationMessage.getNotificationJson());
                            } catch (Throwable t) {
                                // A failing handler must not stop the polling loop; the throwable is
                                // passed as the trailing argument so the stack trace is logged.
                                LOG.warn("Error while processing notification {}", notificationMessage, t);
                            }
                            this.messageId = rawMessage.getId();
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Failed to get notification", e);
                    if (this.emptyFetchCount == 0) {
                        // Failure after at least one message was consumed: still back off.
                        this.emptyFetchCount = 1;
                    }
                }
                if (this.emptyFetchCount > 0) {
                    // Nothing consumed (or the fetch failed): delay the next poll.
                    long delayMillis = this.scheduleStrategy.nextRetry(this.emptyFetchCount, this.startTime);
                    MessagingNotificationService.this.subscribeExecutor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                } else {
                    // Messages were consumed: poll again right away.
                    MessagingNotificationService.this.subscribeExecutor.execute(this);
                }
            }
        });
        return cancellable;
    }
}
