/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.query.MorphiumIterator;
import de.caluga.morphium.query.Query;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    // Class-wide SLF4J logger.
    private static Logger log = LoggerFactory.getLogger(Messaging.class);
    // Morphium instance used for every query, update and store issued by this class.
    private Morphium morphium;
    // Keeps the polling loop alive and is the value returned by the change stream listener.
    private boolean running;
    // Milliseconds slept between polling rounds and between retries.
    private int pause;
    // Sender id of this instance; a random UUID unless replaced via setSenderId.
    private String id;
    // When true, a listener returning null still causes a "received" answer to be sent.
    private boolean autoAnswer = false;
    // Taken from the HOSTNAME environment variable, else the local host name, else "unknown host".
    private String hostname;
    // Passed as the "multiple" flag to findMessages; when false the queries are limited to one message.
    private boolean processMultiple;
    // Listeners that are invoked for every message regardless of its name.
    private List<MessageListener> listeners;
    // Message name -> time (ms) at which processing of that name was paused.
    private Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    // Listeners that are only invoked for messages with the given name.
    private Map<String, List<MessageListener>> listenerByName;
    // Optional queue name; getCollectionName() derives the collection name from it.
    private String queueName;
    // Worker pool; only created when multithreadded is true.
    private ThreadPoolExecutor threadPool;
    // Single-threaded scheduler used to remove ids from "processing" after a delay.
    private ScheduledThreadPoolExecutor decouplePool;
    // Whether message handling is handed to threadPool instead of running inline.
    private boolean multithreadded;
    // Iterator window size; also the upper bound for active pool tasks in queueOrRun.
    private int windowSize;
    // Whether run() installs a ChangeStreamMonitor instead of polling.
    private boolean useChangeStream;
    // Created in run() when useChangeStream is set; null otherwise.
    private ChangeStreamMonitor changeStreamMonitor;
    // Received answers, keyed by the id of the message they answer.
    private Map<MorphiumId, Msg> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Msg>();
    // Ids of sent messages whose answers are awaited.
    // NOTE(review): only read in the visible part of the file - presumably filled by a send-and-wait method further down.
    private Map<MorphiumId, Msg> waitingForMessages = new ConcurrentHashMap<MorphiumId, Msg>();
    // Ids of messages currently being handled by this instance.
    private List<MorphiumId> processing = new Vector<MorphiumId>();

    /**
     * Creates an instance without a queue name.
     * Delegates to {@link #Messaging(Morphium, String, int, boolean)} with {@code null} as queue name.
     *
     * @param m               Morphium instance to use
     * @param pause           pause in milliseconds, handed on unchanged
     * @param processMultiple handed on unchanged
     */
    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    /**
     * Creates an instance with defaults: no queue name, a pause of 500, {@code processMultiple}
     * and {@code multithreadded} both {@code false}, and a window size of 100.
     *
     * @param m Morphium instance to use
     */
    public Messaging(Morphium m) {
        this(m, null, 500, false, false, 100);
    }

    /**
     * Creates an instance without a queue name; all other arguments are handed on unchanged
     * to {@link #Messaging(Morphium, String, int, boolean, boolean, int)}.
     *
     * @param m               Morphium instance to use
     * @param pause           pause in milliseconds
     * @param processMultiple whether more than one message may be fetched per round
     * @param multithreadded  whether a worker pool is used
     * @param windowSize      iterator window size / limit of active pool tasks
     */
    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    /**
     * Creates a single-threaded instance ({@code multithreadded = false}) with a window size of 1000.
     *
     * @param m               Morphium instance to use
     * @param queueName       queue name, may be {@code null}
     * @param pause           pause in milliseconds
     * @param processMultiple whether more than one message may be fetched per round
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, 1000);
    }

    /**
     * Creates an instance that uses a change stream exactly when {@code m.isReplicaSet()} returns true.
     *
     * @param m               Morphium instance to use
     * @param queueName       queue name, may be {@code null}
     * @param pause           pause in milliseconds
     * @param processMultiple whether more than one message may be fetched per round
     * @param multithreadded  whether a worker pool is used
     * @param windowSize      iterator window size / limit of active pool tasks
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet());
    }

    /**
     * Creates a fully configured messaging instance.
     * <p>
     * When {@code multithreadded} is set, a worker pool is created from the messaging thread pool
     * settings of the Morphium configuration. A single-threaded scheduler is always created.
     * The instance registers itself as shutdown listener, determines the host name and ensures
     * the indices for {@link Msg} on the collection returned by {@link #getCollectionName()}.
     *
     * @param m                Morphium instance to use
     * @param queueName        queue name, may be {@code null}
     * @param pause            pause in milliseconds between polling rounds
     * @param processMultiple  whether more than one message may be fetched per round
     * @param multithreadded   whether listeners run in a worker pool
     * @param windowSize       iterator window size / limit of active pool tasks
     * @param useChangeStream  whether {@link #run()} listens on a change stream instead of polling
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this.multithreadded = multithreadded;
        this.windowSize = windowSize;
        this.morphium = m;
        this.useChangeStream = useChangeStream;
        if (multithreadded) {
            // offer() only accepts the task when the pool cannot grow any more or has an idle
            // worker; returning false otherwise makes the executor start a new thread instead.
            LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable e) {
                    int poolSize = Messaging.this.threadPool.getPoolSize();
                    if (poolSize >= Messaging.this.threadPool.getMaximumPoolSize() || poolSize > Messaging.this.threadPool.getActiveCount()) {
                        return super.offer(e);
                    }
                    return false;
                }
            };
            this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, queue);
            // A rejected task is put into the queue directly (blocking), bypassing offer().
            this.threadPool.setRejectedExecutionHandler((r, executor) -> {
                try {
                    executor.getQueue().put(r);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            this.threadPool.setThreadFactory(Messaging.daemonThreadFactory("messaging "));
        }
        this.decouplePool = new ScheduledThreadPoolExecutor(1);
        this.decouplePool.setThreadFactory(Messaging.daemonThreadFactory("decouple_thr_"));
        this.morphium.addShutdownListener(this);
        this.queueName = queueName;
        this.running = true;
        this.pause = pause;
        this.processMultiple = processMultiple;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException ignored) {
                // Falls through to the "unknown host" default below.
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        m.ensureIndicesFor(Msg.class, this.getCollectionName());
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
    }

    /**
     * Builds a factory for daemon threads named {@code namePrefix + n}, with n starting at 1.
     * The counter is advanced atomically so concurrently created threads never share a number.
     */
    private static ThreadFactory daemonThreadFactory(final String namePrefix) {
        final AtomicInteger counter = new AtomicInteger(1);
        return r -> {
            Thread thread = new Thread(r, namePrefix + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Counts all messages stored in this instance's collection.
     *
     * @return number of documents reported by the query's {@code countAll()}
     */
    public long getMessageCount() {
        String collection = getCollectionName();
        Query<Msg> allMessages = morphium.createQueryFor(Msg.class).setCollectionName(collection);
        return allMessages.countAll();
    }

    /**
     * Deletes the given message from this instance's collection.
     *
     * @param m message to delete
     */
    public void removeMessage(Msg m) {
        String collection = getCollectionName();
        morphium.delete(m, collection);
    }

    /**
     * Runs the given query against this instance's collection.
     * <p>
     * The query is cloned first so the caller's instance keeps its collection name. If cloning
     * fails, the passed query itself is used (and therefore modified); this is now logged
     * instead of being swallowed silently.
     *
     * @param q query to execute; must not be {@code null}
     * @return the result of {@code asList()} on the collection-scoped query
     */
    public List<Msg> findMessages(Query<Msg> q) {
        Query<Msg> scoped = q;
        try {
            scoped = q.clone();
        }
        catch (CloneNotSupportedException e) {
            log.warn("Could not clone query - the passed query instance will be modified", e);
        }
        scoped.setCollectionName(this.getCollectionName());
        return scoped.asList();
    }

    /**
     * Thread body of the messaging instance.
     * <p>
     * With change streams enabled: pending messages are processed first, then a
     * {@link ChangeStreamMonitor} on the message collection is created, a listener reacting to
     * "insert" and "update" events is registered, and the monitor is started.
     * Without change streams: polls via {@code findAndProcessMessages} until {@code running}
     * becomes false, sleeping {@code pause} milliseconds after every round, and finally clears
     * all registered listeners.
     */
    @Override
    public void run() {
        this.setName("Msg " + this.id);
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " started");
        }
        if (this.useChangeStream) {
            log.debug("Before running the changestream monitor - check of already existing messages");
            try {
                // Handle whatever is already stored before listening for new events.
                this.findAndProcessPendingMessages(null);
                if (this.multithreadded) {
                    // Busy-wait until the worker pool reports no active task.
                    while (this.threadPool.getActiveCount() > 0) {
                        Thread.yield();
                    }
                }
            }
            catch (Exception e) {
                log.error("Error processing existing messages in queue", (Throwable)e);
            }
            log.debug("init Messaging  using changestream monitor");
            // NOTE(review): meaning of the third constructor argument (true) is not visible here - confirm.
            this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, this.getCollectionName(), true);
            // Every path through the listener returns the current value of "running".
            this.changeStreamMonitor.addListener(evt -> {
                block27: {
                    try {
                        if (evt == null || evt.getOperationType() == null) {
                            return this.running;
                        }
                        if (evt.getOperationType().equals("insert")) {
                            Msg obj = this.morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                            // An answer to a message this instance is waiting for is processed right away.
                            if (obj.getInAnswerTo() != null && this.waitingForMessages.containsKey(obj.getInAnswerTo())) {
                                if (log.isDebugEnabled()) {
                                    log.debug("processing answer " + obj.getMsgId() + " in answer to " + obj.getInAnswerTo());
                                }
                                ArrayList<Msg> lst = new ArrayList<Msg>();
                                lst.add(obj);
                                try {
                                    this.processMessages(lst);
                                }
                                catch (Exception e) {
                                    log.error("Error during message processing ", (Throwable)e);
                                }
                                return this.running;
                            }
                            // No by-name listener entry for this message name and no global listener: ignore.
                            if (this.listenerByName.get(obj.getName()) == null && this.listeners.size() == 0) {
                                return this.running;
                            }
                            // Ignore own messages, messages already processed by this instance
                            // and messages addressed to a different recipient.
                            if (obj.getSender().equals(this.id) || obj.getProcessedBy() != null && obj.getProcessedBy().contains(this.id) || obj.getRecipient() != null && !obj.getRecipient().equals(this.id)) {
                                return this.running;
                            }
                            if (this.pauseMessages.containsKey(obj.getName())) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Not processing message - processing paused for " + obj.getName());
                                }
                                return this.running;
                            }
                            // Exclusive, not yet locked, addressed to nobody else and not processed
                            // by this instance: try to obtain the lock first.
                            if (!(!obj.isExclusive() || obj.getLockedBy() != null || obj.getRecipient() != null && !obj.getRecipient().equals(this.id) || obj.getProcessedBy() != null && obj.getProcessedBy().contains(this.id))) {
                                this.lockAndProcess(obj);
                            } else if (!obj.isExclusive() || obj.getRecipient() != null && obj.getRecipient().equals(this.id)) {
                                // Non-exclusive, or addressed directly to this instance: process
                                // without locking unless it is already being handled.
                                if (this.processing.contains(obj.getMsgId())) {
                                    return this.running;
                                }
                                ArrayList<Msg> lst = new ArrayList<Msg>();
                                lst.add(obj);
                                try {
                                    this.processMessages(lst);
                                }
                                catch (Exception e) {
                                    log.error("Error during message processing ", (Throwable)e);
                                }
                            } else {
                                log.debug("Message is not for me");
                            }
                            break block27;
                        }
                        // Only "update" events that carry a full document with an _id are handled below.
                        if (!evt.getOperationType().equals("update") || evt.getFullDocument() == null || evt.getFullDocument().get("_id") == null) break block27;
                        // The message is loaded from the database by id rather than taken from the event.
                        Msg obj = this.morphium.findById(Msg.class, new MorphiumId(evt.getFullDocument().get("_id").toString()), this.getCollectionName());
                        if (obj == null) {
                            return this.running;
                        }
                        if (obj.getInAnswerTo() != null && this.waitingForMessages.containsKey(obj.getInAnswerTo())) {
                            if (obj.isExclusive()) {
                                this.lockAndProcess(obj);
                            } else {
                                ArrayList<Msg> lst = new ArrayList<Msg>();
                                lst.add(obj);
                                try {
                                    this.processMessages(lst);
                                }
                                catch (Exception e) {
                                    log.error("Error during message processing ", (Throwable)e);
                                }
                            }
                        }
                        // Stop here when there is neither a listener nor an awaited answer for this message.
                        if (!(this.listenerByName.get(obj.getName()) != null || this.listeners.size() != 0 || obj.getInAnswerTo() != null && this.waitingForMessages.containsKey(obj.getInAnswerTo()))) {
                            return this.running;
                        }
                        // NOTE(review): "obj != null" is always true here (checked above); an awaited
                        // exclusive answer can reach lockAndProcess a second time through this branch.
                        if (obj != null && obj.isExclusive() && obj.getLockedBy() == null && !this.pauseMessages.containsKey(obj.getName()) && (obj.getRecipient() == null || obj.getRecipient().equals(this.id))) {
                            log.debug("Update of msg - trying to lock");
                            this.lockAndProcess(obj);
                        }
                    }
                    catch (Exception e) {
                        log.error("Error during event processing in changestream", (Throwable)e);
                    }
                }
                return this.running;
            });
            this.changeStreamMonitor.start();
            // NOTE(review): nothing follows in this branch, so run() ends here unless start() blocks - confirm.
        } else {
            while (this.running) {
                try {
                    this.findAndProcessMessages(this.processMultiple);
                }
                catch (Throwable e) {
                    log.error("Unhandled exception " + e.getMessage(), e);
                }
                finally {
                    try {
                        // Always pause between rounds, also after a failure.
                        Messaging.sleep(this.pause);
                    }
                    // An interrupt only shortens the pause; the loop condition decides about termination.
                    catch (InterruptedException e) {}
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Messaging " + this.id + " stopped!");
            }
            if (!this.running) {
                this.listeners.clear();
                this.listenerByName.clear();
            }
        }
    }

    /**
     * Marks messages of the given name as paused, remembering the current time.
     * A name that is already paused keeps its original timestamp.
     *
     * @param name message name to pause
     */
    public void pauseProcessingOfMessagesNamed(String name) {
        long pausedAt = System.currentTimeMillis();
        pauseMessages.putIfAbsent(name, pausedAt);
    }

    /**
     * Processes the stored messages of the given name and then lifts the pause for that name.
     *
     * @param name message name to resume
     * @return milliseconds elapsed since the name was paused, or {@code null} if it was not paused
     */
    public Long unpauseProcessingOfMessagesNamed(String name) {
        // NOTE(review): the pause entry is removed only after the pending messages were handled;
        // in change stream mode, events arriving in between are still treated as paused.
        processMessages(findMessages(name, processMultiple));
        Long pausedSince = pauseMessages.remove(name);
        if (pausedSince == null) {
            return null;
        }
        return System.currentTimeMillis() - pausedSince;
    }

    /**
     * Processes stored messages until the lookup returns no more, pausing {@code pause}
     * milliseconds between rounds. The work is handed to {@code queueOrRun}, so it runs in the
     * worker pool when multithreading is enabled and inline otherwise.
     * <p>
     * An interrupt (e.g. from {@link #terminate()} or a pool shutdown) ends the loop and
     * restores the thread's interrupt status instead of being swallowed.
     *
     * @param name message name to restrict the lookup to, or {@code null} for all names
     */
    public void findAndProcessPendingMessages(String name) {
        Runnable r = () -> {
            while (true) {
                MorphiumIterator<Msg> messages = this.findMessages(name, this.processMultiple);
                if (!messages.hasNext()) {
                    return;
                }
                this.processMessages(messages);
                try {
                    Thread.sleep(this.pause);
                }
                catch (InterruptedException e) {
                    // Shutdown was requested: stop draining and let the caller see the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        this.queueOrRun(r);
    }

    /**
     * Locks candidate messages for this instance and returns an iterator over the messages to process.
     * <p>
     * Works in two steps: a first query selects candidates and sets {@code locked_by}/{@code locked}
     * on them; a second query then fetches everything locked by this instance or locked by "ALL".
     * When no listener of any kind is registered, only messages answering one of the ids in
     * {@code waitingForMessages} are returned and nothing is locked.
     *
     * @param name     message name to restrict the lookup to, or {@code null} for all names
     * @param multiple when {@code false}, both queries are limited to a single message
     * @return iterator over the messages found, using {@code windowSize} as window size
     */
    private MorphiumIterator<Msg> findMessages(String name, boolean multiple) {
        HashMap<String, Object> values = new HashMap<String, Object>();
        Query<Msg> q = this.morphium.createQueryFor(Msg.class);
        q.setCollectionName(this.getCollectionName());
        // NOTE(review): q.q() presumably yields a fresh, empty query for the same type/collection - confirm.
        // or1: not sent by me, not locked, not processed by me, no recipient set.
        Query<Msg> or1 = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null);
        // or2: not sent by me, not processed by me, addressed to me.
        Query<Msg> or2 = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id);
        // keySet() of the pause map; not a copy.
        Set<String> pausedMessagesKeys = this.pauseMessages.keySet();
        if (name != null) {
            or1.f((Enum)Msg.Fields.name).eq(name);
            or2.f((Enum)Msg.Fields.name).eq(name);
        } else {
            if (!this.pauseMessages.isEmpty()) {
                or1.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
                or2.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
            }
            if (this.listeners.isEmpty() && !this.listenerByName.isEmpty()) {
                // Only by-name listeners exist: restrict to the names somebody listens for.
                or1.f((Enum)Msg.Fields.name).in(this.listenerByName.keySet());
                or2.f((Enum)Msg.Fields.name).in(this.listenerByName.keySet());
            } else if (this.listenerByName.isEmpty() && this.listeners.isEmpty()) {
                // No listeners at all: only answers to awaited messages are of interest.
                return q.q().f((Enum)Msg.Fields.inAnswerTo).in(this.waitingForMessages.keySet()).asIterable();
            }
        }
        // Snapshot of the ids currently being handled; excluded from both queries.
        ArrayList<MorphiumId> processingIds = new ArrayList<MorphiumId>(this.processing);
        if (!this.processing.isEmpty()) {
            q.f("_id").nin(processingIds);
        }
        q.or(or1, or2);
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        if (!multiple) {
            q.limit(1);
        }
        // Step 1: mark the ids returned by q.idList() as locked by this instance.
        values.put("locked_by", this.id);
        values.put("locked", System.currentTimeMillis());
        this.morphium.set(q.q().f("_id").in(q.idList()), values, false, multiple);
        // Step 2: build the query that fetches the messages to process.
        q = q.q();
        if (name != null) {
            q.f((Enum)Msg.Fields.name).eq(name);
        } else if (!this.pauseMessages.isEmpty()) {
            q.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
        }
        q.f("_id").nin(processingIds);
        // NOTE(review): the name filter below repeats the one a few lines above.
        if (name != null) {
            q.f((Enum)Msg.Fields.name).eq(name);
        } else if (!this.pauseMessages.isEmpty()) {
            q.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
        }
        // Locked by me, or locked by "ALL" and not yet processed by me (addressed to me or to nobody).
        q.or(q.q().f((Enum)Msg.Fields.lockedBy).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null));
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        if (!multiple) {
            q.limit(1);
        }
        MorphiumIterator<Msg> it = q.asIterable(this.windowSize);
        it.setMultithreaddedAccess(this.multithreadded);
        return it;
    }

    /**
     * One polling round: looks up messages of any name and processes them.
     *
     * @param multiple passed through to the message lookup
     */
    private void findAndProcessMessages(boolean multiple) {
        processMessages(findMessages(null, multiple));
    }

    /**
     * Tries to lock a single message for this instance and processes it if the lock was obtained.
     * <p>
     * The lock is written with an update restricted to the message id, "not processed by me" and
     * "not locked". After a short wait the message is read again; only when it then carries this
     * instance's id as lock owner is it handed to {@code processMessages}.
     *
     * @param obj message to lock and process
     */
    private void lockAndProcess(Msg obj) {
        Query<Msg> lockQuery = morphium.createQueryFor(Msg.class);
        lockQuery.setCollectionName(getCollectionName());
        lockQuery.f("_id").eq(obj.getMsgId());
        lockQuery.f((Enum)Msg.Fields.processedBy).ne(id);
        lockQuery.f((Enum)Msg.Fields.lockedBy).eq(null);

        HashMap<String, Object> lockValues = new HashMap<String, Object>();
        lockValues.put("locked_by", id);
        lockValues.put("locked", System.currentTimeMillis());
        morphium.set(lockQuery, lockValues, false, processMultiple);

        try {
            Thread.sleep(10L);
        }
        catch (InterruptedException interruptedException) {
            // the wait is simply cut short
        }

        Msg current = morphium.reread(obj, getCollectionName());
        boolean lockedByMe = current != null && current.getLockedBy() != null && current.getLockedBy().equals(id);
        if (!lockedByMe) {
            return;
        }
        ArrayList<Msg> locked = new ArrayList<Msg>();
        locked.add(current);
        if (log.isDebugEnabled()) {
            log.debug("locked messages: " + locked.size());
        }
        try {
            processMessages(locked);
        }
        catch (Exception e) {
            log.error("Error during message processing ", (Throwable)e);
        }
    }

    /**
     * Processes the given messages.
     * <p>
     * Every message is re-read from the database first. An answer to a message this instance is
     * waiting for is stored in {@code waitingForAnswers} and deleted. Otherwise the message is
     * registered in {@code processing} and handed to the listeners via {@code queueOrRun}.
     * Finally the method waits until Morphium's write buffer is empty.
     *
     * @param messages messages to process; {@code null} elements and messages that no longer
     *                 exist in the database are skipped
     */
    private void processMessages(Iterable<Msg> messages) {
        for (Msg m : messages) {
            if (m == null) {
                continue;
            }
            // Always work on the version currently stored in the database.
            final Msg msg = this.morphium.reread(m, this.getCollectionName());
            if (msg == null) {
                continue;
            }
            if (msg.getInAnswerTo() != null && this.waitingForMessages.get(msg.getInAnswerTo()) != null) {
                if (log.isDebugEnabled()) {
                    log.debug(this.getSenderId() + ": Got a message, we are waiting for...");
                }
                this.waitingForAnswers.put((MorphiumId)msg.getInAnswerTo(), msg);
                this.processing.remove(m.getMsgId());
                this.morphium.delete(msg, this.getCollectionName());
                // NOTE(review): returns instead of continuing, so remaining messages of this
                // batch are left for a later round - kept as in the original.
                return;
            }
            if (this.listeners.isEmpty() && this.listenerByName.isEmpty()) {
                this.updateProcessedByAndReleaseLock(msg);
                log.error(this.getSenderId() + ": should not be here. not processing message, as no listeners are defined " + msg.getMsgId());
                return;
            }
            if (this.processing.contains(m.getMsgId())) {
                continue;
            }
            this.processing.add(m.getMsgId());
            this.queueOrRun(() -> this.handleMessage(m, msg));
        }
        while (this.morphium.getWriteBufferCount() > 0) {
            Thread.yield();
        }
    }

    /**
     * Delivers one message to the matching listeners and updates or deletes it afterwards.
     *
     * @param m   the message as it was handed to {@code processMessages}
     * @param msg the same message, freshly read from the database (never {@code null})
     */
    private void handleMessage(Msg m, Msg msg) {
        // Already handled by this instance: drop the bookkeeping entry and stop. Previously the
        // check on "m" returned without removing the id, leaving it in "processing" forever.
        if (m.getProcessedBy() != null && m.getProcessedBy().contains(this.id)
                || msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id)) {
            this.processing.remove(m.getMsgId());
            return;
        }
        // Only messages locked by this instance or by "ALL" may be processed. A missing lock
        // owner counts as "not mine" instead of raising a NullPointerException.
        boolean lockedForMe = msg.getLockedBy() != null
                && (msg.getLockedBy().equals(this.id) || msg.getLockedBy().equals("ALL"));
        if (!lockedForMe) {
            this.processing.remove(m.getMsgId());
            return;
        }
        if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            if (log.isDebugEnabled()) {
                log.debug(this.getSenderId() + ": Found outdated message - deleting it!");
            }
            this.morphium.delete(msg, this.getCollectionName());
            this.processing.remove(m.getMsgId());
            return;
        }

        boolean wasProcessed = false;
        boolean wasRejected = false;
        List<MessageRejectedException> rejections = new ArrayList<MessageRejectedException>();
        List<MessageListener> targets = new ArrayList<MessageListener>(this.listeners);
        List<MessageListener> named = this.listenerByName.get(msg.getName());
        if (named != null) {
            targets.addAll(named);
        }
        if (targets.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug(this.getSenderId() + ": Message did not have a listener registered");
            }
            wasProcessed = true;
        }
        for (MessageListener l : targets) {
            try {
                Msg answer = l.onMessage(this, msg);
                wasProcessed = true;
                if (this.autoAnswer && answer == null) {
                    answer = new Msg(msg.getName(), "received", "");
                }
                if (answer == null) {
                    continue;
                }
                msg.sendAnswer(this, answer);
                if (log.isDebugEnabled()) {
                    log.debug("sent answer to " + answer.getInAnswerTo() + " recipient: " + answer.getRecipient() + " id: " + answer.getMsgId());
                }
                if (answer.getRecipient() == null) {
                    log.warn("Recipient of answer is null?!?!");
                }
            }
            catch (MessageRejectedException mre) {
                log.warn("Message was rejected by listener", (Throwable)mre);
                wasRejected = true;
                rejections.add(mre);
            }
            catch (Exception e) {
                log.error("listener Processing failed", (Throwable)e);
            }
        }

        // Set once a rejection asked for the message to stay available for other instances.
        boolean releasedForOthers = false;
        for (MessageRejectedException mre : rejections) {
            if (mre.isSendAnswer()) {
                Msg answer = new Msg(msg.getName(), "message rejected by listener", mre.getMessage());
                msg.sendAnswer(this, answer);
            }
            if (mre.isContinueProcessing() && !releasedForOthers) {
                this.updateProcessedByAndReleaseLock(msg);
                this.processing.remove(m.getMsgId());
                log.info("Message will be re-processed by others");
                releasedForOthers = true;
            }
        }
        if (!wasProcessed && !wasRejected) {
            log.error("message was not processed");
        } else if (wasRejected) {
            log.debug("Message rejected");
        }
        if (releasedForOthers) {
            // The message was handed back to the queue; it must neither be deleted nor marked
            // as processed a second time.
            return;
        }

        if ("ALL".equals(msg.getLockedBy()) || msg.getRecipient() != null && msg.getRecipient().equals(this.id) && msg.getInAnswerTo() != null) {
            this.updateProcessedByAndReleaseLock(msg);
        } else {
            this.morphium.delete(msg, this.getCollectionName());
        }
        this.scheduleProcessingRelease(m);
    }

    /**
     * Schedules the removal of the message id from {@code processing} after the message's TTL.
     * Does nothing once the scheduler is shut down; a rejected submission is retried after
     * {@code pause} milliseconds.
     */
    private void scheduleProcessingRelease(Msg m) {
        Runnable release = () -> this.processing.remove(m.getMsgId());
        while (!(this.decouplePool.isTerminated() || this.decouplePool.isTerminating() || this.decouplePool.isShutdown())) {
            try {
                this.decouplePool.schedule(release, m.getTtl(), TimeUnit.MILLISECONDS);
                return;
            }
            catch (RejectedExecutionException ex) {
                try {
                    Thread.sleep(this.pause);
                }
                catch (InterruptedException ie) {
                    // Shutdown in progress: stop retrying and keep the interrupt visible.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Adds this instance's id to the message's {@code processedBy} list and, if the message is
     * locked by this instance, clears the lock.
     * <p>
     * A message without lock owner is handled as "not locked by me" instead of causing a
     * {@link NullPointerException}.
     *
     * @param msg message to update; identified by its message id
     */
    private void updateProcessedByAndReleaseLock(Msg msg) {
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class);
        idq.setCollectionName(this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        if (msg.getLockedBy() != null && msg.getLockedBy().equals(this.id)) {
            this.morphium.set(idq, (Enum)Msg.Fields.lockedBy, null);
        }
        this.morphium.push(idq, Msg.Fields.processedBy, (Object)this.id);
    }

    /**
     * Runs the task inline, or submits it to the worker pool when multithreading is enabled.
     * <p>
     * After a successful submission the caller is throttled until the number of active pool
     * tasks is no longer above {@code windowSize}. If the pool is gone or shut down, the task is
     * dropped; previously any error (including the {@link NullPointerException} raised once
     * {@code onShutdown} cleared the pool) was swallowed and retried in an endless loop.
     *
     * @param r task to execute
     */
    private void queueOrRun(Runnable r) {
        if (!this.multithreadded) {
            r.run();
            return;
        }
        // Work on a local reference: the field is set to null during shutdown.
        ThreadPoolExecutor pool = this.threadPool;
        if (pool == null || pool.isShutdown()) {
            log.debug("Thread pool is shut down - task not executed");
            return;
        }
        while (true) {
            try {
                pool.execute(r);
                break;
            }
            catch (RejectedExecutionException e) {
                if (pool.isShutdown()) {
                    return;
                }
                Thread.yield();
            }
        }
        while (pool.getActiveCount() > this.windowSize) {
            Thread.yield();
        }
    }

    /**
     * Name of the collection the messages are stored in.
     *
     * @return {@code "msg"} when no queue name is set, otherwise {@code "mmsg_"} followed by the queue name
     */
    public String getCollectionName() {
        boolean unnamed = queueName == null || queueName.isEmpty();
        return unnamed ? "msg" : "mmsg_" + queueName;
    }

    /**
     * Registers a listener for messages with the given name.
     * <p>
     * Both the map and the affected listener list are replaced by modified copies, so code that
     * is currently reading the previous map or list is not affected by the change.
     *
     * @param n message name
     * @param l listener to add
     */
    public void addListenerForMessageNamed(String n, MessageListener l) {
        Map<String, List<MessageListener>> updated = new HashMap<String, List<MessageListener>>(this.listenerByName);
        List<MessageListener> current = updated.get(n);
        List<MessageListener> extended = current == null
                ? new ArrayList<MessageListener>()
                : new ArrayList<MessageListener>(current);
        extended.add(l);
        updated.put(n, extended);
        this.listenerByName = updated;
    }

    /**
     * Removes a listener registered for messages with the given name.
     * <p>
     * Both the map and the affected listener list are replaced by modified copies. When the last
     * listener of a name is removed, the name itself is removed as well, so this instance no
     * longer treats messages of that name as having a listener.
     *
     * @param n message name
     * @param l listener to remove
     */
    public void removeListenerForMessageNamed(String n, MessageListener l) {
        List<MessageListener> current = this.listenerByName.get(n);
        if (current == null) {
            return;
        }
        Map<String, List<MessageListener>> updated = new HashMap<String, List<MessageListener>>(this.listenerByName);
        List<MessageListener> remaining = new ArrayList<MessageListener>(current);
        remaining.remove(l);
        if (remaining.isEmpty()) {
            updated.remove(n);
        } else {
            updated.put(n, remaining);
        }
        this.listenerByName = updated;
    }

    /**
     * @return the id this instance uses as sender of its messages
     */
    public String getSenderId() {
        return id;
    }

    /**
     * Replaces the sender id of this instance.
     *
     * @param id new sender id
     * @return this instance, to allow call chaining
     */
    public Messaging setSenderId(String id) {
        this.id = id;
        return this;
    }

    /**
     * @return the pause in milliseconds used between polling rounds
     */
    public int getPause() {
        return pause;
    }

    /**
     * Sets the pause used between polling rounds.
     *
     * @param pause pause in milliseconds
     * @return this instance, to allow call chaining
     */
    public Messaging setPause(int pause) {
        this.pause = pause;
        return this;
    }

    /**
     * Tells whether this instance is active.
     *
     * @return without change stream: the {@code running} flag; with change stream: whether a
     *         monitor exists and reports that it is running
     */
    public boolean isRunning() {
        if (!useChangeStream) {
            return running;
        }
        if (changeStreamMonitor == null) {
            return false;
        }
        return changeStreamMonitor.isRunning();
    }

    /**
     * Sets the {@code running} flag; when switching to {@code false}, an existing change stream
     * monitor is terminated first.
     *
     * @param running new value of the flag
     */
    @Deprecated
    public void setRunning(boolean running) {
        boolean stopping = !running;
        if (stopping && changeStreamMonitor != null) {
            changeStreamMonitor.terminate();
        }
        this.running = running;
    }

    /**
     * Stops this messaging instance.
     * <p>
     * Clears the {@code running} flag, shuts down both executors and the change stream monitor,
     * stores an "info" message addressed to this instance, interrupts the messaging thread and
     * waits for it to end, checking every 250 ms.
     * If the calling thread is interrupted while waiting, its interrupt status is restored
     * before this method returns or throws.
     *
     * @throws RuntimeException if the messaging thread is still alive after more than ten checks
     */
    public void terminate() {
        this.running = false;
        if (this.decouplePool != null) {
            int scheduled = this.decouplePool.shutdownNow().size();
            if (log.isDebugEnabled()) {
                log.debug("Shutting down with " + scheduled + " runnables still scheduled");
            }
        }
        if (this.threadPool != null) {
            int pending = this.threadPool.shutdownNow().size();
            if (log.isDebugEnabled()) {
                log.debug("Shutting down with " + pending + " runnables still pending in pool");
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.terminate();
        }
        // NOTE(review): presumably written to trigger one more event/round so waiting code wakes up - confirm.
        this.sendMessageToSelf(new Msg("info", "going down", "now"));
        if (this.isAlive()) {
            this.interrupt();
        }
        boolean interrupted = false;
        try {
            int retry = 0;
            while (this.isAlive()) {
                try {
                    Messaging.sleep(250L);
                }
                catch (InterruptedException e) {
                    // Keep waiting for the messaging thread, but remember the interrupt.
                    interrupted = true;
                }
                if (++retry > 10) {
                    throw new RuntimeException("Could not terminate Messaging!");
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // The former Thread.stop() fallback was unreachable (the loop only ends normally once
        // the thread is no longer alive) and has been removed.
    }

    /**
     * Registers a listener that is invoked for messages of any name.
     *
     * @param l listener to add
     */
    public void addMessageListener(MessageListener l) {
        listeners.add(l);
    }

    /**
     * Removes a listener previously added with {@code addMessageListener}.
     *
     * @param l listener to remove
     */
    public void removeMessageListener(MessageListener l) {
        listeners.remove(l);
    }

    /**
     * Stores the message with the asynchronous flag set; storage errors are only logged.
     *
     * @param m message to send
     */
    public void queueMessage(Msg m) {
        final boolean async = true;
        storeMsg(m, async);
    }

    /**
     * Starts the messaging thread. With change streams enabled, the caller is additionally held
     * back for 250 ms after the thread was started.
     * <p>
     * An interrupt during that wait is logged and the caller's interrupt status is restored.
     */
    @Override
    public synchronized void start() {
        super.start();
        if (!this.useChangeStream) {
            return;
        }
        try {
            // NOTE(review): presumably gives the change stream monitor time to come up - confirm.
            Thread.sleep(250L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("error:" + e.getMessage());
        }
    }

    /**
     * Stores the message without an asynchronous callback.
     *
     * @param m message to send
     */
    public void storeMessage(Msg m) {
        final boolean async = false;
        storeMsg(m, async);
    }

    /**
     * Counts the messages currently stored in this instance's collection.
     *
     * @return number of documents reported by the query's {@code countAll()}
     */
    public long getNumberOfMessages() {
        Query<Msg> counter = morphium.createQueryFor(Msg.class);
        counter.setCollectionName(getCollectionName());
        long total = counter.countAll();
        return total;
    }

    /**
     * Stamps the message with this instance as sender and stores it.
     * <p>
     * If the message has a non-empty "to" list, one copy per recipient is created (new instance
     * of the same class, all fields copied, message id cleared, recipient set) and stored;
     * otherwise the message itself is stored.
     *
     * @param m     message to store
     * @param async when {@code true}, a callback that logs storage errors is passed to the store call
     * @throws RuntimeException if creating or filling a per-recipient copy fails
     */
    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = !async ? null : new AsyncOperationCallback(){

            public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                // nothing to do on success
            }

            public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                log.error("Error storing msg", t);
            }
        };
        m.setSender(id);
        m.addProcessedId(id);
        m.setSenderHost(hostname);

        boolean hasRecipients = m.getTo() != null && !m.getTo().isEmpty();
        if (!hasRecipients) {
            morphium.storeNoCache(m, getCollectionName(), cb);
            return;
        }
        for (String recipient : m.getTo()) {
            try {
                // Field-by-field copy into a fresh instance of the message's own class.
                Msg copy = m.getClass().getDeclaredConstructor().newInstance();
                List<Field> fields = morphium.getARHelper().getAllFields(m.getClass());
                for (Field field : fields) {
                    field.setAccessible(true);
                    field.set(copy, field.get(m));
                }
                copy.setMsgId(null);
                copy.setRecipient(recipient);
                morphium.storeNoCache(copy, getCollectionName(), cb);
            }
            catch (Exception e) {
                throw new RuntimeException("Sending of answer failed", e);
            }
        }
    }

    /**
     * Sends the message to this instance itself by delegating to the private
     * two-argument overload with the async flag cleared.
     *
     * @param m the message to send
     */
    public void sendMessageToSelf(Msg m) {
        final boolean async = false;
        sendMessageToSelf(m, async);
    }

    /**
     * Sends the message to this instance itself by delegating to the private
     * two-argument {@code sendMessageToSelf} overload with the async flag set.
     *
     * @param m the message to send
     */
    public void queueMessagetoSelf(Msg m) {
        final boolean async = true;
        sendMessageToSelf(m, async);
    }

    /**
     * Addresses the message to this instance and stores it in the messaging collection.
     *
     * The sender is set to the literal {@code "self"}, the recipient to this
     * instance's id and the sender host to this instance's hostname.
     *
     * @param m     the message to send
     * @param async currently has no effect: the message is always stored through
     *              the two-argument {@code storeNoCache} call, without a callback
     */
    private void sendMessageToSelf(Msg m, boolean async) {
        m.setSender("self");
        m.setRecipient(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.storeNoCache(m, this.getCollectionName());
    }

    /**
     * @return the current value of the auto-answer flag
     */
    public boolean isAutoAnswer() {
        return autoAnswer;
    }

    /**
     * Sets the auto-answer flag.
     *
     * @param autoAnswer the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setAutoAnswer(boolean autoAnswer) {
        final Messaging self = this;
        self.autoAnswer = autoAnswer;
        return self;
    }

    /**
     * Shutdown hook: stops this messaging instance.
     *
     * Clears the running flag, asks the thread pool (if any) to shut down, waits
     * 200 ms, then forces termination with {@code shutdownNow()} and drops the pool
     * reference. Afterwards the change stream monitor (if any) is terminated.
     *
     * Failures in one step are logged and do not prevent the following steps.
     * If the calling thread is interrupted during the grace period, the pool is
     * still forced down, the monitor is still terminated, and the thread's
     * interrupt flag is restored before returning.
     *
     * @param m the Morphium instance that is shutting down (not used here)
     */
    @Override
    public void onShutdown(Morphium m) {
        this.running = false;
        boolean interrupted = false;
        try {
            if (this.threadPool != null) {
                this.threadPool.shutdown();
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException e) {
                    // Remember the interruption, but still finish the shutdown below.
                    interrupted = true;
                }
                if (this.threadPool != null) {
                    this.threadPool.shutdownNow();
                }
                this.threadPool = null;
            }
        }
        catch (Exception e) {
            log.error("Error shutting down messaging thread pool", e);
        }
        try {
            if (this.changeStreamMonitor != null) {
                this.changeStreamMonitor.terminate();
            }
        }
        catch (Exception e) {
            log.error("Error terminating change stream monitor", e);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a message and spins until an answer for it shows up or the timeout expires.
     *
     * The message gets a fresh {@link MorphiumId}, is registered in
     * {@code waitingForMessages}, and is then stored. The calling thread polls
     * {@code waitingForAnswers} for the message id, yielding between checks.
     *
     * @param theMessage  the message to send; its message id is overwritten
     * @param timeoutInMs maximum time to wait, in milliseconds
     * @param <T>         the message type
     * @return the entry removed from {@code waitingForAnswers} for this message id
     * @throws RuntimeException if no answer arrived within {@code timeoutInMs}
     */
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        theMessage.setMsgId(new MorphiumId());
        this.waitingForMessages.put(theMessage.getMsgId(), theMessage);
        this.storeMessage(theMessage);
        final long begin = System.currentTimeMillis();
        for (;;) {
            if (this.waitingForAnswers.containsKey(theMessage.getMsgId())) {
                break;
            }
            if (System.currentTimeMillis() - begin > timeoutInMs) {
                String details = theMessage.getName() + "/" + theMessage.getMsgId() + " in time (" + timeoutInMs + "ms)";
                log.error("Did not receive answer " + details);
                this.waitingForMessages.remove(theMessage.getMsgId());
                throw new RuntimeException("Did not receive answer for message " + details);
            }
            Thread.yield();
        }
        if (log.isDebugEnabled()) {
            long elapsed = System.currentTimeMillis() - begin;
            log.debug("got message after: " + elapsed + "ms");
        }
        // Deregister the request first, then take the answer out of the answer map.
        this.waitingForMessages.remove(theMessage.getMsgId());
        T answer = (T)this.waitingForAnswers.remove(theMessage.getMsgId());
        return answer;
    }

    /**
     * Sends a message and collects answers to it until enough have arrived or the
     * timeout expires.
     *
     * The message is given a fresh {@link MorphiumId} if it has none, and is
     * registered in {@code waitingForMessages} before it is stored, so an answer
     * that arrives immediately is not missed (same order as
     * {@code sendAndAwaitFirstAnswer}). The calling thread then polls
     * {@code waitingForAnswers}, yielding between checks. The registration is
     * removed again in all cases, including when storing the message fails.
     *
     * @param theMessage      the message to send
     * @param numberOfAnswers number of answers to wait for; a negative value means
     *                        "collect until the timeout expires"
     * @param timeout         maximum time to wait, in milliseconds
     * @param <T>             the message type
     * @return the answers collected so far; possibly empty, never {@code null}
     */
    @SuppressWarnings("unchecked")
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        if (theMessage.getMsgId() == null) {
            // The id is the key answers are matched against, so it must exist before sending.
            theMessage.setMsgId(new MorphiumId());
        }
        List<T> answers = new ArrayList<T>();
        this.waitingForMessages.put(theMessage.getMsgId(), theMessage);
        try {
            this.storeMessage(theMessage);
            long start = System.currentTimeMillis();
            while (true) {
                // Take the pending answer (if any) out of the map in one step.
                T answer = (T)this.waitingForAnswers.remove(theMessage.getMsgId());
                if (answer != null) {
                    answers.add(answer);
                }
                if (numberOfAnswers >= 0 && answers.size() >= numberOfAnswers || System.currentTimeMillis() - start > timeout) break;
                Thread.yield();
            }
        }
        finally {
            this.waitingForMessages.remove(theMessage.getMsgId());
        }
        return answers;
    }

    /**
     * @return the current value of the process-multiple flag
     */
    public boolean isProcessMultiple() {
        return processMultiple;
    }

    /**
     * Sets the process-multiple flag.
     *
     * @param processMultiple the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setProcessMultiple(boolean processMultiple) {
        final Messaging self = this;
        self.processMultiple = processMultiple;
        return self;
    }

    /**
     * @return the queue name currently set on this instance (may be {@code null} if none was set)
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * Sets the queue name.
     *
     * @param queueName the new queue name
     * @return this instance, so calls can be chained
     */
    public Messaging setQueueName(String queueName) {
        final Messaging self = this;
        self.queueName = queueName;
        return self;
    }

    /**
     * @return the current value of the {@code multithreadded} flag
     */
    public boolean isMultithreadded() {
        return multithreadded;
    }

    /**
     * Sets the {@code multithreadded} flag.
     *
     * @param multithreadded the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setMultithreadded(boolean multithreadded) {
        final Messaging self = this;
        self.multithreadded = multithreadded;
        return self;
    }

    /**
     * @return the window size currently set on this instance
     */
    public int getWindowSize() {
        return windowSize;
    }

    /**
     * Sets the window size.
     *
     * @param windowSize the new window size
     * @return this instance, so calls can be chained
     */
    public Messaging setWindowSize(int windowSize) {
        final Messaging self = this;
        self.windowSize = windowSize;
        return self;
    }

    /**
     * @return the current value of the use-change-stream flag
     */
    public boolean isUseChangeStream() {
        return useChangeStream;
    }

    /**
     * Sets the use-change-stream flag, which {@code start()} consults after
     * launching the thread.
     *
     * @param useChangeStream the new flag value
     * @return this instance, so calls can be chained
     */
    public Messaging setUseChangeStream(boolean useChangeStream) {
        final Messaging self = this;
        self.useChangeStream = useChangeStream;
        return self;
    }
}

