package co.cask.cdap.notifications.service.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.notifications.feeds.NotificationFeedNotFoundException;
import co.cask.cdap.notifications.service.AbstractNotificationService;
import co.cask.cdap.notifications.service.NotificationException;
import co.cask.cdap.notifications.service.NotificationHandler;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClient;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/notifications/service/kafka/KafkaNotificationService.class */
/**
 * Notification service backed by Kafka.
 *
 * <p>Published notifications are serialized to JSON with Gson, wrapped in a {@link KafkaMessage}
 * and sent to the Kafka topic that {@link KafkaNotificationUtils} derives from the feed.
 * Subscriptions register the handler with the parent {@link AbstractNotificationService} and make
 * sure a single Kafka consumer exists per topic-partition key; messages received by that consumer
 * are decoded and handed to {@code notificationReceived} of the parent class.
 *
 * <p>All bookkeeping of Kafka consumers ({@link #kafkaCallbacks} and the per-callback subscription
 * counters) is guarded by this service instance's monitor.
 */
public class KafkaNotificationService extends AbstractNotificationService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationService.class);
    private static final Gson GSON = new GsonBuilder().enableComplexMapKeySerialization().create();

    private final KafkaClient kafkaClient;
    private final NotificationFeedManager feedManager;
    private final KafkaPublisher.Ack ack;

    /** Active Kafka consumers, keyed by the topic-partition computed for a feed. Guarded by {@code this}. */
    private final Map<TopicPartition, KafkaNotificationsCallback> kafkaCallbacks;

    /** Created in {@link #startUp()}; {@code null} before the service has been started. */
    private KafkaPublisher kafkaPublisher;

    /** Number of partitions consumed per topic, read from the {@code kafka.num.partitions} setting. */
    private final int nbPartitions;

    /** Created in {@link #startUp()}; {@code null} before the service has been started. */
    private ListeningExecutorService publishingExecutor;

    /**
     * Creates the service. No Kafka resources are acquired here; the publisher and the publishing
     * executor are created in {@link #startUp()}.
     *
     * @param cConfiguration configuration from which {@code kafka.num.partitions} is read
     * @param kafkaClient client used to obtain Kafka publishers and consumers
     * @param datasetFramework passed through to the parent service
     * @param transactionSystemClient passed through to the parent service
     * @param notificationFeedManager used to look up feeds when subscribing; also passed to the parent
     */
    @Inject
    public KafkaNotificationService(CConfiguration cConfiguration, KafkaClient kafkaClient, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, NotificationFeedManager notificationFeedManager) {
        super(datasetFramework, transactionSystemClient, notificationFeedManager);
        this.kafkaClient = kafkaClient;
        this.feedManager = notificationFeedManager;
        this.ack = KafkaPublisher.Ack.LEADER_RECEIVED;
        this.nbPartitions = cConfiguration.getInt("kafka.num.partitions");
        this.kafkaCallbacks = Maps.newHashMap();
    }

    /**
     * Obtains a Snappy-compressed Kafka publisher and creates the single-threaded executor on which
     * all publish calls are run.
     */
    protected void startUp() throws Exception {
        this.kafkaPublisher = this.kafkaClient.getPublisher(this.ack, Compression.SNAPPY);
        this.publishingExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("notification-publisher-%d")));
    }

    /**
     * Stops the publishing executor. Kafka subscriptions that are still active are not cancelled
     * here; they are only released through the {@link Cancellable} returned by
     * {@link #subscribe(Id.NotificationFeed, NotificationHandler, Executor)}.
     */
    protected void shutDown() throws Exception {
        this.publishingExecutor.shutdownNow();
    }

    /**
     * Asynchronously publishes a notification to the Kafka topic associated with the feed.
     *
     * @param notificationFeed feed the notification is published on
     * @param n notification payload, serialized to JSON using {@code type}
     * @param type type used by Gson to serialize {@code n}
     * @param <N> type of the notification
     * @return a future that yields {@code n} once the Kafka send has completed; the future fails with
     *     a {@link NotificationException} if encoding the message or sending it to Kafka fails
     * @throws NotificationException declared by the signature; not thrown synchronously by this
     *     implementation
     */
    public <N> ListenableFuture<N> publish(final Id.NotificationFeed notificationFeed, final N n, final Type type) throws NotificationException {
        LOG.trace("Publishing on notification feed [{}]: {}", notificationFeed, n);
        return this.publishingExecutor.submit(new Callable<N>() {
            @Override
            public N call() throws Exception {
                try {
                    KafkaMessage kafkaMessage = new KafkaMessage(KafkaNotificationUtils.getMessageKey(notificationFeed), GSON.toJsonTree(n, type));
                    ByteBuffer payload = KafkaMessageCodec.encode(kafkaMessage);
                    KafkaPublisher.Preparer preparer = kafkaPublisher.prepare(KafkaNotificationUtils.getKafkaTopicPartition(notificationFeed).getTopic());
                    preparer.add(payload, kafkaMessage.getMessageKey());
                    try {
                        // Block until Kafka has acknowledged the message so the future reflects the outcome.
                        preparer.send().get();
                        return n;
                    } catch (ExecutionException e) {
                        throw new NotificationException(e.getCause());
                    }
                } catch (IOException e) {
                    throw new NotificationException(e);
                }
            }
        });
    }

    /**
     * Subscribes a handler to a feed, starting a Kafka consumer for the feed's topic-partition key
     * if none is running yet.
     *
     * @param notificationFeed feed to subscribe to; it is looked up through the feed manager first
     * @param notificationHandler handler registered with the parent service
     * @param executor executor passed to the parent service together with the handler
     * @param <N> type of the notifications handled
     * @return a handle that removes this subscription; cancelling the last subscription of a
     *     topic-partition key also cancels its Kafka consumer
     * @throws NotificationFeedNotFoundException propagated from the feed lookup or the parent service
     * @throws NotificationFeedException propagated from the feed lookup or the parent service
     */
    @Override // co.cask.cdap.notifications.service.AbstractNotificationService
    public <N> Cancellable subscribe(Id.NotificationFeed notificationFeed, NotificationHandler<N> notificationHandler, Executor executor) throws NotificationFeedNotFoundException, NotificationFeedException {
        // Return value is ignored: the call is made for its exceptions (presumably it throws when
        // the feed does not exist).
        this.feedManager.getFeed(notificationFeed);
        TopicPartition topicPartition = KafkaNotificationUtils.getKafkaTopicPartition(notificationFeed);
        synchronized (this) {
            KafkaNotificationsCallback callback = this.kafkaCallbacks.get(topicPartition);
            if (callback == null) {
                LOG.debug("Creating new Kafka notification callback for topic-partition {}", topicPartition);
                callback = new KafkaNotificationsCallback(topicPartition);
                this.kafkaCallbacks.put(topicPartition, callback);
            }
            return callback.subscribe(notificationFeed, notificationHandler, executor);
        }
    }

    /**
     * Kafka message callback shared by every subscription that maps to the same topic-partition
     * key. It counts the subscriptions it serves so the underlying Kafka consumer is started with
     * the first one and cancelled with the last one.
     */
    private final class KafkaNotificationsCallback implements KafkaConsumer.MessageCallback {
        private final TopicPartition topicPartition;

        /** Number of live subscriptions served by this callback. Guarded by the service monitor. */
        private int subscriptions;

        /** Handle on the running Kafka consumer; set while {@code subscriptions > 0}. */
        private Cancellable kafkaSubscription;

        private KafkaNotificationsCallback(TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
        }

        /**
         * Registers the handler with the parent service and, for the first subscription, starts a
         * Kafka consumer reading the latest messages of every partition of the topic.
         *
         * @return a handle whose {@code cancel()} undoes this subscription; calling it more than once
         *     has no further effect
         */
        public <N> Cancellable subscribe(Id.NotificationFeed notificationFeed, NotificationHandler<N> notificationHandler, Executor executor) throws NotificationFeedNotFoundException, NotificationFeedException {
            final Cancellable localSubscription = KafkaNotificationService.super.subscribe(notificationFeed, notificationHandler, executor);
            synchronized (KafkaNotificationService.this) {
                if (this.subscriptions == 0) {
                    KafkaConsumer.Preparer preparer = kafkaClient.getConsumer().prepare();
                    for (int partition = 0; partition < nbPartitions; partition++) {
                        preparer.addLatest(this.topicPartition.getTopic(), partition);
                    }
                    this.kafkaSubscription = preparer.consume(this);
                }
                this.subscriptions++;
            }
            return new Cancellable() {
                /** Set once cancel() has run. Guarded by the service monitor. */
                private boolean cancelled;

                @Override
                public void cancel() {
                    // A second cancel() must not decrement the shared counter again: that would
                    // tear down the Kafka consumer while other subscribers still rely on it.
                    synchronized (KafkaNotificationService.this) {
                        if (this.cancelled) {
                            return;
                        }
                        this.cancelled = true;
                    }
                    localSubscription.cancel();
                    synchronized (KafkaNotificationService.this) {
                        KafkaNotificationsCallback.this.subscriptions--;
                        if (KafkaNotificationsCallback.this.subscriptions == 0) {
                            KafkaNotificationsCallback.this.kafkaSubscription.cancel();
                            kafkaCallbacks.remove(KafkaNotificationsCallback.this.topicPartition);
                        }
                    }
                }
            };
        }

        /**
         * Decodes every fetched message and passes it to the parent service. A message that cannot
         * be decoded, or whose processing throws, is logged and skipped so the remaining messages
         * are still handled.
         */
        @Override
        public void onReceived(Iterator<FetchedMessage> it) {
            int count = 0;
            while (it.hasNext()) {
                count++;
                FetchedMessage message = it.next();
                try {
                    KafkaMessage decoded = KafkaMessageCodec.decode(message.getPayload());
                    try {
                        LOG.trace("Decoded notification from Kafka: {}", decoded);
                        notificationReceived(KafkaNotificationUtils.getMessageFeed(decoded.getMessageKey()), decoded.getNotificationJson());
                    } catch (Throwable t) {
                        // Exactly one placeholder: the trailing Throwable is logged as the exception.
                        LOG.warn("Error while processing notification {}", decoded.getNotificationJson(), t);
                    }
                } catch (IOException e) {
                    LOG.error("Could not decode Kafka message {} using Gson.", message, e);
                }
            }
            LOG.trace("Handled {} messages from kafka", count);
        }

        /** Logs that the Kafka consumer for this topic-partition key has finished. */
        @Override
        public void finished() {
            LOG.info("Subscription to topic partition {} finished.", this.topicPartition);
        }
    }
}
