package org.aanguita.jacuzzi.queues.processor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.aanguita.jacuzzi.concurrency.SimpleSemaphore;
import org.aanguita.jacuzzi.concurrency.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aanguita/jacuzzi/queues/processor/MessageProcessor.class */
/**
 * Owns the worker threads of a reader/handler message pipeline and offers a
 * common start / pause / resume / stop life cycle for them.
 *
 * <p>Two configurations exist, selected by the {@code separateThreads}
 * constructor flag:
 * <ul>
 *   <li><b>separate threads</b>: a bounded, fair {@link ArrayBlockingQueue} is
 *       created together with a {@code MessageReaderThread} (when a reader is
 *       given) and/or a {@code MessageHandlerThread} (when a handler is given).
 *       Messages travel through {@link #addMessage(Object)} and
 *       {@link #takeMessage()}.</li>
 *   <li><b>single thread</b>: no queue is created; one
 *       {@code MessageReaderHandlerThread} receives both the reader and the
 *       handler.</li>
 * </ul>
 *
 * <p>NOTE(review): the thread classes and {@code SimpleSemaphore} are declared
 * elsewhere; what they do with this processor is not visible from this file.
 *
 * @param <E> type of the messages stored in the queue
 */
public class MessageProcessor<E> {
    private static final Logger logger = LoggerFactory.getLogger(MessageProcessor.class);

    /** Queue capacity used by the constructors that do not take an explicit capacity. */
    private static final int DEFAULT_QUEUE_CAPACITY = 1024;

    /** Fairness policy handed to the {@link ArrayBlockingQueue}. */
    private static final boolean MESSAGE_FAIRNESS = true;

    /** Name given to the worker threads and printed in log messages. */
    private final String name;

    /** Message queue; {@code null} when reader and handler share one thread. */
    private final ArrayBlockingQueue<E> messageQueue;

    /** Handler thread; {@code null} without a handler or in single-thread mode. */
    private final MessageHandlerThread messageHandlerThread;

    /** Reader thread; {@code null} without a reader or in single-thread mode. */
    private final MessageReaderThread messageReaderThread;

    /** Combined reader+handler thread; {@code null} in separate-threads mode. */
    private final MessageReaderHandlerThread messageReaderHandlerThread;

    private final boolean separateThreads;

    /** Single pause/resume gate shared by every worker thread of this processor. */
    private final SimpleSemaphore simpleSemaphore;

    /** {@code true} until the processor is stopped; flipped exactly once. */
    private final AtomicBoolean alive;

    // NOTE(review): the invokerName(...) calls below must stay directly inside the
    // constructors (no wrapping helper) in case the name is derived from the call
    // stack. They receive the boolean MESSAGE_FAIRNESS constant; confirm against
    // ThreadUtil.invokerName's declared parameter that this is the intended argument.

    /** Reader-only processor named after the invoker, default queue capacity, separate threads. */
    public MessageProcessor(MessageReader<E> messageReader) throws IllegalArgumentException {
        this(ThreadUtil.invokerName(MESSAGE_FAIRNESS), messageReader, null, DEFAULT_QUEUE_CAPACITY, true);
    }

    /** Reader-only processor with the given name, default queue capacity, separate threads. */
    public MessageProcessor(String str, MessageReader<E> messageReader) throws IllegalArgumentException {
        this(str, messageReader, null, DEFAULT_QUEUE_CAPACITY, true);
    }

    /** Reader-only processor with the given name and queue capacity {@code i}, separate threads. */
    public MessageProcessor(String str, MessageReader<E> messageReader, int i) throws IllegalArgumentException {
        this(str, messageReader, null, i, true);
    }

    /** Handler-only processor named after the invoker, default queue capacity, separate threads. */
    public MessageProcessor(MessageHandler<E> messageHandler) throws IllegalArgumentException {
        this(ThreadUtil.invokerName(MESSAGE_FAIRNESS), null, messageHandler, DEFAULT_QUEUE_CAPACITY, true);
    }

    /** Handler-only processor with the given name, default queue capacity, separate threads. */
    public MessageProcessor(String str, MessageHandler<E> messageHandler) throws IllegalArgumentException {
        this(str, null, messageHandler, DEFAULT_QUEUE_CAPACITY, true);
    }

    /** Handler-only processor with the given name and queue capacity {@code i}, separate threads. */
    public MessageProcessor(String str, MessageHandler<E> messageHandler, int i) throws IllegalArgumentException {
        this(str, null, messageHandler, i, true);
    }

