/*
 * Decompiled with CFR 0.152.
 */
package ru.infon.queuebox;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import ru.infon.queuebox.MessageContainer;
import ru.infon.queuebox.QueueBehave;
import ru.infon.queuebox.QueueConsumer;
import ru.infon.queuebox.QueueConsumerThread;
import ru.infon.queuebox.QueuePacketHolder;

public class QueueEngine<T>
implements QueuePacketHolder<T> {
    private static final Log LOG = LogFactory.getLog(QueueEngine.class);
    private final QueueBehave<T> queueBehave;
    private final ExecutorService executor;
    private final Properties properties;
    private Map<String, QueueConsumerThread<T>> listenerThreads = new ConcurrentHashMap<String, QueueConsumerThread<T>>();

    public QueueEngine(Properties properties, QueueBehave<T> queueBehave, ExecutorService executor) {
        this.properties = properties;
        this.queueBehave = queueBehave;
        this.executor = executor;
    }

    public void queue(MessageContainer<T> event) {
        this.queueBehave.put(event);
    }

    @Override
    public int getFetchLimit() {
        return this.queueBehave.getFetchLimit();
    }

    @Override
    public Collection<MessageContainer<T>> fetch(QueueConsumer<T> consumer) {
        return this.queueBehave.find(consumer);
    }

    @Override
    public void ack(MessageContainer<T> packet) {
        this.queueBehave.remove(packet);
    }

    @Override
    public void reset(MessageContainer<T> packet) {
        this.queueBehave.reset(packet);
    }

    public void registerConsumer(QueueConsumer<T> consumer) {
        if (this.listenerThreads.containsKey(consumer.getConsumerId())) {
            throw new IllegalStateException("consumer with id \"" + consumer.getConsumerId() + "\" already registered");
        }
        QueueConsumerThread<T> consumerThread = new QueueConsumerThread<T>(this.properties, consumer, this, this.executor);
        this.listenerThreads.put(consumer.getConsumerId(), consumerThread);
        consumerThread.start();
    }

    public void shutdown() {
        this.listenerThreads.values().forEach(QueueConsumerThread::stop);
    }
}

