package com.ning.billing.util.queue;

import com.ning.billing.util.config.PersistentQueueConfig;
import com.ning.billing.util.jackson.ObjectMapper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/billing/util/queue/PersistentQueueBase.class */
/**
 * Base class for queues whose events are processed by a fixed number of polling worker tasks.
 *
 * <p>{@link #startQueue()} submits {@code nbThreads} tasks to the supplied {@link Executor}. Each
 * task repeatedly calls {@link #doProcessEvents()} (unless processing is suspended) and then sleeps
 * for {@code config.getSleepTimeMs()} until {@link #stopQueue()} is called, the task is interrupted,
 * or an unexpected {@link Throwable} escapes the loop.
 *
 * <p>Thread-safety: {@code curActiveThreads} is guarded by the monitor of this queue instance, which
 * is also the monitor {@link #stopQueue()} waits on; the run flag is {@code volatile} so workers
 * observe a stop request without holding that monitor.
 */
public abstract class PersistentQueueBase implements QueueLifecycle {
    private static final Logger log = LoggerFactory.getLogger(PersistentQueueBase.class);

    /** Upper bound, in milliseconds, on how long start and stop wait for the worker tasks. */
    private static final long WAIT_TIMEOUT_MS = 15000;

    /** Mapper shared by {@link #deserializeEvent(String, String)} and by subclasses. */
    protected static final ObjectMapper objectMapper = new ObjectMapper();

    protected final PersistentQueueConfig config;

    private final int nbThreads;
    private final Executor executor;
    private final String svcQName;

    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final AtomicBoolean isProcessingSuspended = new AtomicBoolean(false);

    // volatile: written by startQueue()/stopQueue() and polled by the workers without any lock.
    private volatile boolean isProcessingEvents = false;

    // Guarded by the monitor of this queue instance (never by the monitor of a worker Runnable).
    private int curActiveThreads = 0;

