package com.ning.billing.util.notificationq;

import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.ning.billing.util.queue.PersistentQueueBase;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import net.sf.ehcache.constructs.CacheDecoratorFactory;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/util/notificationq/NotificationQueueDispatcher.class */
/**
 * Dispatches persisted notifications to the handlers of the registered {@link NotificationQueue}s.
 *
 * <p>On each processing pass the dispatcher reads the ready notifications from the
 * {@link NotificationSqlDao}, claims the ones that belong to a started queue, invokes the
 * queue's handler and finally clears the notification. Per-queue processing times and a global
 * processed counter are published through Yammer Metrics.
 *
 * <p>All accesses to {@link #queues} made by this class are guarded by {@code synchronized (queues)}.
 */
public class NotificationQueueDispatcher extends PersistentQueueBase {
    protected static final Logger log = LoggerFactory.getLogger(NotificationQueueDispatcher.class);

    /**
     * Offset, in milliseconds, added to the current time to compute the date handed to
     * {@code claimNotification}. NOTE(review): presumably the instant until which the claim is
     * held; confirm against the DAO's SQL.
     */
    public static final int CLAIM_TIME_MS = 300000;

    private static final String NOTIFICATION_THREAD_NAME = "Notification-queue-dispatch";

    /** Number of leading characters of the service name kept in the per-queue metric name. */
    private static final int METRIC_SERVICE_PREFIX_LENGTH = 3;

    private final NotificationQueueConfig config;
    private final String hostname;
    private final AtomicLong nbProcessedEvents;
    /** May be {@code null} when the dispatcher was built without an {@link IDBI}. */
    private final NotificationSqlDao dao;
    protected final InternalCallContextFactory internalCallContextFactory;
    protected final Clock clock;
    /** Registered queues keyed by queue name; always accessed under {@code synchronized (queues)}. */
    protected final Map<String, NotificationQueue> queues;
    private final Gauge<Integer> pendingNotifications;
    private final Counter processedNotificationsSinceStart;
    /** Lazily created histograms keyed by queue name; guarded by its own monitor. */
    private final Map<String, Histogram> perQueueProcessingTime;

