package co.cask.cdap.internal.app.services;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxCallable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.retry.RetryableException;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.messaging.MultiThreadMessagingContext;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/services/AbstractNotificationSubscriberService.class */
public abstract class AbstractNotificationSubscriberService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationSubscriberService.class);
    private static final Logger SAMPLING_LOG = Loggers.sampling(LOG, LogSamplers.limitRate(10000));
    private static final Gson GSON = new Gson();
    private final CConfiguration cConf;
    private final Transactional transactional;
    private final MultiThreadMessagingContext messagingContext;
    private final MetricsCollectionService metricsCollectionService;
    private volatile boolean stopping;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:co/cask/cdap/internal/app/services/AbstractNotificationSubscriberService$AbstractSubscriberRunnable.class */
    public abstract class AbstractSubscriberRunnable implements Runnable {
        private final String topic;
        private final int fetchSize;
        private final boolean transactionalFetch;
        private final long emptyFetchDelayMillis;
        private final RetryStrategy retryStrategy;
        private final MetricsContext metricsContext;
        private int failureCount;
        private String messageId;

        /* JADX INFO: Access modifiers changed from: protected */
        public AbstractSubscriberRunnable(String str, String str2, long j, int i, boolean z) {
            this.topic = str2;
            this.fetchSize = i;
            this.transactionalFetch = z;
            this.emptyFetchDelayMillis = j;
            this.retryStrategy = RetryStrategies.fromConfiguration(AbstractNotificationSubscriberService.this.cConf, "system.notification.");
            this.metricsContext = AbstractNotificationSubscriberService.this.metricsCollectionService.getContext(ImmutableMap.of("cmp", "master.services", "ins", AbstractNotificationSubscriberService.this.cConf.get("messaging.container.instance.id", "0"), "ns", NamespaceId.SYSTEM.getNamespace(), "tpc", str2, "co", str));
        }

        /**
         * Returns the name of the topic this runnable fetches from, as given to the constructor.
         * (Decompiler note: the original access modifier was {@code protected}.)
         *
         * @return the topic name
         */
        public final String getTopic() {
            return topic;
        }

        /**
         * Hook invoked once at the beginning of {@link #run()}, inside a transaction and wrapped in the
         * configured retry strategy. The returned value becomes the message id passed to the first fetch.
         *
         * @param datasetContext dataset context of the surrounding transaction
         * @return the message id to start from, or {@code null} if there is none
         * @throws RetryableException if initialization failed in a way that should be retried
         */
        @Nullable
        protected abstract String initialize(DatasetContext datasetContext) throws RetryableException;

        /**
         * Hook invoked with each non-empty batch of fetched messages. It runs inside a transaction; after it
         * returns, the id of the last message pulled from the iterator (if any) is handed to
         * {@link #persistMessageId(DatasetContext, String)} in the same transaction.
         *
         * @param datasetContext dataset context of the surrounding transaction
         * @param notificationIterator iterator over the notifications decoded from the fetched messages
         * @throws Exception if processing fails; the polling loop logs it and retries later
         */
        protected abstract void processNotifications(DatasetContext datasetContext, NotificationIterator notificationIterator) throws Exception;

        /**
         * Hook invoked after {@link #processNotifications(DatasetContext, NotificationIterator)}, in the same
         * transaction, with the id of the last message pulled from the iterator. It is not called when no
         * message was pulled.
         *
         * @param datasetContext dataset context of the surrounding transaction
         * @param str the id of the last consumed message; never {@code null} when called from this class
         */
        protected abstract void persistMessageId(DatasetContext datasetContext, String str);

        /**
         * Polling loop: initializes the starting message id, then repeatedly fetches and processes
         * notifications until the enclosing service is stopping or the thread is interrupted.
         *
         * <p>Each cycle returns a delay in milliseconds; the loop sleeps for that long only when the delay is
         * positive and the service is not stopping.
         */
        @Override
        public void run() {
            this.messageId = doInitialize();
            try {
                while (!AbstractNotificationSubscriberService.this.stopping) {
                    long delayMillis = processNotifications();
                    if (!AbstractNotificationSubscriberService.this.stopping && delayMillis > 0) {
                        TimeUnit.MILLISECONDS.sleep(delayMillis);
                    }
                }
            } catch (InterruptedException e) {
                // This runnable does not own the thread, so restore the interrupt status for whoever does
                // before exiting the loop.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Calls {@link #initialize(DatasetContext)} inside a transaction, retrying according to this
         * runnable's retry strategy. {@code Retries.ALWAYS_TRUE} is passed as the retry predicate
         * (presumably every failure is retried; confirm against {@code Retries}).
         *
         * @return the value returned by {@link #initialize(DatasetContext)}; may be {@code null}
         */
        @Nullable
        private String doInitialize() {
            // The callbacks must be named get()/call() to actually implement Supplier and TxCallable; the
            // decompiler-generated names (m270get/m271call) left the interface methods unimplemented.
            return Retries.supplyWithRetries(new Supplier<String>() {
                @Override
                public String get() {
                    return Transactionals.execute(AbstractNotificationSubscriberService.this.transactional, new TxCallable<String>() {
                        @Override
                        public String call(DatasetContext datasetContext) throws Exception {
                            return AbstractSubscriberRunnable.this.initialize(datasetContext);
                        }
                    });
                }
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        }

        /**
         * Runs one fetch-and-process cycle.
         *
         * <p>Messages are fetched starting from the current message id. If the service is stopping or nothing
         * was fetched, the empty-fetch delay is returned. Otherwise the batch is processed in one transaction
         * via {@link #processNotifications(DatasetContext, NotificationIterator)}, the id of the last consumed
         * message is persisted in that transaction, and the in-memory message id advances once it completes.
         * Fetch and processing metrics are emitted along the way.
         *
         * <p>Any exception is logged through the sampling logger and turned into a retry delay from the retry
         * strategy; the failure counter resets after a successful batch.
         *
         * @return milliseconds the caller should wait before the next cycle; {@code 0} after a successfully
         *         processed batch
         */
        private long processNotifications() {
            try {
                Stopwatch stopwatch = new Stopwatch().start();
                final List<Message> messages = fetchMessages(this.messageId);
                this.metricsContext.gauge("tms.fetch.time.ms", stopwatch.elapsedTime(TimeUnit.MILLISECONDS));
                this.metricsContext.increment("tms.fetch.messages", messages.size());

                if (AbstractNotificationSubscriberService.this.stopping || messages.isEmpty()) {
                    return this.emptyFetchDelayMillis;
                }

                stopwatch.reset().start();
                // The callback must be named call() to implement TxCallable; the decompiler-generated name
                // (m272call) left the interface method unimplemented.
                NotificationIterator processed = Transactionals.execute(AbstractNotificationSubscriberService.this.transactional, new TxCallable<NotificationIterator>() {
                    @Override
                    public NotificationIterator call(DatasetContext datasetContext) throws Exception {
                        NotificationIterator notifications = new NotificationIterator(AbstractSubscriberRunnable.this.topic, messages.iterator());
                        AbstractSubscriberRunnable.this.processNotifications(datasetContext, notifications);
                        String lastMessageId = notifications.getLastMessageId();
                        if (lastMessageId != null) {
                            // Persist in the same transaction as the processing itself.
                            AbstractSubscriberRunnable.this.persistMessageId(datasetContext, lastMessageId);
                        }
                        return notifications;
                    }
                });

                // Advance the in-memory position only after the transaction has completed.
                String lastConsumedId = processed.getLastMessageId();
                if (lastConsumedId != null) {
                    this.messageId = lastConsumedId;
                }
                this.metricsContext.gauge("process.duration.ms", stopwatch.elapsedTime(TimeUnit.MILLISECONDS));
                this.metricsContext.increment("process.notifications", processed.getConsumedCount());
                this.metricsContext.gauge("process.delay.ms", System.currentTimeMillis() - getMessagePublishTime(this.messageId));
                this.failureCount = 0;
                return 0L;
            } catch (ServiceUnavailableException e) {
                // Specific handlers come before the generic one; in the opposite order they are unreachable.
                AbstractNotificationSubscriberService.SAMPLING_LOG.warn("Failed to contact service {}. Will retry in next run.", e.getServiceName(), e);
            } catch (TopicNotFoundException e) {
                AbstractNotificationSubscriberService.SAMPLING_LOG.warn("Failed to fetch from TMS. Will retry in next run.", e);
            } catch (Exception e) {
                AbstractNotificationSubscriberService.SAMPLING_LOG.warn("Failed to get and process notifications. Will retry in next run", e);
            }
            // Every failure path counts one more consecutive failure and asks the strategy for the delay.
            return this.retryStrategy.nextRetry(++this.failureCount, 0L);
        }

        /**
         * Fetches the next batch of messages, inside a transaction when this runnable was configured for
         * transactional fetch and directly otherwise.
         *
         * @param str the message id handed to the fetcher as the fetch position; may be {@code null}
         * @return the fetched messages, possibly empty
         * @throws TopicNotFoundException propagated from the underlying fetch
         * @throws IOException propagated from the underlying fetch
         */
        private List<Message> fetchMessages(@Nullable final String str) throws TopicNotFoundException, IOException {
            if (!this.transactionalFetch) {
                return doFetchMessages(str);
            }
            // The callback must be named call() to implement TxCallable; the decompiler-generated name
            // (m273call) left the interface method unimplemented.
            return Transactionals.execute(AbstractNotificationSubscriberService.this.transactional, new TxCallable<List<Message>>() {
                @Override
                public List<Message> call(DatasetContext datasetContext) throws Exception {
                    return AbstractSubscriberRunnable.this.doFetchMessages(str);
                }
            }, TopicNotFoundException.class, IOException.class);
        }

        /**
         * Fetches up to the configured fetch size of messages from this runnable's topic in the system
         * namespace and copies them into a list. Copying stops early once the enclosing service is stopping.
         *
         * <p>The fetch iterator is always closed, including when {@code hasNext()} or {@code next()} throws.
         * (Decompiler note: the original access modifier was {@code private}.)
         *
         * @param str the message id handed to the fetcher as the fetch position; may be {@code null}
         * @return the messages read from the iterator, possibly empty
         * @throws TopicNotFoundException propagated from the message fetcher
         * @throws IOException propagated from the message fetcher
         */
        public List<Message> doFetchMessages(@Nullable String str) throws TopicNotFoundException, IOException {
            List<Message> messages = new ArrayList<>();
            // Both placeholders need an argument: the topic first, then the message id.
            AbstractNotificationSubscriberService.LOG.trace("Fetching system topic '{}' with messageId '{}'", this.topic, str);
            try (CloseableIterator<Message> iterator = AbstractNotificationSubscriberService.this.messagingContext.getMessageFetcher()
                    .fetch(NamespaceId.SYSTEM.getNamespace(), this.topic, this.fetchSize, str)) {
                while (iterator.hasNext() && !AbstractNotificationSubscriberService.this.stopping) {
                    messages.add(iterator.next());
                }
            }
            return messages;
        }

        /**
         * Extracts the publish timestamp from a hex-encoded message id.
         *
         * @param str hex string of the message id, or {@code null}
         * @return the publish timestamp carried by the id, or {@code 0} when {@code str} is {@code null}
         */
        private long getMessagePublishTime(@Nullable String str) {
            return str == null
                ? 0L
                : new MessageId(Bytes.fromHexString(str)).getPublishTimestamp();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:co/cask/cdap/internal/app/services/AbstractNotificationSubscriberService$NotificationIterator.class */
    public static final class NotificationIterator extends AbstractIterator<Notification> {
        private final String topic;
        private final Iterator<Message> messages;
        private String lastMessageId;
        private int consumedCount;

        NotificationIterator(String str, Iterator<Message> it) {
            this.topic = str;
            this.messages = it;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public Notification m274computeNext() {
            while (this.messages.hasNext()) {
                this.consumedCount++;
                Message next = this.messages.next();
                this.lastMessageId = next.getId();
                try {
                    Notification notification = (Notification) AbstractNotificationSubscriberService.GSON.fromJson(next.getPayloadAsString(), Notification.class);
                    AbstractNotificationSubscriberService.LOG.trace("Processing notification from topic {} with message id {}: {}", new Object[]{this.topic, this.lastMessageId, notification});
                    return notification;
                } catch (JsonSyntaxException e) {
                    AbstractNotificationSubscriberService.LOG.warn("Failed to decode message with id {} and payload '{}'. Skipped.", new Object[]{next.getId(), next.getPayloadAsString(), e});
                }
            }
            return (Notification) endOfData();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public String getLastMessageId() {
            return this.lastMessageId;
        }

        int getConsumedCount() {
            return this.consumedCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Inject
    public AbstractNotificationSubscriberService(MessagingService messagingService, CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, MetricsCollectionService metricsCollectionService) {
        this.cConf = cConfiguration;
        this.messagingContext = new MultiThreadMessagingContext(messagingService);
        this.metricsCollectionService = metricsCollectionService;
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), new TransactionSystemClientAdapter(transactionSystemClient), NamespaceId.SYSTEM, ImmutableMap.of(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[]{this.messagingContext})), org.apache.tephra.RetryStrategies.retryOnConflict(20, 100L));
    }

    /**
     * Raises the {@code stopping} flag. Subscriber runnables check it in their polling and fetch loops and
     * exit once they observe it. (Decompiler note: the original access modifier was {@code protected}.)
     */
    public void shutDown() {
        stopping = true;
    }

    /**
     * Returns the messaging context created in the constructor, the same one subscriber runnables use to
     * fetch messages. (Decompiler note: the original access modifier was {@code protected}.)
     *
     * @return the messaging context of this service
     */
    public MessagingContext getMessagingContext() {
        return messagingContext;
    }
}
