package co.cask.cdap.notifications.service;

import co.cask.cdap.common.service.UnloggedExceptionIdleService;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionSystemClientService;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.notifications.feeds.NotificationFeedNotFoundException;
import co.cask.cdap.notifications.service.inmemory.InMemoryNotificationService;
import co.cask.cdap.proto.Id;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.Executor;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/notifications/service/AbstractNotificationService.class */
public abstract class AbstractNotificationService extends UnloggedExceptionIdleService implements NotificationService {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryNotificationService.class);
    private final Multimap<Id.NotificationFeed, NotificationCaller<?>> subscribers = Multimaps.synchronizedMultimap(HashMultimap.create());
    private final DatasetFramework dsFramework;
    private final TransactionSystemClientService transactionSystemClient;
    private final NotificationFeedManager feedManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/notifications/service/AbstractNotificationService$NotificationCaller.class */
    public class NotificationCaller<N> implements NotificationHandler<N>, Cancellable {
        private final Id.NotificationFeed feed;
        private final NotificationHandler<N> handler;
        private final Executor executor;
        private volatile boolean completed;

        NotificationCaller(Id.NotificationFeed notificationFeed, NotificationHandler<N> notificationHandler, Executor executor) {
            this.feed = notificationFeed;
            this.handler = notificationHandler;
            this.executor = executor;
        }

        public Type getNotificationType() {
            return this.handler.getNotificationType();
        }

        public void received(final N n, final NotificationContext notificationContext) {
            if (this.completed) {
                return;
            }
            this.executor.execute(new Runnable() { // from class: co.cask.cdap.notifications.service.AbstractNotificationService.NotificationCaller.1
                @Override // java.lang.Runnable
                public void run() {
                    if (NotificationCaller.this.completed) {
                        return;
                    }
                    try {
                        NotificationCaller.this.handler.received(n, notificationContext);
                    } catch (Throwable th) {
                        AbstractNotificationService.LOG.warn("Notification {} on feed {} could not be processed successfully by handler {}", new Object[]{n, NotificationCaller.this.feed, NotificationCaller.this.handler, th});
                    }
                }
            });
        }

        public void cancel() {
            this.completed = true;
            AbstractNotificationService.this.subscribers.remove(this.feed, this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractNotificationService(DatasetFramework datasetFramework, TransactionSystemClientService transactionSystemClientService, NotificationFeedManager notificationFeedManager) {
        this.dsFramework = datasetFramework;
        this.transactionSystemClient = transactionSystemClientService;
        this.feedManager = notificationFeedManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startUp() throws Exception {
        this.transactionSystemClient.startAndWait();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutDown() throws Exception {
        this.transactionSystemClient.stopAndWait();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Gson createGson() {
        return new GsonBuilder().enableComplexMapKeySerialization().create();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notificationReceived(Id.NotificationFeed notificationFeed, JsonElement jsonElement) {
        ImmutableList<NotificationCaller> copyOf;
        LOG.trace("Notification received on feed {}: {}", notificationFeed, jsonElement);
        Collection collection = this.subscribers.get(notificationFeed);
        synchronized (this.subscribers) {
            copyOf = ImmutableList.copyOf(collection);
        }
        for (NotificationCaller notificationCaller : copyOf) {
            notificationCaller.received(createGson().fromJson(jsonElement, notificationCaller.getNotificationType()), new BasicNotificationContext(Id.Namespace.from(notificationFeed.getNamespaceId()), this.dsFramework, this.transactionSystemClient));
        }
    }

    public <N> ListenableFuture<N> publish(Id.NotificationFeed notificationFeed, N n) throws NotificationException {
        return publish(notificationFeed, n, n.getClass());
    }

    public <N> Cancellable subscribe(Id.NotificationFeed notificationFeed, NotificationHandler<N> notificationHandler) throws NotificationFeedNotFoundException, NotificationFeedException {
        return subscribe(notificationFeed, notificationHandler, Threads.SAME_THREAD_EXECUTOR);
    }

    public <N> Cancellable subscribe(Id.NotificationFeed notificationFeed, NotificationHandler<N> notificationHandler, Executor executor) throws NotificationFeedNotFoundException, NotificationFeedException {
        this.feedManager.getFeed(notificationFeed);
        NotificationCaller notificationCaller = new NotificationCaller(notificationFeed, notificationHandler, executor);
        this.subscribers.put(notificationFeed, notificationCaller);
        return notificationCaller;
    }
}