    /**
     * Creates the dispatcher and registers its metrics.
     *
     * @param clock                      time source used for ready/claim dates and timing
     * @param notificationQueueConfig    queue configuration (thread count, prefetch amount, processing switch)
     * @param idbi                       DBI used to obtain the notification DAO; when {@code null} no DAO is
     *                                   created and the pending-notifications gauge reports 0
     * @param internalCallContextFactory factory for the call contexts passed to the DAO
     */
    public NotificationQueueDispatcher(final Clock clock, NotificationQueueConfig notificationQueueConfig, IDBI idbi, InternalCallContextFactory internalCallContextFactory) {
        super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(NotificationQueueDispatcher.NOTIFICATION_THREAD_NAME);
                // Make sure a failure escaping the runnable is at least logged.
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread failedThread, Throwable cause) {
                        NotificationQueueDispatcher.log.error("Uncaught exception for thread " + failedThread.getName(), cause);
                    }
                });
                return thread;
            }
        }), notificationQueueConfig.getNbThreads(), notificationQueueConfig);
        this.clock = clock;
        this.config = notificationQueueConfig;
        this.dao = idbi != null ? (NotificationSqlDao) idbi.onDemand(NotificationSqlDao.class) : null;
        this.internalCallContextFactory = internalCallContextFactory;
        this.hostname = Hostname.get();
        this.nbProcessedEvents = new AtomicLong();
        this.queues = new TreeMap<String, NotificationQueue>();
        this.pendingNotifications = Metrics.newGauge(NotificationQueueDispatcher.class, "pending-notifications", new Gauge<Integer>() {
            @Override
            public Integer value() {
                // Without a DAO there is nothing to count.
                return Integer.valueOf(NotificationQueueDispatcher.this.dao != null ? NotificationQueueDispatcher.this.dao.getPendingCountNotifications(clock.getUTCNow().toDate(), NotificationQueueDispatcher.this.createCallContext(null, null)) : 0);
            }
        });
        this.processedNotificationsSinceStart = Metrics.newCounter(NotificationQueueDispatcher.class, "processed-notifications-since-start");
        this.perQueueProcessingTime = new HashMap<String, Histogram>();
    }

    /**
     * Stops the shared dispatcher, but only once no registered queue is started any more.
     * Does nothing when processing is turned off in the configuration or the dispatcher is not started.
     */
    @Override
    public void stopQueue() {
        if (this.config.isProcessingOff() || !isStarted()) {
            return;
        }
        synchronized (this.queues) {
            for (NotificationQueue queue : this.queues.values()) {
                if (queue.isStarted()) {
                    // At least one queue still relies on the dispatcher: keep it running.
                    return;
                }
            }
        }
        super.stopQueue();
    }

    /** @return the counter of notifications picked up for processing by this dispatcher */
    public AtomicLong getNbProcessedEvents() {
        return this.nbProcessedEvents;
    }

    /** @return the host name captured at construction time, used as owner when claiming notifications */
    public String getHostname() {
        return this.hostname;
    }

    /** @return the clock supplied at construction time */
    public Clock getClock() {
        return this.clock;
    }

    /**
     * Looks up the handler of a registered and started queue.
     *
     * @param str the queue name the queue was registered under
     * @return the queue's handler, or {@code null} when the queue is unknown or not started
     */
    protected NotificationQueueService.NotificationQueueHandler getHandlerForActiveQueue(String str) {
        synchronized (this.queues) {
            NotificationQueue notificationQueue = this.queues.get(str);
            if (notificationQueue == null || !notificationQueue.isStarted()) {
                return null;
            }
            return notificationQueue.getHandler();
        }
    }

    /**
     * Processes every ready notification that could be claimed.
     *
     * @return the number of notifications handed to a handler
     */
    @Override
    public int doProcessEvents() {
        return doProcessEventsWithLimit(-1);
    }

    /**
     * Claims the ready notifications and dispatches at most {@code i} of them.
     *
     * @param i maximum number of notifications to dispatch; a value {@code <= 0} means no limit
     * @return the number of notifications handed to a handler and then cleared
     */
    public int doProcessEventsWithLimit(int i) {
        logDebug("ENTER doProcessEvents", new Object[0]);
        List<Notification> readyNotifications = getReadyNotifications(createCallContext(null, null));
        if (readyNotifications.isEmpty()) {
            logDebug("EXIT doProcessEvents", new Object[0]);
            return 0;
        }
        logDebug("doProcessEventsWithLimit date = %s, got %s", getClock().getUTCNow().toDate(), Integer.valueOf(readyNotifications.size()));
        if (i > 0 && readyNotifications.size() > i) {
            // NOTE(review): the dropped entries were already claimed in getReadyNotifications and are
            // not released here; they stay claimed until the DAO considers the claim expired.
            readyNotifications.subList(i, readyNotifications.size()).clear();
        }
        logDebug("START processing %d events at time %s", Integer.valueOf(readyNotifications.size()), getClock().getUTCNow().toDate());
        int handled = 0;
        for (Notification notification : readyNotifications) {
            getNbProcessedEvents().incrementAndGet();
            NotificationKey notificationKey = (NotificationKey) deserializeEvent(notification.getNotificationKeyClass(), notification.getNotificationKey());
            NotificationQueueService.NotificationQueueHandler handler = getHandlerForActiveQueue(notification.getQueueName());
            if (handler == null) {
                // The queue was stopped or removed after the claim; the notification is neither
                // handled nor cleared. NOTE(review): it remains claimed by this host.
                continue;
            }
            handleNotificationWithMetrics(handler, notification, notificationKey);
            handled++;
            clearNotification(notification, createCallContext(notification.getTenantRecordId(), notification.getAccountRecordId()));
            logDebug("done handling notification %s, key = %s for time %s", notification.getId(), notification.getNotificationKey(), notification.getEffectiveDate());
        }
        logDebug("EXIT doProcessEvents", new Object[0]);
        return handled;
    }

    /**
     * Invokes the handler for one notification, records the elapsed clock time in the queue's
     * histogram and bumps the processed counter. Neither metric is updated if the handler throws.
     */
    private void handleNotificationWithMetrics(NotificationQueueService.NotificationQueueHandler notificationQueueHandler, Notification notification, NotificationKey notificationKey) {
        Histogram histogram = processingTimeHistogram(notification.getQueueName());
        DateTime start = this.clock.getUTCNow();
        notificationQueueHandler.handleReadyNotification(notificationKey, notification.getEffectiveDate(), notification.getFutureUserToken(), notification.getAccountRecordId(), notification.getTenantRecordId());
        histogram.update(this.clock.getUTCNow().getMillis() - start.getMillis());
        this.processedNotificationsSinceStart.inc();
    }

    /** Returns the processing-time histogram of a queue, creating and caching it on first use. */
    private Histogram processingTimeHistogram(String queueName) {
        synchronized (this.perQueueProcessingTime) {
            Histogram histogram = this.perQueueProcessingTime.get(queueName);
            if (histogram == null) {
                histogram = Metrics.newHistogram((Class<?>) NotificationQueueDispatcher.class, processingTimeMetricName(queueName));
                this.perQueueProcessingTime.put(queueName, histogram);
            }
            return histogram;
        }
    }

    /**
     * Builds the histogram name {@code <first 3 chars of service>-<queue>-process-time} from a
     * composite {@code service:queue} name (see {@link #getCompositeName(String, String)}).
     * Names without a queue part or with a service part shorter than three characters are
     * tolerated instead of failing with an index exception.
     */
    private static String processingTimeMetricName(String queueName) {
        String[] parts = queueName.split(":");
        // String.split drops trailing empty strings, so "svc:" yields one part and ":" yields none.
        String service = parts.length > 0 ? parts[0] : "";
        String servicePrefix = service.length() > METRIC_SERVICE_PREFIX_LENGTH ? service.substring(0, METRIC_SERVICE_PREFIX_LENGTH) : service;
        StringBuilder name = new StringBuilder(servicePrefix);
        if (parts.length > 1) {
            name.append("-").append(parts[1]);
        }
        return name.append("-process-time").toString();
    }

    /** Clears a handled notification in the DAO on behalf of this host. */
    private void clearNotification(Notification notification, InternalCallContext internalCallContext) {
        this.dao.clearNotification(notification.getId().toString(), getHostname(), internalCallContext);
    }

    /**
     * Fetches up to the configured prefetch amount of ready notifications and tries to claim each
     * one whose queue is registered and started.
     *
     * @return the notifications successfully claimed by this host, in the order returned by the DAO
     */
    private List<Notification> getReadyNotifications(InternalCallContext internalCallContext) {
        DateTime utcNow = getClock().getUTCNow();
        Date now = utcNow.toDate();
        Date nextAvailable = utcNow.plus((long) CLAIM_TIME_MS).toDate();
        List<Notification> candidates = this.dao.getReadyNotifications(now, getHostname(), this.config.getPrefetchAmount(), internalCallContext);
        List<Notification> claimed = new ArrayList<Notification>();
        for (Notification notification : candidates) {
            if (!isQueueStarted(notification.getQueueName())) {
                continue;
            }
            logDebug("about to claim notification %s,  key = %s for time %s", notification.getId(), notification.getNotificationKey(), notification.getEffectiveDate());
            // The claim is treated as successful when exactly one row was affected.
            boolean claimedByUs = this.dao.claimNotification(getHostname(), nextAvailable, notification.getId().toString(), now, internalCallContext) == 1;
            logDebug("claimed notification %s, key = %s for time %s result = %s", notification.getId(), notification.getNotificationKey(), notification.getEffectiveDate(), Boolean.valueOf(claimedByUs));
            if (!claimedByUs) {
                continue;
            }
            claimed.add(notification);
            this.dao.insertClaimedHistory(getHostname(), now, notification.getId().toString(), internalCallContext);
            if (notification.getOwner() != null && !notification.getOwner().equals(getHostname())) {
                log.warn("NotificationQueue stealing notification {} from {}", notification, notification.getOwner());
            }
        }
        return claimed;
    }

    /** Tells, under the {@link #queues} lock, whether a queue is registered and started. */
    private boolean isQueueStarted(String queueName) {
        synchronized (this.queues) {
            NotificationQueue queue = this.queues.get(queueName);
            return queue != null && queue.isStarted();
        }
    }

    /** Logs a {@link String#format}-style message at debug level, prefixed with the current thread id. */
    private void logDebug(String str, Object... objArr) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Thread %d  %s", Long.valueOf(Thread.currentThread().getId()), String.format(str, objArr)));
        }
    }

    /**
     * Creates the internal call context used for DAO calls made by the dispatcher.
     *
     * @param l  first record id forwarded to the context factory (call sites pass the tenant record id), may be {@code null}
     * @param l2 second record id forwarded to the context factory (call sites pass the account record id), may be {@code null}
     * @return a context created for user name {@code "NotificationQueue"} with internal origin and system user type
     */
    public InternalCallContext createCallContext(@Nullable Long l, @Nullable Long l2) {
        return this.internalCallContextFactory.createInternalCallContext(l, l2, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, (UUID) null);
    }

    /**
     * Builds the composite queue name {@code str + ":" + str2}.
     *
     * @param str  first component (the metric naming treats it as the service name)
     * @param str2 second component (the metric naming treats it as the queue name)
     * @return both components joined with a colon
     */
    public static String getCompositeName(String str, String str2) {
        return str + ":" + str2;
    }
}