    /**
     * @param svcQName  name used as a prefix in every log line emitted by this queue
     * @param executor  executor on which the worker tasks are run
     * @param nbThreads number of worker tasks submitted by {@link #startQueue()}
     * @param config    queue configuration (processing on/off switch, sleep time between polls)
     */
    public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
        this.executor = executor;
        this.nbThreads = nbThreads;
        this.svcQName = svcQName;
        this.config = config;
    }

    /**
     * Starts the worker tasks and waits up to {@code WAIT_TIMEOUT_MS} for all of them to report in.
     *
     * <p>Does nothing when processing is turned off in the configuration or when the queue is
     * already started. If the calling thread is interrupted while waiting, the wait is abandoned
     * and the thread's interrupt status is restored.
     */
    @Override
    public void startQueue() {
        if (config.isProcessingOff() || !isStarted.compareAndSet(false, true)) {
            return;
        }

        isProcessingEvents = true;
        synchronized (this) {
            curActiveThreads = 0;
        }

        final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
        log.info(String.format("%s: Starting with %d threads", svcQName, nbThreads));

        for (int i = 0; i < nbThreads; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    runWorker(doneInitialization);
                }
            });
        }

        try {
            final boolean allStarted = doneInitialization.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            final long nbStarted = nbThreads - doneInitialization.getCount();
            if (allStarted) {
                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, nbStarted, nbThreads));
            } else {
                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, nbStarted, nbThreads));
            }
        } catch (InterruptedException e) {
            log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
            // Do not swallow the interruption: let the caller see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Body of one worker task: registers itself, polls for events until asked to stop, and
     * unregisters itself exactly once on the way out, whatever the exit path.
     *
     * @param doneInitialization latch counted down once this worker has registered itself
     */
    private void runWorker(final CountDownLatch doneInitialization) {
        final Thread current = Thread.currentThread();
        log.info(String.format("%s: Thread %s [%d] starting", svcQName, current.getName(), current.getId()));

        synchronized (this) {
            curActiveThreads++;
        }
        doneInitialization.countDown();

        try {
            while (isProcessingEvents) {
                try {
                    if (!isProcessingSuspended.get()) {
                        doProcessEvents();
                    }
                } catch (Exception e) {
                    // A failing poll must not kill the worker; log and retry after the sleep.
                    log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...", svcQName, current.getName(), current.getId()), e);
                }
                Thread.sleep(config.getSleepTimeMs());
            }
        } catch (InterruptedException e) {
            log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, current.getName()));
            // Preserve the interrupt status for the executor that owns this thread.
            current.interrupt();
        } catch (Throwable t) {
            log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, current.getName()), t);
        } finally {
            log.info(String.format("%s: Thread %s has exited", svcQName, current.getName()));
            // Same monitor stopQueue() waits on, so the waiter is woken as soon as a worker leaves.
            synchronized (this) {
                curActiveThreads--;
                notifyAll();
            }
        }
    }

    /**
     * Asks the worker tasks to stop and waits up to {@code WAIT_TIMEOUT_MS} for them to exit.
     *
     * <p>Does nothing when processing is turned off in the configuration or when the queue is not
     * started. The outcome is logged at error level when workers are still active at the end of the
     * wait (including when the wait was cut short by an interruption) and at info level otherwise.
     * The active-thread counter is reset to zero in all cases, and the caller's interrupt status is
     * restored if the wait was interrupted.
     */
    @Override
    public void stopQueue() {
        if (config.isProcessingOff() || !isStarted.compareAndSet(true, false)) {
            return;
        }

        isProcessingEvents = false;

        boolean interrupted = false;
        try {
            synchronized (this) {
                final long startMs = System.currentTimeMillis();
                long remainingWaitMs = WAIT_TIMEOUT_MS;
                while (curActiveThreads > 0 && remainingWaitMs > 0) {
                    // Bounded wait: wake up at least every second to re-check the deadline.
                    wait(Math.min(1000L, remainingWaitMs));
                    remainingWaitMs = WAIT_TIMEOUT_MS - (System.currentTimeMillis() - startMs);
                }
            }
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            final int remaining;
            synchronized (this) {
                remaining = curActiveThreads;
                curActiveThreads = 0;
            }
            if (interrupted) {
                log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, remaining));
            }
            if (remaining > 0) {
                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, remaining));
            } else {
                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, remaining));
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Makes the workers skip {@link #doProcessEvents()} until processing is resumed. */
    @Managed(description = "suspend processing for all notifications")
    public void suspendNotificationProcessing() {
        isProcessingSuspended.set(true);
    }

    /** Lets the workers call {@link #doProcessEvents()} again after a suspension. */
    @Managed(description = "resume processing for all notifications")
    public void resumeNotificationProcessing() {
        isProcessingSuspended.set(false);
    }

    /** @return {@code true} while processing is suspended */
    @Managed(description = "check whether notification processing is suspended")
    public boolean isNotificationProcessingSuspended() {
        return isProcessingSuspended.get();
    }

    /**
     * Deserializes a JSON document into an instance of the named class using the shared mapper.
     *
     * @param className fully qualified name of the class to load with {@link Class#forName(String)}
     * @param json      JSON document to read
     * @param <T>       type the caller expects; the cast is unchecked, so a mismatch only surfaces
     *                  at the call site
     * @return the deserialized object, or {@code null} if the class cannot be loaded or the JSON
     *         cannot be read (the failure is logged at error level)
     */
    public static <T> T deserializeEvent(final String className, final String json) {
        try {
            final Class<?> claz = Class.forName(className);
            // Unchecked by nature: the target type is only known by name at runtime.
            @SuppressWarnings("unchecked")
            final T event = (T) objectMapper.readValue(json, claz);
            return event;
        } catch (Exception e) {
            log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
            return null;
        }
    }

    /**
     * Performs one polling pass. Called repeatedly by every worker while the queue is started and
     * not suspended; any {@link Exception} it throws is logged and the worker keeps running.
     *
     * @return an int that the worker loop ignores (presumably the number of events processed;
     *         confirm against implementations)
     */
    public abstract int doProcessEvents();

    /** @return {@code true} between a successful {@link #startQueue()} and the next {@link #stopQueue()} */
    @Override
    public boolean isStarted() {
        return isStarted.get();
    }
}
