/*
 * Decompiled with CFR 0.152.
 */
package ru.infon.queuebox;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import net.c0f3.queuebox.QueueBoxContext;
import net.c0f3.queuebox.QueueStatistic;
import ru.infon.queuebox.MessageContainer;
import ru.infon.queuebox.QueueBehave;
import ru.infon.queuebox.QueueConsumer;
import ru.infon.queuebox.QueueEngine;
import ru.infon.queuebox.common.PropertiesBox;

public class QueueBox<T> {
    public static final int PRIORITY_HIGH = 1;
    public static final int PRIORITY_NORMAL = 4;
    public static final int PRIORITY_LOW = 10;
    public static final int PRIORITY_DEFAULT = 4;
    public static final String PROPERTY_FETCH_DELAY_MILLS = "queue.fetch.delay.mills";
    private QueueEngine<T> queue = null;
    private final QueueBoxContext queueBoxContext;
    protected QueueBehave<T> behave = null;
    protected ExecutorService executor = null;
    private final PropertiesBox properties;
    private final Class<T> packetClass;
    final AtomicBoolean started = new AtomicBoolean(false);

    public QueueBox(PropertiesBox properties, Class<T> packetCLass) {
        this.properties = properties;
        this.packetClass = packetCLass;
        this.queueBoxContext = new QueueBoxContext();
        this.queueBoxContext.setStatistic(QueueStatistic.voidInstance());
    }

    public QueueBox<T> withExecutorService(ExecutorService executor) {
        this.executor = executor;
        return this;
    }

    public QueueBox<T> withQueueBehave(QueueBehave<T> queueBehave) {
        this.behave = queueBehave;
        this.behave.setContext(this.queueBoxContext);
        return this;
    }

    public QueueBox<T> withStatistic(QueueStatistic statistic) {
        this.queueBoxContext.setStatistic(statistic);
        return this;
    }

    public QueueStatistic getStatistic() {
        return this.queueBoxContext.getStatistic();
    }

    public PropertiesBox getProperties() {
        return this.properties;
    }

    public void start() {
        Objects.requireNonNull(this.behave);
        Objects.requireNonNull(this.executor);
        if (this.queue == null) {
            this.queue = new QueueEngine<T>(this.properties, this.behave, this.executor);
        }
        this.started.set(true);
    }

    public void stop() {
        this.queue.shutdown();
    }

    public void subscribe(QueueConsumer<T> consumer) {
        if (!this.started.get()) {
            throw new IllegalStateException("QueueBox not started");
        }
        this.executor.submit(() -> this.queue.registerConsumer(consumer));
    }

    public void subscribe(final String destination, final Consumer<T> consumer) {
        this.subscribe(new QueueConsumer<T>(){

            @Override
            public void onPacket(MessageContainer<T> message) {
                consumer.accept(message.getMessage());
            }

            @Override
            public String getConsumerId() {
                return destination;
            }
        });
    }

    public Future<T> queue(T message) {
        if (!this.started.get()) {
            throw new IllegalStateException("QueueBox not started");
        }
        return this.executor.submit(() -> {
            this.queue.queue(new MessageContainer<Object>(message));
            return message;
        });
    }

    public Future<T> queue(T message, int priority) {
        if (!this.started.get()) {
            throw new IllegalStateException("QueueBox not started");
        }
        return this.executor.submit(() -> {
            MessageContainer<Object> messageContainer = new MessageContainer<Object>(message);
            messageContainer.setPriority(priority);
            this.queue.queue(messageContainer);
            return message;
        });
    }
}

