/*
 * Decompiled with CFR 0.152.
 */
package de.fraunhofer.iosb.ilt.frostserver.messagebus;

import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageBus;
import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageListener;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityChangedMessage;
import de.fraunhofer.iosb.ilt.frostserver.settings.BusSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.ConfigDefaults;
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.Settings;
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalMessageBus
implements MessageBus,
ConfigDefaults {
    @DefaultValueInt(value=2)
    public static final String TAG_WORKER_COUNT = "workerPoolSize";
    @DefaultValueInt(value=100)
    public static final String TAG_QUEUE_SIZE = "queueSize";
    private static final Logger LOGGER = LoggerFactory.getLogger(InternalMessageBus.class);
    private BlockingQueue<EntityChangedMessage> entityChangedMessageQueue;
    private ExecutorService entityChangedExecutorService;
    private final List<MessageListener> listeners = new CopyOnWriteArrayList<MessageListener>();
    private int poolSize;
    private int queueSize;

    @Override
    public void init(CoreSettings settings) {
        BusSettings busSettings = settings.getBusSettings();
        Settings customSettings = busSettings.getCustomSettings();
        this.poolSize = customSettings.getInt(TAG_WORKER_COUNT, this.defaultValueInt(TAG_WORKER_COUNT));
        this.queueSize = customSettings.getInt(TAG_QUEUE_SIZE, this.defaultValueInt(TAG_QUEUE_SIZE));
        this.entityChangedMessageQueue = new ArrayBlockingQueue<EntityChangedMessage>(this.queueSize);
        this.entityChangedExecutorService = ProcessorHelper.createProcessors(this.poolSize, this.entityChangedMessageQueue, this::handleMessage, "IntBusPrc");
    }

    @Override
    public void stop() {
        this.entityChangedExecutorService.shutdown();
        try {
            if (this.entityChangedExecutorService.awaitTermination(2L, TimeUnit.SECONDS)) {
                return;
            }
        }
        catch (InterruptedException ex) {
            LOGGER.error("Interrupted while waiting for shutdown.", ex);
            Thread.currentThread().interrupt();
        }
        List<Runnable> list = this.entityChangedExecutorService.shutdownNow();
        LOGGER.warn("There were {} messages left on the queue.", (Object)list.size());
    }

    @Override
    public void sendMessage(EntityChangedMessage message) {
        if (this.listeners.isEmpty()) {
            return;
        }
        if (!this.entityChangedMessageQueue.offer(message)) {
            LOGGER.error("Failed to add message to message bus. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.", "bus.", TAG_QUEUE_SIZE, this.queueSize, "bus.", TAG_WORKER_COUNT, this.poolSize);
        }
    }

    @Override
    public void addMessageListener(MessageListener listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeMessageListener(MessageListener listener) {
        this.listeners.remove(listener);
    }

    private void handleMessage(EntityChangedMessage message) {
        for (MessageListener listener : this.listeners) {
            try {
                listener.messageReceived(message);
            }
            catch (Exception ex) {
                LOGGER.error("Listener threw exception on message reception.", ex);
            }
        }
    }
}

