package com.github.davidmarquis.redisq.consumer;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/davidmarquis/redisq/consumer/MultiThreadingStrategy.class */
/**
 * {@link ThreadingStrategy} that runs the dequeue callback on a fixed number of
 * dedicated threads.
 *
 * <p>{@link #start(String, Runnable)} spawns {@code numThreads} worker threads that each
 * invoke the supplied callback in a loop. {@link #stop()} asks every worker to finish its
 * current iteration and then waits (bounded) for each of them to terminate.
 *
 * <p>The list of workers itself is not synchronized; {@code start} and {@code stop} are
 * expected to be called from a single controlling thread.
 */
public class MultiThreadingStrategy implements ThreadingStrategy {
    private static final Logger log = LoggerFactory.getLogger(MultiThreadingStrategy.class);

    /** Upper bound, in milliseconds, on how long {@link #stop()} waits for each worker thread. */
    private static final long MAX_WAIT_MILLIS_WHEN_STOPPING_THREADS = 30000;

    private final int numThreads;
    private final List<DequeueThread> dequeueThreads;

    /**
     * Worker thread that repeatedly invokes the callback until a stop is requested or the
     * thread is interrupted.
     */
    public class DequeueThread extends Thread {
        // Written by the thread calling stop() and read by this worker thread: must be
        // volatile, otherwise the worker is not guaranteed to ever observe the update.
        private volatile boolean stopRequested = false;
        private final Runnable callback;

        /**
         * @param runnable the work to execute on every loop iteration
         */
        public DequeueThread(Runnable runnable) {
            this.callback = runnable;
        }

        /**
         * Invokes the callback in a loop. Anything thrown by the callback is logged and the
         * loop continues, so a single failing item does not kill the consumer thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopRequested && !isInterrupted()) {
                try {
                    this.callback.run();
                } catch (Throwable th) {
                    // Deliberately broad: the worker must survive any failure of the callback.
                    MultiThreadingStrategy.log.error("Exception while handling next queue item.", th);
                }
            }
        }
    }

    /**
     * @param i number of consumer threads to create when {@link #start(String, Runnable)} is
     *          called; a negative value is rejected with {@link IllegalArgumentException}
     *          by the backing {@link ArrayList}
     */
    public MultiThreadingStrategy(int i) {
        this.numThreads = i;
        this.dequeueThreads = new ArrayList<DequeueThread>(i);
    }

    /**
     * Creates and starts the worker threads.
     *
     * @param str      identifier embedded in each thread name
     *                 ({@code redisq-consumer[<str>]<index>})
     * @param runnable the callback every worker thread invokes in a loop
     */
    @Override // com.github.davidmarquis.redisq.consumer.ThreadingStrategy
    public void start(String str, Runnable runnable) {
        for (int i = 0; i < this.numThreads; i++) {
            DequeueThread dequeueThread = new DequeueThread(runnable);
            dequeueThread.setName(String.format("redisq-consumer[%s]%s", str, i));
            dequeueThread.start();
            this.dequeueThreads.add(dequeueThread);
            log.debug("Started message consumer thread [{}]", dequeueThread.getName());
        }
    }

    /**
     * Requests all worker threads to stop and waits for them to terminate. The internal list
     * of workers is always cleared afterwards, even if waiting fails.
     */
    @Override // com.github.davidmarquis.redisq.consumer.ThreadingStrategy
    public void stop() {
        try {
            // Flag every worker first so they wind down concurrently, then wait for them.
            for (DequeueThread dequeueThread : this.dequeueThreads) {
                log.debug("Stopping message consuming thread [{}]", dequeueThread.getName());
                dequeueThread.stopRequested = true;
            }
            waitForAllThreadsToTerminate();
        } finally {
            this.dequeueThreads.clear();
        }
    }

    /**
     * Joins every worker thread, waiting at most
     * {@link #MAX_WAIT_MILLIS_WHEN_STOPPING_THREADS} milliseconds for each one.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining workers are still
     * joined and the interrupt status is restored before returning so that callers can
     * observe it.
     */
    private void waitForAllThreadsToTerminate() {
        boolean interrupted = false;
        for (DequeueThread dequeueThread : this.dequeueThreads) {
            try {
                dequeueThread.join(MAX_WAIT_MILLIS_WHEN_STOPPING_THREADS);
                if (dequeueThread.isAlive()) {
                    log.warn("Thread [{}] did not terminate within {} ms.",
                            dequeueThread.getName(), MAX_WAIT_MILLIS_WHEN_STOPPING_THREADS);
                }
            } catch (InterruptedException e) {
                // join() cleared the interrupt flag; remember it and restore it after the
                // loop so the remaining joins are not aborted immediately.
                interrupted = true;
                log.warn("Unable to join thread [{}].", dequeueThread.getName(), e);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
