package com.ning.billing.util.bus;

import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.bus.dao.BusEventEntry;
import com.ning.billing.util.bus.dao.PersistentBusSqlDao;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.events.BusInternalEvent;
import com.ning.billing.util.queue.PersistentQueueBase;
import com.ning.billing.util.svcsapi.bus.InternalBus;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionIsolationLevel;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/util/bus/PersistentInternalBus.class */
/**
 * {@link InternalBus} implementation that stores posted events through {@link PersistentBusSqlDao}
 * and later dispatches the entries it manages to claim to subscribers registered on an
 * in-process Guava {@link EventBus}.
 *
 * <p>Posting ({@link #post} / {@link #postFromTransaction}) only inserts a {@link BusEventEntry}
 * row; delivery to subscribers happens in {@link #doProcessEvents()}.
 * NOTE(review): {@code doProcessEvents()} is presumably driven by the worker threads of
 * {@link PersistentQueueBase} once {@code startQueue()} has been called — confirm in the base class.
 */
public class PersistentInternalBus extends PersistentQueueBase implements InternalBus {
    /**
     * Offset (ms) added to "now" and handed to {@code claimBusEvent} when claiming an entry.
     * NOTE(review): presumably the time after which a claimed-but-uncleared entry becomes
     * available again — confirm against the SQL behind {@code PersistentBusSqlDao}.
     */
    private static final long DELTA_IN_PROCESSING_TIME_MS = 300000;
    private static final Logger log = LoggerFactory.getLogger(PersistentInternalBus.class);
    private final PersistentBusSqlDao dao;
    private final EventBusDelegate eventBusDelegate;
    private final Clock clock;
    private final String hostname;
    private final InternalCallContextFactory internalCallContextFactory;
    /** Flipped by {@link #start()} / {@link #stop()}; volatile so other threads see the change. */
    private volatile boolean isStarted;

    /** Thin {@link EventBus} subclass; adds nothing beyond the named constructor. */
    private static final class EventBusDelegate extends EventBus {
        public EventBusDelegate(String str) {
            super(str);
        }
    }