    /** Reader+handler processor named after the invoker; {@code z} selects separate threads. */
    public MessageProcessor(MessageReader<E> messageReader, MessageHandler<E> messageHandler, boolean z) throws IllegalArgumentException {
        this(ThreadUtil.invokerName(MESSAGE_FAIRNESS), messageReader, messageHandler, DEFAULT_QUEUE_CAPACITY, z);
    }

    /** Reader+handler processor with the given name; {@code z} selects separate threads. */
    public MessageProcessor(String str, MessageReader<E> messageReader, MessageHandler<E> messageHandler, boolean z) throws IllegalArgumentException {
        this(str, messageReader, messageHandler, DEFAULT_QUEUE_CAPACITY, z);
    }

    /**
     * Creates the processor; no thread runs until {@link #start()} is called.
     *
     * @param str            name of the processor, must not be {@code null}
     * @param messageReader  message source, may be {@code null} in separate-threads mode
     * @param messageHandler message sink, may be {@code null} in separate-threads mode
     * @param i              queue capacity; only used when {@code z} is {@code true}
     * @param z              {@code true} to use a queue plus separate reader/handler threads,
     *                       {@code false} to run reader and handler on one thread
     * @throws IllegalArgumentException if the name is {@code null}, if both reader and
     *                                  handler are {@code null}, if single-thread mode is
     *                                  requested without both of them, or if the queue
     *                                  rejects the capacity
     */
    public MessageProcessor(String str, MessageReader<E> messageReader, MessageHandler<E> messageHandler, int i, boolean z) throws IllegalArgumentException {
        if (str == null) {
            throw new IllegalArgumentException("name must not be null");
        }
        if (z && messageReader == null && messageHandler == null) {
            throw new IllegalArgumentException("Either MessageReader of MessageHandler objects must be received, both null");
        }
        if (!z && (messageReader == null || messageHandler == null)) {
            throw new IllegalArgumentException("Both MessageReader and MessageHandler objects must be received if no separate threads are employed");
        }
        // Processor state is assigned before `this` is handed to the thread
        // constructors, so they never observe a half-initialized processor.
        this.name = str;
        this.separateThreads = z;
        this.simpleSemaphore = new SimpleSemaphore();
        this.alive = new AtomicBoolean(true);
        this.messageQueue = initializeMessageQueue(z, i);
        this.messageReaderThread = initializeMessageReaderThread(messageReader, z, str);
        this.messageHandlerThread = initializeMessageHandlerThread(messageHandler, z, str);
        this.messageReaderHandlerThread = initializeMessageReaderHandlerThread(messageReader, messageHandler, z, str);
        logger.debug("Message processor ({}) initialized", this.name);
    }

    /** Builds the fair bounded queue, or returns {@code null} in single-thread mode. */
    private ArrayBlockingQueue<E> initializeMessageQueue(boolean z, int i) {
        if (z) {
            return new ArrayBlockingQueue<>(i, MESSAGE_FAIRNESS);
        }
        return null;
    }

    /** Builds the reader thread, or returns {@code null} without a reader or in single-thread mode. */
    private MessageReaderThread initializeMessageReaderThread(MessageReader<E> messageReader, boolean z, String str) {
        if (messageReader == null || !z) {
            return null;
        }
        return new MessageReaderThread(str, this, messageReader);
    }

    /** Builds the handler thread, or returns {@code null} without a handler or in single-thread mode. */
    private MessageHandlerThread initializeMessageHandlerThread(MessageHandler<E> messageHandler, boolean z, String str) {
        if (messageHandler == null || !z) {
            return null;
        }
        return new MessageHandlerThread(str, this, messageHandler);
    }

    /** Builds the combined thread, or returns {@code null} in separate-threads mode or if either part is missing. */
    private MessageReaderHandlerThread initializeMessageReaderHandlerThread(MessageReader<E> messageReader, MessageHandler<E> messageHandler, boolean z, String str) {
        if (messageReader == null || messageHandler == null || z) {
            return null;
        }
        return new MessageReaderHandlerThread(str, this, messageReader, messageHandler);
    }

    /**
     * Starts the worker threads that exist for the configured mode. Calling it
     * again is harmless: threads that were already started are left alone.
     */
    public void start() {
        if (this.separateThreads) {
            startThread(this.messageReaderThread);
            startThread(this.messageHandlerThread);
        } else {
            startThread(this.messageReaderHandlerThread);
        }
        logger.debug("Message processor ({}) started", this.name);
    }

