/*
 * Decompiled with CFR 0.152.
 */
package net.engio.mbassy.bus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.AbstractPubSubSupport;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.bus.publication.ISyncAsyncPublicationCommand;

public abstract class AbstractSyncAsyncMessageBus<T, P extends ISyncAsyncPublicationCommand>
extends AbstractPubSubSupport<T>
implements IMessageBus<T, P> {
    private final ExecutorService executor;
    private final List<Thread> dispatchers;
    private final BlockingQueue<MessagePublication> pendingMessages;

    protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
        super(configuration);
        Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
        this.pendingMessages = asyncDispatch.getPendingMessages();
        this.dispatchers = new ArrayList<Thread>(asyncDispatch.getNumberOfMessageDispatchers());
        this.initDispatcherThreads(asyncDispatch);
        Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class);
        this.executor = asyncInvocation.getExecutor();
        this.getRuntime().add("handler.async.executor", this.executor);
    }

    private void initDispatcherThreads(Feature.AsynchronousMessageDispatch configuration) {
        for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); ++i) {
            Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        MessagePublication publication = null;
                        try {
                            publication = (MessagePublication)AbstractSyncAsyncMessageBus.this.pendingMessages.take();
                            publication.execute();
                            continue;
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        catch (Throwable t) {
                            AbstractSyncAsyncMessageBus.this.handlePublicationError(new PublicationError(t, "Error in asynchronous dispatch", publication));
                            continue;
                        }
                        break;
                    }
                }
            });
            dispatcher.setName("Message dispatcher");
            this.dispatchers.add(dispatcher);
            dispatcher.start();
        }
    }

    protected MessagePublication addAsynchronousPublication(MessagePublication publication) {
        try {
            this.pendingMessages.put(publication);
            return publication.markScheduled();
        }
        catch (InterruptedException e) {
            this.handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
            return publication;
        }
    }

    protected MessagePublication addAsynchronousPublication(MessagePublication publication, long timeout, TimeUnit unit) {
        try {
            return this.pendingMessages.offer(publication, timeout, unit) ? publication.markScheduled() : publication;
        }
        catch (InterruptedException e) {
            this.handlePublicationError(new PublicationError(e, "Error while adding an asynchronous message publication", publication));
            return publication;
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        this.shutdown();
    }

    @Override
    public void shutdown() {
        for (Thread dispatcher : this.dispatchers) {
            dispatcher.interrupt();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    @Override
    public boolean hasPendingMessages() {
        return this.pendingMessages.size() > 0;
    }

    @Override
    public Executor getExecutor() {
        return this.executor;
    }
}