    /**
     * Creates the bus with a fixed-size worker pool of {@code persistentBusConfig.getNbThreads()} threads.
     *
     * @param idbi                       used to obtain an on-demand {@link PersistentBusSqlDao}
     * @param clock                      time source used when fetching and claiming entries
     * @param persistentBusConfig        supplies the thread count and is passed on to the base class
     * @param internalCallContextFactory builds the call contexts passed to the DAO
     */
    @Inject
    public PersistentInternalBus(IDBI idbi, Clock clock, PersistentBusConfig persistentBusConfig, InternalCallContextFactory internalCallContextFactory) {
        super("Bus", Executors.newFixedThreadPool(persistentBusConfig.getNbThreads(), new ThreadFactory() {
            // One shared group for all bus workers instead of allocating a new group per thread.
            private final ThreadGroup busThreadGroup = new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME);

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(this.busThreadGroup, runnable, DefaultBusService.EVENT_BUS_TH_NAME);
            }
        }), persistentBusConfig.getNbThreads(), persistentBusConfig);
        this.dao = (PersistentBusSqlDao) idbi.onDemand(PersistentBusSqlDao.class);
        this.clock = clock;
        this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
        this.hostname = Hostname.get();
        this.internalCallContextFactory = internalCallContextFactory;
        this.isStarted = false;
    }

    /** Starts the underlying queue, then marks the bus as started. */
    @Override
    public void start() {
        startQueue();
        this.isStarted = true;
    }

    /** Stops the underlying queue, then marks the bus as stopped. */
    @Override
    public void stop() {
        stopQueue();
        this.isStarted = false;
    }

    /**
     * Claims the next batch of entries, posts each one to the in-process event bus and then
     * clears it through the DAO.
     *
     * <p>An exception while deserializing or clearing an entry propagates to the caller and
     * leaves the remaining claimed entries of the batch untouched.
     *
     * @return the number of entries that were dispatched and cleared
     */
    @Override
    public int doProcessEvents() {
        // The casts select the intended createInternalCallContext overload; do not remove them.
        InternalCallContext rootContext = this.internalCallContextFactory.createInternalCallContext((Long) 0L, (Long) null, "PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM, (UUID) null);
        List<BusEventEntry> claimedEntries = getNextBusEvent(rootContext);
        int processed = 0;
        for (BusEventEntry entry : claimedEntries) {
            processed++;
            String eventJson = tweakJsonToIncludeAccountAndTenantRecordId(entry.getBusEventJson(), entry.getAccountRecordId(), entry.getTenantRecordId());
            this.eventBusDelegate.post((BusInternalEvent) deserializeEvent(entry.getBusEventClass(), eventJson));
            InternalCallContext entryContext = this.internalCallContextFactory.createInternalCallContext(entry.getTenantRecordId(), entry.getAccountRecordId(), rootContext);
            this.dao.clearBusEvent(Long.valueOf(entry.getId()), this.hostname, entryContext);
        }
        return processed;
    }

    /** @return {@code true} between a completed {@link #start()} and the end of {@link #stop()} */
    @Override
    public boolean isStarted() {
        return this.isStarted;
    }

    /**
     * Fetches up to {@code config.getPrefetchAmount()} candidate entries and tries to claim each
     * one for this host.
     *
     * @param internalCallContext base context; a per-entry context is derived from it
     * @return the entries for which {@code claimBusEvent} reported exactly one updated row
     */
    private List<BusEventEntry> getNextBusEvent(InternalCallContext internalCallContext) {
        Date now = this.clock.getUTCNow().toDate();
        Date nextAvailable = this.clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
        List<BusEventEntry> candidates = this.dao.getNextBusEventEntries(this.config.getPrefetchAmount(), this.hostname, now, internalCallContext);
        List<BusEventEntry> claimed = new LinkedList<BusEventEntry>();
        for (BusEventEntry candidate : candidates) {
            InternalCallContext entryContext = this.internalCallContextFactory.createInternalCallContext(candidate.getTenantRecordId(), candidate.getAccountRecordId(), internalCallContext);
            // Any result other than 1 means the claim did not succeed for this host; skip the entry.
            boolean claimedByUs = this.dao.claimBusEvent(this.hostname, nextAvailable, Long.valueOf(candidate.getId()), now, entryContext) == 1;
            if (claimedByUs) {
                this.dao.insertClaimedHistory(this.hostname, now, candidate.getId(), entryContext);
                claimed.add(candidate);
            }
        }
        return claimed;
    }

    /** Registers a subscriber object on the in-process event bus. */
    @Override
    public void register(Object obj) throws InternalBus.EventBusException {
        this.eventBusDelegate.register(obj);
    }

    /** Removes a previously registered subscriber object from the in-process event bus. */
    @Override
    public void unregister(Object obj) throws InternalBus.EventBusException {
        this.eventBusDelegate.unregister(obj);
    }

    /**
     * Persists the event inside a new {@code READ_COMMITTED} transaction.
     *
     * @param busInternalEvent    event to store
     * @param internalCallContext context supplying the user token and record ids
     */
    @Override
    public void post(final BusInternalEvent busInternalEvent, final InternalCallContext internalCallContext) throws InternalBus.EventBusException {
        this.dao.inTransaction(TransactionIsolationLevel.READ_COMMITTED, new Transaction<Void, PersistentBusSqlDao>() {
            @Override
            public Void inTransaction(PersistentBusSqlDao persistentBusSqlDao, TransactionStatus transactionStatus) throws Exception {
                PersistentInternalBus.this.postFromTransaction(busInternalEvent, internalCallContext, persistentBusSqlDao);
                return null;
            }
        });
    }

    /**
     * Persists the event using a DAO obtained from the caller's wrapper factory.
     *
     * @param busInternalEvent          event to store
     * @param entitySqlDaoWrapperFactory factory transmogrified into a {@link PersistentBusSqlDao}
     * @param internalCallContext       context supplying the user token and record ids
     */
    @Override
    public void postFromTransaction(BusInternalEvent busInternalEvent, EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, InternalCallContext internalCallContext) throws InternalBus.EventBusException {
        postFromTransaction(busInternalEvent, internalCallContext, (PersistentBusSqlDao) entitySqlDaoWrapperFactory.transmogrify(PersistentBusSqlDao.class));
    }

    /**
     * Serializes the event to JSON and inserts it as a {@link BusEventEntry}.
     *
     * <p>Failures are logged and not rethrown, so a serialization or insert error does not
     * fail the caller. The decompiler widened this method to {@code public}; the visibility
     * is kept as-is so existing callers are unaffected.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void postFromTransaction(BusInternalEvent busInternalEvent, InternalCallContext internalCallContext, PersistentBusSqlDao persistentBusSqlDao) {
        try {
            String eventJson = objectMapper.writeValueAsString(busInternalEvent);
            BusEventEntry entry = new BusEventEntry(this.hostname, busInternalEvent.getClass().getName(), eventJson, internalCallContext.getUserToken(), internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
            persistentBusSqlDao.insertBusEvent(entry, internalCallContext);
        } catch (Exception e) {
            // Deliberate best-effort: the failure is logged and swallowed.
            log.error("Failed to post BusEvent " + busInternalEvent, e);
        }
    }

    /**
     * Appends {@code accountRecordId} and {@code tenantRecordId} members to a JSON object string
     * by splicing them in before its last closing brace. Anything after that brace is dropped.
     *
     * @param str serialized JSON object
     * @param l   account record id (written as-is, so {@code null} becomes JSON {@code null})
     * @param l2  tenant record id (written as-is, so {@code null} becomes JSON {@code null})
     * @return the JSON string with the two extra members
     * @throws IllegalArgumentException if {@code str} contains no closing brace
     */
    private String tweakJsonToIncludeAccountAndTenantRecordId(String str, Long l, Long l2) {
        int closingBrace = str.lastIndexOf('}');
        if (closingBrace < 0) {
            throw new IllegalArgumentException("Bus event JSON has no closing '}' to append the record ids to");
        }
        String head = str.substring(0, closingBrace);
        // An empty object ("{}") has no preceding member, so a leading comma would produce invalid JSON.
        String separator = head.trim().endsWith("{") ? "" : ",";
        return head + separator + "\"accountRecordId\":" + l + ",\"tenantRecordId\":" + l2 + "}";
    }
}
