package ru.infon.queuebox;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:ru/infon/queuebox/QueueEngine.class */
/**
 * Entry point that ties a {@link QueueBehave} storage strategy to the consumers
 * reading from it.
 *
 * <p>Every queue operation ({@link #queue}, {@link #fetch}, {@link #ack},
 * {@link #reset}, {@link #getFetchLimit}) is forwarded unchanged to the
 * {@code QueueBehave} supplied at construction. In addition, the engine keeps one
 * {@link QueueConsumerThread} per registered consumer id.
 *
 * <p>Registration of a given consumer id is atomic: when several threads try to
 * register the same id concurrently, exactly one succeeds and the others get an
 * {@link IllegalStateException}.
 *
 * @param <T> payload type carried by the {@link MessageContainer}s
 */
public class QueueEngine<T> implements QueuePacketHolder<T> {
    private static final Log LOG = LogFactory.getLog(QueueEngine.class);

    private final QueueBehave<T> queueBehave;
    private final ExecutorService executor;
    private final Properties properties;

    /** Consumer threads created by {@link #registerConsumer}, keyed by consumer id. */
    private final ConcurrentMap<String, QueueConsumerThread<T>> listenerThreads = new ConcurrentHashMap<>();

    /**
     * Creates an engine. The arguments are stored as-is; nothing is started here.
     *
     * @param properties      configuration handed to every {@link QueueConsumerThread} created by this engine
     * @param queueBehave     delegate that performs the actual queue operations
     * @param executorService executor handed to every {@link QueueConsumerThread} created by this engine
     */
    public QueueEngine(Properties properties, QueueBehave<T> queueBehave, ExecutorService executorService) {
        this.properties = properties;
        this.queueBehave = queueBehave;
        this.executor = executorService;
    }

    /**
     * Enqueues a message by passing it to {@link QueueBehave#put}.
     *
     * @param messageContainer message to enqueue
     */
    public void queue(MessageContainer<T> messageContainer) {
        this.queueBehave.put(messageContainer);
    }

    /**
     * Returns the fetch limit reported by the underlying {@link QueueBehave}.
     *
     * @return the delegate's fetch limit
     */
    @Override
    public int getFetchLimit() {
        return this.queueBehave.getFetchLimit();
    }

    /**
     * Looks up messages for the given consumer via {@link QueueBehave#find}.
     *
     * @param queueConsumer consumer the lookup is performed for
     * @return whatever collection the delegate returns
     */
    @Override
    public Collection<MessageContainer<T>> fetch(QueueConsumer<T> queueConsumer) {
        return this.queueBehave.find(queueConsumer);
    }

    /**
     * Acknowledges a message by passing it to {@link QueueBehave#remove}.
     *
     * @param messageContainer message being acknowledged
     */
    @Override
    public void ack(MessageContainer<T> messageContainer) {
        this.queueBehave.remove(messageContainer);
    }

    /**
     * Resets a message by passing it to {@link QueueBehave#reset}.
     *
     * @param messageContainer message to reset
     */
    @Override
    public void reset(MessageContainer<T> messageContainer) {
        this.queueBehave.reset(messageContainer);
    }

    /**
     * Registers a consumer: creates a {@link QueueConsumerThread} for it, records it
     * under the consumer's id and calls {@code start()} on it.
     *
     * @param queueConsumer consumer to register
     * @throws IllegalStateException if a consumer with the same id is already registered
     */
    public void registerConsumer(QueueConsumer<T> queueConsumer) {
        // Read the id once so the checked key, the stored key and the message always agree.
        String consumerId = queueConsumer.getConsumerId();

        // Fast rejection: do not even build a thread object for an id that is already taken.
        if (this.listenerThreads.containsKey(consumerId)) {
            throw alreadyRegistered(consumerId);
        }

        QueueConsumerThread<T> consumerThread =
                new QueueConsumerThread<>(this.properties, queueConsumer, this, this.executor);

        // The containsKey check above is not atomic with the insertion; putIfAbsent is.
        // If another thread registered the same id in between, reject this one and
        // leave the freshly built (never started) thread object unused.
        if (this.listenerThreads.putIfAbsent(consumerId, consumerThread) != null) {
            throw alreadyRegistered(consumerId);
        }
        consumerThread.start();
    }

    /** Builds the exception reported when a consumer id is registered twice. */
    private static IllegalStateException alreadyRegistered(String consumerId) {
        return new IllegalStateException("consumer with id \"" + consumerId + "\" already registered");
    }
}