    /**
     * Starts {@code thread} only if it exists and has never been started.
     * Checking for {@link Thread.State#NEW} (rather than {@code !isAlive()})
     * also covers threads that already terminated, for which
     * {@link Thread#start()} would throw {@link IllegalThreadStateException}.
     */
    private synchronized void startThread(Thread thread) {
        if (thread == null || thread.getState() != Thread.State.NEW) {
            return;
        }
        thread.start();
    }

    /** Pauses the shared semaphore; does nothing once the processor has been stopped. */
    public void pause() {
        if (this.alive.get()) {
            this.simpleSemaphore.pause();
            logger.debug("Message processor ({}) paused", this.name);
        }
    }

    /** Resumes the shared semaphore. */
    public void resume() {
        this.simpleSemaphore.resume();
        logger.debug("Message processor ({}) resumed", this.name);
    }

    /** Same as {@link #pause()}: reader and handler share a single semaphore. */
    public void pauseReader() {
        pause();
    }

    /** Same as {@link #resume()}: reader and handler share a single semaphore. */
    public void resumeReader() {
        resume();
    }

    /**
     * Passes through the pause semaphore and then reports whether the processor
     * is still alive. Presumably {@code access()} blocks while the processor is
     * paused (SimpleSemaphore is not visible here; confirm).
     *
     * <p>The decompiler widened this method from package-private to public.
     *
     * @return {@code true} while the processor has not been stopped
     */
    public boolean accessTrafficControl() {
        this.simpleSemaphore.access();
        return this.alive.get();
    }

    /** Same as {@link #pause()}: reader and handler share a single semaphore. */
    public void pauseHandler() {
        pause();
    }

    /** Same as {@link #resume()}: reader and handler share a single semaphore. */
    public void resumeHandler() {
        resume();
    }

    /**
     * Appends a message to the queue, waiting for free space if it is full.
     *
     * @param e message to enqueue
     * @throws IllegalStateException if the processor has no queue (single-thread mode)
     * @throws InterruptedException  if interrupted while waiting for space
     */
    public void addMessage(E e) throws InterruptedException {
        if (this.messageQueue == null) {
            throw new IllegalStateException("Tried to add a message with no queue configured");
        }
        this.messageQueue.put(e);
        // Logged after put() so the entry reflects a message that is really in the queue.
        logger.debug("Message processor ({}) added message", this.name);
    }

    /**
     * Removes and returns the head of the queue, waiting until a message is available.
     *
     * @return the dequeued message
     * @throws IllegalStateException if the processor has no queue (single-thread mode)
     * @throws InterruptedException  if interrupted while waiting for a message
     */
    public E takeMessage() throws InterruptedException {
        if (this.messageQueue == null) {
            throw new IllegalStateException("Tried to retrieve a message with no queue configured");
        }
        E message = this.messageQueue.take();
        // Logged after take() so the entry reflects a message that was really removed.
        logger.debug("Message processor ({}) removed message", this.name);
        return message;
    }

    /**
     * @return number of messages currently queued, or 0 when there is no queue
     */
    public int queueSize() {
        if (this.messageQueue != null) {
            return this.messageQueue.size();
        }
        return 0;
    }

    /**
     * Stops the processor. Only the first call has any effect: it resumes the
     * semaphore, asks the reader(s) to stop and interrupts the handler and
     * combined threads.
     */
    public synchronized void stop() {
        if (this.alive.getAndSet(false)) {
            // Resume first so the semaphore is not left in the paused state.
            resume();
            if (this.messageReaderThread != null) {
                this.messageReaderThread.getMessageReader().stop();
            }
            if (this.messageHandlerThread != null) {
                this.messageHandlerThread.interrupt();
            }
            if (this.messageReaderHandlerThread != null) {
                this.messageReaderHandlerThread.getMessageReader().stop();
                this.messageReaderHandlerThread.interrupt();
            }
            logger.debug("Message processor ({}) stopped", this.name);
        }
    }

    /**
     * Marks the processor as no longer alive because the reader side has
     * finished: resumes the semaphore and interrupts the handler thread, if any.
     * Unlike {@link #stop()}, the reader itself is not asked to stop.
     *
     * <p>The decompiler widened this method from package-private to public.
     */
    public synchronized void readerHasStopped() {
        if (this.alive.getAndSet(false)) {
            resume();
            if (this.messageHandlerThread != null) {
                this.messageHandlerThread.interrupt();
            }
        }
    }

    /**
     * Last-resort cleanup: stops the processor if the owner never did.
     * {@link #stop()} performs its own "still alive" test-and-set, so it is
     * called directly; clearing the flag here first would turn it into a no-op.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            stop();
        } finally {
            super.finalize();
        }
    }
}
