/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.StatisticKeys;
import de.caluga.morphium.StatisticValue;
import de.caluga.morphium.Utils;
import de.caluga.morphium.UtilsMap;
import de.caluga.morphium.async.AsyncCallbackAdapter;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamEvent;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.FindCommand;
import de.caluga.morphium.driver.commands.InsertMongoCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.ReadMongoCommand;
import de.caluga.morphium.driver.commands.UpdateMongoCommand;
import de.caluga.morphium.driver.commands.WriteMongoCommand;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MsgLock;
import de.caluga.morphium.messaging.StatusInfoListener;
import de.caluga.morphium.query.Query;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(Messaging.class);
    // Listener instance that is registered in listenerByName under statusInfoListenerName.
    private final StatusInfoListener statusInfoListener;
    // Default name; the constructor overrides it when the configuration supplies a non-null name.
    private String statusInfoListenerName = "morphium.status_info";
    // Initialised from the configuration in the constructor.
    private boolean statusInfoListenerEnabled = true;
    private final Morphium morphium;
    // Loop flag: checked by run(), the change stream handlers and the polling code.
    private boolean running;
    // Delay in milliseconds between scheduled polls; also handed to the change stream monitors.
    private int pause;
    // Random UUID assigned in the constructor; used as sender id, lock id and processed_by entry.
    private String id;
    // When true, processMessage() sends a default "received" answer if a listener returns none.
    private boolean autoAnswer = false;
    // Taken from the HOSTNAME environment variable, else the local host name, else "unknown host".
    private String hostname;
    // Passed to findMessages(); when false only a single message is fetched per poll.
    private boolean processMultiple;
    // Listeners that are invoked for every message, regardless of its name.
    private final List<MessageListener> listeners;
    // Message name -> timestamp (ms) at which processing of that name was paused.
    private final Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    // Message name -> listeners registered for exactly that name.
    private Map<String, List<MessageListener>> listenerByName;
    private String queueName;
    // Lazily derived by getLockCollectionName() as getCollectionName() + "_lck".
    private String lockCollectionName = null;
    private String collectionName = null;
    // Created by initThreadPool().
    private ThreadPoolExecutor threadPool;
    // Runs the periodic poll scheduled in run().
    private final ScheduledThreadPoolExecutor decouplePool;
    private boolean multithreadded;
    // Upper bound for the number of messages fetched per poll.
    private int windowSize;
    private boolean useChangeStream;
    private ChangeStreamMonitor changeStreamMonitor;
    // Id of a sent message -> answers received for it so far.
    private final Map<MorphiumId, Queue<Msg>> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Queue<Msg>>();
    // Elements waiting to be dispatched by run(); this object also serves as the monitor
    // guarding combined access to the queue and idsInProgress.
    private final BlockingQueue<ProcessingQueueElement> processing = new PriorityBlockingQueue<ProcessingQueueElement>();
    // Values > 0 make the next scheduled run query the database for messages.
    private final AtomicInteger requestPoll = new AtomicInteger(0);
    // Ids of messages that were taken from the queue and are currently being handled.
    private final List<MorphiumId> idsInProgress = new Vector<MorphiumId>();

    /**
     * Creates a messaging instance without an explicit queue name.
     *
     * @param m               the Morphium instance to use
     * @param pause           pause value forwarded to the full constructor
     * @param processMultiple whether several messages may be fetched per poll
     */
    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    /**
     * Creates a messaging instance with fixed defaults: no queue name, a pause of 500,
     * single-message processing, no multithreading and a window size of 100.
     *
     * @param m the Morphium instance to use
     */
    public Messaging(Morphium m) {
        this(m, null, 500, false, false, 100);
    }

    /**
     * Creates a messaging instance without an explicit queue name.
     *
     * @param m               the Morphium instance to use
     * @param pause           pause value forwarded to the full constructor
     * @param processMultiple whether several messages may be fetched per poll
     * @param multithreadded  whether message handling is handed to the thread pool
     * @param windowSize      maximum number of messages fetched per poll
     */
    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    /**
     * Creates a single-threaded messaging instance whose window size is taken from
     * {@code m.getConfig().getMessagingWindowSize()}.
     *
     * @param m               the Morphium instance to use
     * @param queueName       queue name forwarded to the full constructor (may be {@code null})
     * @param pause           pause value forwarded to the full constructor
     * @param processMultiple whether several messages may be fetched per poll
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, m.getConfig().getMessagingWindowSize());
    }

    /**
     * Creates a messaging instance that uses change streams when {@code m} reports a
     * replica set or when the driver's name equals {@code "InMemDriver"}.
     *
     * @param m               the Morphium instance to use
     * @param queueName       queue name forwarded to the full constructor (may be {@code null})
     * @param pause           pause value forwarded to the full constructor
     * @param processMultiple whether several messages may be fetched per poll
     * @param multithreadded  whether message handling is handed to the thread pool
     * @param windowSize      maximum number of messages fetched per poll
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet() || m.getDriver().getName().equals("InMemDriver"));
    }

    /**
     * Creates a fully configured messaging instance. The instance registers itself as
     * shutdown listener with {@code m}, gets a random UUID as id, determines the host
     * name and ensures the indices for the message and lock collections.
     *
     * @param m               the Morphium instance to use
     * @param queueName       queue name (may be {@code null})
     * @param pause           delay in milliseconds between scheduled polls
     * @param processMultiple whether several messages may be fetched per poll
     * @param multithreadded  whether message handling is handed to the thread pool
     * @param windowSize      maximum number of messages fetched per poll; also the core
     *                        size of the scheduler pool
     * @param useChangeStream whether change streams are used to learn about new messages
     */
    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this.setWindowSize(windowSize);
        this.setUseChangeStream(useChangeStream);
        this.setQueueName(queueName);
        this.setPause(pause);
        this.setProcessMultiple(processMultiple);
        // morphium must be assigned before setMultithreadded() is invoked below.
        this.morphium = m;
        this.statusInfoListener = new StatusInfoListener();
        this.statusInfoListenerEnabled = m.getConfig().isMessagingStatusInfoListenerEnabled();
        String configuredListenerName = m.getConfig().getMessagingStatusInfoListenerName();
        if (configuredListenerName != null) {
            this.statusInfoListenerName = configuredListenerName;
        }
        this.setMultithreadded(multithreadded);
        this.decouplePool = new ScheduledThreadPoolExecutor(windowSize);
        this.decouplePool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                // getAndIncrement() reads and advances the counter atomically, so two
                // concurrently created threads can never receive the same number.
                Thread ret = new Thread(r, "decouple_thr_" + this.num.getAndIncrement());
                ret.setDaemon(true);
                return ret;
            }
        });
        this.morphium.addShutdownListener(this);
        this.running = true;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException e) {
                // Not fatal: the placeholder below is used instead.
                log.debug("Could not determine local host name", (Throwable)e);
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
        // Force one initial poll so that messages stored before startup are found.
        this.requestPoll.set(1);
        try {
            m.ensureIndicesFor(Msg.class, this.getCollectionName());
            m.ensureIndicesFor(MsgLock.class, this.getLockCollectionName());
        }
        catch (Exception e) {
            log.error("Error during index checks", (Throwable)e);
        }
    }

    /** Shorthand for {@code setStatusInfoListenerEnabled(true)}. */
    public void enableStatusInfoListener() {
        setStatusInfoListenerEnabled(true);
    }

    /** Shorthand for {@code setStatusInfoListenerEnabled(false)}. */
    public void disableStatusInfoListener() {
        setStatusInfoListenerEnabled(false);
    }

    /**
     * @return the message name under which the status info listener is registered
     */
    public String getStatusInfoListenerName() {
        return statusInfoListenerName;
    }

    /**
     * Changes the message name the status info listener reacts to. The registration
     * under the previous name is removed; the listener is registered under the new name
     * only while the status info listener is enabled, so that renaming cannot silently
     * re-activate a disabled listener.
     *
     * @param statusInfoListenerName the new message name
     */
    public void setStatusInfoListenerName(String statusInfoListenerName) {
        this.listenerByName.remove(this.statusInfoListenerName);
        this.statusInfoListenerName = statusInfoListenerName;
        if (this.statusInfoListenerEnabled) {
            this.listenerByName.put(statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
    }

    /**
     * @return {@code true} if the status info listener is currently enabled
     */
    public boolean isStatusInfoListenerEnabled() {
        return statusInfoListenerEnabled;
    }

    /**
     * Enables or disables the status info listener and keeps the by-name listener
     * registry in sync with the flag.
     *
     * @param statusInfoListenerEnabled {@code true} to register the listener (unless its
     *                                  name is already present), {@code false} to remove it
     */
    public void setStatusInfoListenerEnabled(boolean statusInfoListenerEnabled) {
        this.statusInfoListenerEnabled = statusInfoListenerEnabled;
        if (!statusInfoListenerEnabled) {
            this.listenerByName.remove(this.statusInfoListenerName);
            return;
        }
        boolean alreadyRegistered = this.listenerByName.containsKey(this.statusInfoListenerName);
        if (!alreadyRegistered) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
    }

    /**
     * Lists, per message name, the class names of the listeners registered for it.
     * Works on a copy of the registry map taken at call time.
     *
     * @return a new map from message name to listener class names
     */
    public Map<String, List<String>> getListenerNames() {
        Map<String, List<MessageListener>> snapshot = new HashMap<String, List<MessageListener>>(this.listenerByName);
        HashMap<String, List<String>> classNamesByMessage = new HashMap<String, List<String>>();
        for (Map.Entry<String, List<MessageListener>> registration : snapshot.entrySet()) {
            List<String> classNames = new ArrayList<String>();
            for (MessageListener listener : registration.getValue()) {
                classNames.add(listener.getClass().getName());
            }
            classNamesByMessage.put(registration.getKey(), classNames);
        }
        return classNamesByMessage;
    }

    /**
     * Lists the class names of all listeners that are registered without a message name.
     *
     * @return a new list of listener class names, in registration-list order
     */
    public List<String> getGlobalListeners() {
        List<MessageListener> snapshot = new ArrayList<MessageListener>(this.listeners);
        ArrayList<String> classNames = new ArrayList<String>();
        for (MessageListener listener : snapshot) {
            String className = listener.getClass().getName();
            classNames.add(className);
        }
        return classNames;
    }

    /**
     * Collects the current counters of the messaging thread pool. All keys share the
     * prefix {@code "messaging.threadpool."}.
     *
     * @return map from statistic name to its current value
     */
    public Map<String, Long> getThreadPoolStats() {
        String prefix = "messaging.threadpool.";
        // The keys are plain strings; the decompiler had wrapped them in casts to
        // java.lang.invoke.CallSite, which can never succeed for a String.
        return UtilsMap.of(prefix + "largest_poolsize", Long.valueOf(this.threadPool.getLargestPoolSize()))
                .add(prefix + "task_count", this.threadPool.getTaskCount())
                .add(prefix + "core_size", Long.valueOf(this.threadPool.getCorePoolSize()))
                .add(prefix + "maximum_pool_size", Long.valueOf(this.threadPool.getMaximumPoolSize()))
                .add(prefix + "pool_size", Long.valueOf(this.threadPool.getPoolSize()))
                .add(prefix + "active_count", Long.valueOf(this.threadPool.getActiveCount()))
                .add(prefix + "completed_task_count", this.threadPool.getCompletedTaskCount());
    }

    /**
     * Creates the thread pool used for message handling. Pool sizes and keep-alive time
     * come from the Morphium configuration. The work queue refuses tasks while the pool
     * is below its maximum size and has no idle thread, which makes the executor start a
     * new thread instead of queueing; rejected tasks are put into the queue directly.
     */
    private void initThreadPool() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(){

            @Override
            public boolean offer(Runnable e) {
                int poolSize = Messaging.this.threadPool.getPoolSize();
                int maximumPoolSize = Messaging.this.threadPool.getMaximumPoolSize();
                // Queue only if the pool cannot grow any more or an idle thread exists.
                if (poolSize >= maximumPoolSize || poolSize > Messaging.this.threadPool.getActiveCount()) {
                    return super.offer(e);
                }
                return false;
            }
        };
        this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)queue);
        this.threadPool.setRejectedExecutionHandler((r, executor) -> {
            try {
                // put() bypasses the overridden offer() and therefore always enqueues.
                executor.getQueue().put(r);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        this.threadPool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                // Atomic read-and-advance keeps thread numbers unique under concurrency.
                Thread ret = new Thread(r, "messaging " + this.num.getAndIncrement());
                ret.setDaemon(true);
                return ret;
            }
        });
    }

    /**
     * Counts the messages in this queue's collection that were sent by someone else and
     * have no entry in {@code processed_by} yet.
     *
     * @return number of matching messages
     */
    public long getPendingMessagesCount() {
        Query<Msg> pending = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        pending.f((Enum)Msg.Fields.sender).ne(this.id)
                .f("processed_by.0").notExists();
        long count = pending.countAll();
        return count;
    }

    /**
     * Deletes the given message from this queue's collection.
     *
     * @param m the message to delete
     */
    public void removeMessage(Msg m) {
        String collection = getCollectionName();
        morphium.delete(m, collection);
    }

    /**
     * Handles one change stream event of the message collection: looks up priority,
     * sender and timestamp of the affected document and queues it for processing unless
     * it was sent by this instance or is queued already.
     *
     * @param evt the change stream event (may be {@code null})
     * @return the current value of the running flag; {@code false} tells the monitor
     *         that this listener does not want further events
     */
    private boolean handleChangeStreamEvent(ChangeStreamEvent evt) {
        if (!this.running) {
            return false;
        }
        try {
            if (evt == null || evt.getOperationType() == null) {
                return this.running;
            }
            Object docId = ((Map)evt.getDocumentKey()).get("_id");
            if (!(docId instanceof MorphiumId)) {
                // Also covers a missing _id, which previously caused a NullPointerException.
                log.error("Some other id?!?!?" + (docId == null ? "null" : docId.getClass().getName()));
                return this.running;
            }
            FindCommand fnd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(null));
            Map<String, Object> msg = null;
            try {
                fnd.setFilter(Doc.of("_id", docId));
                ((FindCommand)fnd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                // Only the fields needed to build the queue element are loaded.
                fnd.setProjection(Doc.of("_id", (Object)1, "priority", (Object)1, "sender", (Object)1, "timestamp", (Object)1));
                List<Map<String, Object>> msgs = fnd.execute();
                if (!msgs.isEmpty()) {
                    msg = msgs.get(0);
                }
            }
            finally {
                fnd.releaseConnection();
            }
            if (msg == null) {
                // Document is gone already.
                return this.running;
            }
            // Own messages are never processed. The comparison has to use this instance's
            // id; it used to compare against the string literal "id" and never matched.
            if (this.id.equals(msg.get("sender"))) {
                return this.running;
            }
            ProcessingQueueElement el = new ProcessingQueueElement();
            el.setPriority((Integer)msg.get("priority"));
            el.setId((MorphiumId)msg.get("_id"));
            el.setTimestamp((Long)msg.get("timestamp"));
            synchronized (this.processing) {
                if (!this.processing.contains(el)) {
                    this.processing.add(el);
                }
            }
        }
        catch (Exception e) {
            log.error("Error during event processing in changestream", (Throwable)e);
        }
        return this.running;
    }

    /**
     * Creates and starts the two change stream monitors: one on the message collection
     * (insert and update events, forwarded to {@code handleChangeStreamEvent}) and one
     * on the lock collection (delete events, each of which requests a new poll).
     */
    private void initChangeStreams() {
        // $match stage for the message collection: only inserts and updates.
        LinkedHashMap<String, List<String>> operationTypes = new LinkedHashMap<String, List<String>>();
        operationTypes.put("$in", Arrays.asList("insert", "update"));
        LinkedHashMap matchStage = new LinkedHashMap();
        matchStage.put("operationType", operationTypes);
        ArrayList<Map<String, Object>> messagePipeline = new ArrayList<Map<String, Object>>();
        messagePipeline.add(UtilsMap.of("$match", matchStage));

        // A deleted lock may make a message available again, so ask for a poll.
        ChangeStreamMonitor lockMonitor = new ChangeStreamMonitor(this.morphium, this.getLockCollectionName(), false, this.pause, List.of(Doc.of("$match", Doc.of("operationType", Doc.of("$eq", "delete")))));
        lockMonitor.addListener(lockEvent -> {
            this.requestPoll.incrementAndGet();
            return this.running;
        });

        this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, this.getCollectionName(), false, this.pause, messagePipeline);
        this.changeStreamMonitor.addListener(this::handleChangeStreamEvent);

        this.changeStreamMonitor.start();
        lockMonitor.start();
    }

    /**
     * Main loop of the messaging thread. Registers the status info listener (if
     * enabled), starts the change stream monitors (if configured), schedules the
     * periodic poll and then dispatches queued elements until the running flag is
     * cleared. On exit all listeners are removed.
     */
    @Override
    public void run() {
        this.setName("Msg " + this.id);
        if (this.statusInfoListenerEnabled) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
        if (this.useChangeStream) {
            this.initChangeStreams();
        }
        try {
            this.decouplePool.scheduleWithFixedDelay(this::runScheduledPoll, this.pause, this.pause, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException ex) {
            log.error("Executor died?!?!");
        }
        while (this.running) {
            try {
                final ProcessingQueueElement prEl = this.processing.poll(1000L, TimeUnit.MILLISECONDS);
                if (prEl == null) {
                    continue;
                }
                synchronized (this.processing) {
                    if (this.idsInProgress.contains(prEl.getId())) {
                        continue;
                    }
                    this.idsInProgress.add(prEl.getId());
                }
                this.queueOrRun(() -> this.dispatchQueuedMessage(prEl));
            }
            catch (InterruptedException e) {
                // The interrupt flag is deliberately not re-asserted: poll() would then
                // throw immediately on every iteration. Termination is driven by the
                // running flag, which is re-checked by the loop condition.
                log.debug("Messaging " + this.id + " interrupted while waiting for work");
            }
            catch (Exception e) {
                // Used to be swallowed silently; keep the loop alive but leave a trace.
                if (this.running) {
                    log.error(this.id + ": Error while dispatching message", (Throwable)e);
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        this.listeners.clear();
        this.listenerByName.clear();
    }

    /**
     * One run of the periodic poll: queries the database for messages if a poll was
     * requested or change streams are not in use, otherwise only counts the skip.
     */
    private void runScheduledPoll() {
        try {
            if (this.requestPoll.get() > 0 || !this.useChangeStream) {
                this.morphium.inc(StatisticKeys.PULL);
                StatisticValue sk = this.morphium.getStats().get((Object)StatisticKeys.PULLSKIP);
                // Read and reset in one atomic step so that no request is lost in between.
                long pendingRequests = this.requestPoll.getAndSet(0);
                sk.set(sk.get() + pendingRequests);
                this.findMessages(this.processMultiple);
            } else {
                this.morphium.inc(StatisticKeys.SKIPPED_MSG_UPDATES);
            }
        }
        catch (Throwable e) {
            if (this.running) {
                log.error("Unhandled exception " + e.getMessage(), e);
            }
            // An exception escaping a fixed-delay task suppresses all further runs.
            // That is wanted on shutdown and for Errors, but a transient failure while
            // running must not stop polling for good.
            if (!this.running || e instanceof Error) {
                throw e;
            }
        }
    }

    /**
     * Loads the message behind a queue element and, if this instance is responsible for
     * it, hands it to answer collection or listener processing. The element's id is
     * removed from the in-progress list in any case.
     */
    private void dispatchQueuedMessage(ProcessingQueueElement prEl) {
        try {
            Msg msg = this.morphium.findById(Msg.class, prEl.getId(), this.getCollectionName());
            if (msg == null) {
                return;
            }
            if (!msg.isAnswer() && !this.getListenerNames().containsKey(msg.getName()) && this.getGlobalListeners().isEmpty()) {
                return;
            }
            if (msg.getSender().equals(this.id)) {
                return;
            }
            if (msg.isExclusive() && msg.getProcessedBy() != null && msg.getProcessedBy().size() != 0) {
                return;
            }
            if (msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id)) {
                return;
            }
            if (msg.getRecipients() != null && !msg.getRecipients().contains(this.id)) {
                return;
            }
            if (msg.isAnswer()) {
                Queue<Msg> answersForMessage = this.waitingForAnswers.get(msg.getInAnswerTo());
                if (answersForMessage != null) {
                    // Somebody is waiting for this answer: record it, no listener call.
                    this.updateProcessedBy(msg);
                    if (!answersForMessage.contains(msg)) {
                        answersForMessage.add(msg);
                    }
                    this.checkDeleteAfterProcessing(msg);
                    return;
                }
            }
            if (!this.getListenerNames().containsKey(msg.getName()) && this.getGlobalListeners().isEmpty()) {
                return;
            }
            if (msg.isExclusive()) {
                this.lockAndProcess(msg);
            } else {
                this.processMessage(msg);
            }
        }
        finally {
            synchronized (this.processing) {
                this.idsInProgress.remove(prEl.getId());
            }
        }
    }

    /**
     * Applies the delete-after-processing setting of a message: with a time of 0 the
     * message is deleted right away, otherwise its {@code deleteAt} date is set to
     * now plus that time, on the message and, for exclusive messages, on its lock.
     *
     * @param obj the processed message
     */
    private void checkDeleteAfterProcessing(Msg obj) {
        if (!obj.isDeleteAfterProcessing()) {
            return;
        }
        if (obj.getDeleteAfterProcessingTime() == 0) {
            this.morphium.delete(obj, this.getCollectionName());
            return;
        }
        long deleteAtMillis = System.currentTimeMillis() + (long)obj.getDeleteAfterProcessingTime();
        obj.setDeleteAt(new Date(deleteAtMillis));
        this.morphium.set(obj, this.getCollectionName(), Msg.Fields.deleteAt, obj.getDeleteAt());
        if (!obj.isExclusive()) {
            return;
        }
        // Keep the lock document's expiry in line with the message's.
        this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(obj.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)obj.getDeleteAt());
    }

    /**
     * Pauses processing of messages with the given name. If the name is paused already,
     * the original pause timestamp is kept.
     *
     * @param name the message name to pause
     */
    public void pauseProcessingOfMessagesNamed(String name) {
        long pausedAt = System.currentTimeMillis();
        pauseMessages.putIfAbsent(name, pausedAt);
    }

    /**
     * @return a new list containing the names of all currently paused messages
     */
    public List<String> getPausedMessageNames() {
        Set<String> pausedNames = pauseMessages.keySet();
        return new ArrayList<String>(pausedNames);
    }

    /**
     * Resumes processing of messages with the given name and requests a poll.
     *
     * @param name the message name to resume
     * @return {@code 0} if the name was not paused; otherwise the pause duration in
     *         milliseconds, or {@code null} if the entry vanished between the check and
     *         the removal
     */
    public Long unpauseProcessingOfMessagesNamed(String name) {
        boolean paused = this.pauseMessages.containsKey(name);
        if (!paused) {
            return 0L;
        }
        Long pausedSince = this.pauseMessages.remove(name);
        Long pausedFor = pausedSince == null ? null : Long.valueOf(System.currentTimeMillis() - pausedSince);
        this.requestPoll.incrementAndGet();
        return pausedFor;
    }

    /**
     * Queries the database for messages this instance should look at next and returns
     * them as queue elements (id, priority, timestamp).
     * <p>
     * Without any listener only answers to messages this instance is waiting on are
     * considered. Otherwise locked, queued and in-progress messages are excluded, as are
     * paused message names, own messages and messages addressed to other recipients.
     *
     * @param multiple {@code true} to fetch up to {@code windowSize} messages,
     *                 {@code false} to fetch a single one (listener case only)
     * @return the elements found; empty if not running, nothing matched or the query failed
     */
    private List<ProcessingQueueElement> getMessagesForProcessing(boolean multiple) {
        if (!this.running) {
            return new ArrayList<ProcessingQueueElement>();
        }
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        if (this.listenerByName.isEmpty() && this.listeners.isEmpty()) {
            // No listeners: only answers can be of interest. The ids must be wrapped in
            // queue elements - returning the bare id list made the caller fail with a
            // ClassCastException.
            Query<Msg> answers = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.inAnswerTo).in(this.waitingForAnswers.keySet());
            answers.sort(Msg.Fields.priority, Msg.Fields.timestamp);
            try {
                return this.loadQueueElements(answers, this.windowSize);
            }
            catch (Exception e) {
                log.error(this.id + ": Error while processing", (Throwable)e);
                return new ArrayList<ProcessingQueueElement>();
            }
        }
        // Use the accessor so that the same lock collection is read that locks are written to.
        List idsToIgnore = this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getLockCollectionName()).idList();
        synchronized (this.processing) {
            for (ProcessingQueueElement p : this.processing) {
                idsToIgnore.add(p.getId());
            }
            idsToIgnore.addAll(this.idsInProgress);
        }
        // Exclusive messages nobody processed yet, or broadcasts this instance has not seen.
        Query<Msg> q1 = q.q().f((Enum)Msg.Fields.exclusive).eq(true).f("processed_by.0").notExists();
        Query<Msg> q2 = q.q().f((Enum)Msg.Fields.exclusive).ne(true).f((Enum)Msg.Fields.processedBy).ne(this.id);
        q.f("_id").nin(idsToIgnore).f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
        Set<String> pausedMessagesKeys = this.pauseMessages.keySet();
        if (!this.pauseMessages.isEmpty()) {
            q.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
        }
        q.or(q1, q2);
        q.setLimit(this.windowSize);
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        try {
            int ws = multiple ? this.windowSize : 1;
            List<ProcessingQueueElement> queueElements = this.loadQueueElements(q, ws);
            if (q.countAll() != (long)queueElements.size()) {
                // More messages are waiting than were fetched: poll again soon.
                this.requestPoll.incrementAndGet();
            }
            return queueElements;
        }
        catch (Exception e) {
            log.error(this.id + ": Error while processing", (Throwable)e);
            return new ArrayList<ProcessingQueueElement>();
        }
    }

    /**
     * Executes the given query as a find command that loads only id, priority and
     * timestamp, and converts every hit into a queue element. The connection is
     * released before this method returns.
     */
    private List<ProcessingQueueElement> loadQueueElements(Query<Msg> query, int limit) throws Exception {
        FindCommand fnd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
        try {
            fnd.setDb(this.morphium.getDatabase());
            fnd.setFilter(query.toQueryObject());
            fnd.setProjection(Doc.of("_id", (Object)1, "priority", (Object)1, "timestamp", (Object)1));
            fnd.setLimit(limit);
            fnd.setBatchSize(query.getBatchSize());
            fnd.setSort(query.getSort());
            fnd.setSkip(query.getSkip());
            fnd.setColl(this.getCollectionName());
            List<Map<String, Object>> result = fnd.execute();
            ArrayList<ProcessingQueueElement> queueElements = new ArrayList<ProcessingQueueElement>();
            for (Map<String, Object> el : result) {
                queueElements.add(new ProcessingQueueElement((Integer)el.get("priority"), (Long)el.get("timestamp"), (MorphiumId)el.get("_id")));
            }
            return queueElements;
        }
        finally {
            fnd.releaseConnection();
        }
    }

    /**
     * Requests that the next scheduled run polls the database for messages.
     */
    public void triggerCheck() {
        log.debug("Triggercheck called");
        requestPoll.incrementAndGet();
    }

    /**
     * Polls the database for processable messages and appends every element that is
     * neither queued nor currently being handled to the processing queue.
     *
     * @param multiple forwarded to {@code getMessagesForProcessing}
     */
    private void findMessages(boolean multiple) {
        if (!this.running) {
            return;
        }
        List<ProcessingQueueElement> messages = this.getMessagesForProcessing(multiple);
        if (messages == null || messages.isEmpty()) {
            return;
        }
        for (ProcessingQueueElement el : messages) {
            synchronized (this.processing) {
                // idsInProgress holds message ids, so the element's id has to be looked
                // up - checking for the element itself could never match.
                if (!this.processing.contains(el) && !this.idsInProgress.contains(el.getId())) {
                    this.processing.add(el);
                }
            }
        }
    }

    /**
     * Tries to lock the message for this instance and processes it on success. If the
     * lock cannot be obtained, a new poll is requested instead.
     *
     * @param obj the exclusive message
     */
    private void lockAndProcess(Msg obj) {
        boolean locked = this.lockMessage(obj, this.id, obj.getDeleteAt());
        if (!locked) {
            this.requestPoll.incrementAndGet();
            return;
        }
        this.processMessage(obj);
    }

    /**
     * Looks up the lock document stored under the id of the given message.
     *
     * @param m the message
     * @return whatever {@code findById} yields for the message id in the lock collection
     */
    public MsgLock getLock(Msg m) {
        String lockCollection = getLockCollectionName();
        return morphium.findById(MsgLock.class, m.getMsgId(), lockCollection);
    }

    /**
     * Returns the name of the lock collection, deriving it on first use as the message
     * collection name followed by {@code "_lck"}.
     *
     * @return the lock collection name
     */
    public String getLockCollectionName() {
        if (this.lockCollectionName != null) {
            return this.lockCollectionName;
        }
        this.lockCollectionName = this.getCollectionName() + "_lck";
        return this.lockCollectionName;
    }

    /**
     * Locks a message without setting a deletion date on the lock.
     *
     * @param m      the message to lock
     * @param lockId the id to store as lock owner
     * @return {@code true} if the lock was created
     */
    public boolean lockMessage(Msg m, String lockId) {
        Date noDeletionDate = null;
        return lockMessage(m, lockId, noDeletionDate);
    }

    /**
     * Tries to lock a message by inserting a lock document for it into the lock
     * collection.
     *
     * @param m      the message to lock
     * @param lockId the id to store as lock owner
     * @param delAt  deletion date for the lock document, or {@code null} for none
     * @return {@code true} if the insert succeeded, {@code false} if it failed for any
     *         reason
     */
    public boolean lockMessage(Msg m, String lockId, Date delAt) {
        MsgLock lck = new MsgLock(m);
        lck.setLockId(lockId);
        if (delAt != null) {
            lck.setDeleteAt(delAt);
        }
        InsertMongoCommand cmd = null;
        try {
            cmd = new InsertMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(MsgLock.class)));
            cmd.setColl(this.getLockCollectionName());
            cmd.setDb(this.morphium.getDatabase());
            cmd.setDocuments(List.of(this.morphium.getMapper().serialize(lck)));
            cmd.execute();
            return true;
        }
        catch (Exception e) {
            // A failed insert is the normal outcome when the lock is taken already, so
            // this is only traced on debug level instead of being dropped entirely.
            if (log.isDebugEnabled()) {
                log.debug(this.id + ": could not lock message " + String.valueOf(m.getMsgId()) + ": " + e.getMessage());
            }
            return false;
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    /**
     * Delivers a message to all global listeners and to the listeners registered for its
     * name, after re-reading it and re-checking that it still has to be processed by
     * this instance. Handles answers returned by listeners, rejections, the
     * processed-by marker, delete-after-processing and the release of the lock of
     * exclusive messages.
     *
     * @param ms the message to process; {@code null} is ignored
     */
    private void processMessage(Msg ms) {
        if (ms == null) {
            return;
        }
        // Work on the re-read message from here on.
        Msg msg = this.morphium.reread(ms, this.getCollectionName());
        if (msg == null) {
            // reread yielded nothing; only the lock is released.
            this.unlockIfExclusive(ms);
            return;
        }
        if (msg.getSender().equals(this.getSenderId())) {
            log.error("This should have been filtered out before alreaday!!!");
            return;
        }
        // Timed-out messages are deleted instead of being delivered.
        if (msg.isTimingOut() && msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            log.debug(this.getSenderId() + ": Found outdated message - deleting it!");
            this.morphium.delete(msg, this.getCollectionName());
            this.unlockIfExclusive(msg);
            return;
        }
        // NOTE(review): the following two early returns leave the lock of an exclusive
        // message in place - confirm that this is intended.
        if (msg.isExclusive() && msg.getProcessedBy().size() > 0) {
            return;
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        if (this.listeners.isEmpty() && this.listenerByName.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Not further processing - no listener for non answer message");
            }
            this.unlockIfExclusive(msg);
            return;
        }
        boolean wasProcessed = false;
        boolean wasRejected = false;
        ArrayList<MessageRejectedException> rejections = new ArrayList<MessageRejectedException>();
        // Global listeners first, then the listeners registered for this message name.
        ArrayList<MessageListener> lst = new ArrayList<MessageListener>(this.listeners);
        if (this.listenerByName.get(msg.getName()) != null) {
            lst.addAll((Collection<MessageListener>)this.listenerByName.get(msg.getName()));
        }
        for (MessageListener l : lst) {
            try {
                // The name may have been paused in the meantime: stop delivering.
                if (this.pauseMessages.containsKey(msg.getName())) {
                    wasProcessed = false;
                    this.unlockIfExclusive(msg);
                    break;
                }
                if (l.markAsProcessedBeforeExec()) {
                    this.updateProcessedBy(msg);
                }
                Msg answer = l.onMessage(this, msg);
                wasProcessed = true;
                // With autoAnswer a default acknowledgement replaces a missing answer.
                if (this.autoAnswer && answer == null) {
                    answer = new Msg(msg.getName(), "received", "");
                }
                if (answer == null) continue;
                msg.sendAnswer(this, answer);
                if (answer.getRecipients() != null) continue;
                log.warn("Recipient of answer is null?!?!");
            }
            catch (MessageRejectedException mre) {
                // Remember the rejection; its handler runs after all listeners were called.
                log.info(this.id + ": Message was rejected by listener: " + mre.getMessage());
                wasRejected = true;
                rejections.add(mre);
                this.requestPoll.incrementAndGet();
            }
            catch (Exception e) {
                log.error(this.id + ": listener Processing failed", (Throwable)e);
                this.checkDeleteAfterProcessing(msg);
            }
        }
        if (wasRejected) {
            for (MessageRejectedException mre : rejections) {
                if (mre.getRejectionHandler() != null) {
                    try {
                        mre.getRejectionHandler().handleRejection(this, msg);
                    }
                    catch (Exception e) {
                        log.error("Error in rejection handling", (Throwable)e);
                    }
                    continue;
                }
                log.error("No rejection  handler defined!!!");
            }
        }
        // Mark as processed unless a listener asked for that to happen up front.
        if (wasProcessed && !msg.getProcessedBy().contains(this.id)) {
            this.updateProcessedBy(msg);
        }
        if (!wasRejected && wasProcessed) {
            this.checkDeleteAfterProcessing(msg);
        }
        this.unlockIfExclusive(msg);
    }

    /**
     * Releases this instance's lock on the message if the message is exclusive.
     *
     * @param msg the message whose lock is to be released
     */
    private void unlockIfExclusive(Msg msg) {
        if (!msg.isExclusive()) {
            return;
        }
        deleteLock(msg.getMsgId());
    }

    /**
     * Removes the lock document for the given message id, provided it is owned by this
     * instance (its {@code lock_id} equals this instance's id).
     *
     * @param msgId id of the locked message
     */
    private void deleteLock(MorphiumId msgId) {
        this.morphium.createQueryFor(MsgLock.class)
                .setCollectionName(this.getLockCollectionName())
                .f("_id").eq(msgId)
                .f("lock_id").eq(this.id)
                .remove();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records this instance's id in the {@code processed_by} field of the stored message.
     *
     * <p>The id is first added to the in-memory message, then an {@code $addToSet} update
     * matching the message id is executed on the primary connection. If the result
     * reports no modification, the message is re-read and a warning is logged when
     * this id is still missing. Driver errors are logged and not propagated.
     *
     * @param msg the message that was processed; {@code null} is ignored
     */
    private void updateProcessedBy(Msg msg) {
        if (msg == null) {
            return;
        }
        // Nothing to do when the in-memory message already lists this instance.
        if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        // The local copy is marked before the database update is attempted.
        msg.getProcessedBy().add(this.id);
        Map<String, Object> qobj = idq.toQueryObject();
        Doc set = Doc.of("processed_by", this.id);
        Doc update = Doc.of("$addToSet", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            // Release right away and clear the reference so the finally block does not release twice.
            cmd.releaseConnection();
            cmd = null;
            // Warn only when the result reports no modification (neither "nModified" nor "modified"
            // present, or "nModified" equal to 0), the re-read returns non-null and the message
            // still does not list this id.
            // NOTE(review): presumably reread() refreshes msg in place - confirm.
            if ((ret.get("nModified") == null && ret.get("modified") == null || Integer.valueOf(0).equals(ret.get("nModified"))) && this.morphium.reread(msg, this.getCollectionName()) != null && !msg.getProcessedBy().contains(this.id)) {
                log.warn(this.id + ": Could not update processed_by in msg " + String.valueOf(msg.getMsgId()));
                log.warn(this.id + ": " + Utils.toJsonString(ret));
                log.warn(this.id + ": msg: " + msg.toString());
            }
        }
        catch (MorphiumDriverException e) {
            log.error("Error updating processed by - this might lead to duplicate execution!", (Throwable)e);
        }
        finally {
            // Only reached with a non-null cmd when the try block exited before the explicit release.
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    /**
     * Executes the given task, either inline or on the worker pool.
     *
     * <p>When multithreading is disabled the task runs on the calling thread.
     * Otherwise the call waits, sleeping the configured idle time, while the
     * pool reports more active workers than {@code windowSize}, then hands the
     * task to the pool. A failed hand-over is retried as long as messaging is
     * still running and the pool is usable.
     *
     * <p>Unlike the previous catch-all retry loop, this gives up once messaging
     * is no longer running or the pool is gone or shut down; before, a shut-down
     * pool made the caller spin forever. An interrupt received while waiting is
     * remembered and the thread's interrupt status is restored on exit.
     * {@link Error}s are no longer swallowed.
     *
     * @param r the task to execute
     */
    private void queueOrRun(Runnable r) {
        if (!this.multithreadded) {
            r.run();
            return;
        }
        boolean interrupted = false;
        try {
            while (true) {
                // Snapshot the field: other methods may set it to null concurrently.
                java.util.concurrent.ThreadPoolExecutor pool = this.threadPool;
                try {
                    // Throttle: do not hand over while more than windowSize workers are active.
                    while (pool.getActiveCount() > this.windowSize) {
                        Thread.sleep(this.morphium.getConfig().getIdleSleepTime());
                    }
                    pool.execute(r);
                    return;
                }
                catch (InterruptedException e) {
                    // sleep() cleared the flag; remember it and restore it when leaving.
                    interrupted = true;
                }
                catch (RuntimeException e) {
                    // Rejected execution or missing pool - decide below whether a retry makes sense.
                    log.warn(this.id + ": could not queue task: " + e);
                }
                java.util.concurrent.ThreadPoolExecutor current = this.threadPool;
                if (!this.running || current == null || current.isShutdown()) {
                    log.warn(this.id + ": giving up queueing task - messaging is shutting down");
                    return;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the name of the collection messages are stored in.
     *
     * <p>The name is computed on first use and cached: {@code "msg"} when no
     * queue name is set (null or empty), otherwise {@code "mmsg_"} followed by
     * the queue name.
     *
     * @return the message collection name
     */
    public String getCollectionName() {
        if (this.collectionName != null) {
            return this.collectionName;
        }
        if (this.queueName == null || this.queueName.isEmpty()) {
            this.collectionName = "msg";
        } else {
            this.collectionName = "mmsg_" + this.queueName;
        }
        return this.collectionName;
    }

    /**
     * Registers a listener for messages with the given name.
     *
     * <p>A listener that is already registered for that name is not added again;
     * an error is logged instead. In both cases the poll request counter is
     * incremented.
     *
     * <p>The registry is updated by publishing a new map that holds a new list
     * for the affected name. Lists that are already published are never modified,
     * so code iterating an older snapshot is not disturbed. Previously only the
     * map was copied and the list was changed in place.
     *
     * @param n the message name to listen for
     * @param l the listener to register
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw on purpose: the field's generic type is declared outside this block
    public void addListenerForMessageNamed(String n, MessageListener l) {
        Collection registered = this.listenerByName.get(n);
        if (registered != null && registered.contains(l)) {
            log.error("cowardly refusing to add already registered listener for name " + n);
        } else {
            ArrayList updated = registered == null ? new ArrayList() : new ArrayList(registered);
            updated.add(l);
            // Copy constructor instead of a HashMap cast + clone(): works for any Map implementation.
            HashMap snapshot = new HashMap(this.listenerByName);
            snapshot.put(n, updated);
            this.listenerByName = snapshot;
        }
        this.requestPoll.incrementAndGet();
    }

    /**
     * Unregisters a listener for messages with the given name.
     *
     * <p>Does nothing when no listeners are registered for that name. When the
     * last listener for a name is removed, the name itself is dropped from the
     * registry.
     *
     * <p>The change is made on a copy of both the map and the affected list and
     * then published as a whole. Previously the map copy was shallow, so the
     * list shared with the published map was modified in place.
     *
     * @param n the message name the listener was registered for
     * @param l the listener to remove
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw on purpose: the field's generic type is declared outside this block
    public void removeListenerForMessageNamed(String n, MessageListener l) {
        Collection registered = this.listenerByName.get(n);
        if (registered == null) {
            return;
        }
        ArrayList remaining = new ArrayList(registered);
        remaining.remove(l);
        HashMap snapshot = new HashMap(this.listenerByName);
        if (remaining.isEmpty()) {
            snapshot.remove(n);
        } else {
            snapshot.put(n, remaining);
        }
        this.listenerByName = snapshot;
    }

    /**
     * Returns the id of this messaging instance. The same value is set as
     * sender on messages stored through this instance.
     *
     * @return this instance's id
     */
    public String getSenderId() {
        return this.id;
    }

    /**
     * Sets the id of this messaging instance.
     *
     * @param id the new id
     * @return this instance, for call chaining
     */
    public Messaging setSenderId(String id) {
        this.id = id;
        return this;
    }

    /**
     * Returns the current pause setting.
     * NOTE(review): unit and exact use of the value are not visible in this part of the file.
     *
     * @return the pause value
     */
    public int getPause() {
        return this.pause;
    }

    /**
     * Sets the pause setting.
     *
     * @param pause the new pause value
     * @return this instance, for call chaining
     */
    public Messaging setPause(int pause) {
        this.pause = pause;
        return this;
    }

    /**
     * Tells whether messaging is currently active.
     *
     * <p>Without change streams this is the {@code running} flag. With change
     * streams it is {@code true} only when a change stream monitor exists and
     * reports itself as running.
     *
     * @return {@code true} when messaging is active
     */
    public boolean isRunning() {
        if (!this.useChangeStream) {
            return this.running;
        }
        return this.changeStreamMonitor != null && this.changeStreamMonitor.isRunning();
    }

    /**
     * Stops this messaging instance.
     *
     * <p>Clears the running flag, all registered listeners, pending answer
     * queues and processing state, shuts down both executor pools, unregisters
     * the shutdown listener, terminates the change stream monitor and interrupts
     * the messaging thread. It then waits in 150&nbsp;ms steps for the thread to
     * end.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues
     * but the interrupt status is restored before this method returns or throws.
     *
     * @throws RuntimeException if the messaging thread is still alive after
     *         roughly twice the configured maximum wait time
     */
    public void terminate() {
        this.running = false;
        this.listenerByName.clear();
        this.listeners.clear();
        this.waitingForAnswers.clear();
        this.processing.clear();
        this.requestPoll.set(0);
        if (this.decouplePool != null) {
            try {
                int sz = this.decouplePool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still scheduled");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down decouple-pool", (Throwable)e);
            }
        }
        this.morphium.removeShutdownListener(this);
        if (this.threadPool != null) {
            try {
                int sz = this.threadPool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still pending in pool");
                }
            }
            catch (Exception e) {
                // Pass the exception along, as the decouple-pool branch does.
                log.warn("Exception when shutting down threadpool", (Throwable)e);
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.terminate();
        }
        if (this.isAlive()) {
            try {
                this.interrupt();
            }
            catch (Exception e) {
                log.warn("Exception when interrupint messaging thread", (Throwable)e);
            }
        }
        int retry = 0;
        boolean interrupted = false;
        try {
            while (this.isAlive()) {
                try {
                    Messaging.sleep(150L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but do not lose the caller's interrupt.
                    interrupted = true;
                }
                if (++retry <= 2 * this.morphium.getConfig().getMaxWaitTime() / 150) continue;
                throw new RuntimeException("Could not terminate Messaging!");
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Registers a listener that is not bound to a message name.
     *
     * <p>A listener that is already registered is not added a second time; an
     * error is logged instead. The poll request counter is incremented either way.
     *
     * @param l the listener to register
     */
    public void addMessageListener(MessageListener l) {
        boolean alreadyRegistered = this.listeners.contains(l);
        if (!alreadyRegistered) {
            this.listeners.add(l);
        } else {
            log.error("Cowardly refusing to add already registered listener");
        }
        this.requestPoll.incrementAndGet();
    }

    /**
     * Removes a listener from the list of listeners that are not bound to a message name.
     *
     * @param l the listener to remove
     */
    public void removeMessageListener(MessageListener l) {
        this.listeners.remove(l);
    }

    /**
     * Stores a message, asynchronously where the driver allows it.
     *
     * <p>When the driver's name is {@code "SingleMongoConnectDriver"} the message
     * is stored without the async callback; with any other driver the async
     * callback is used.
     *
     * @param m the message to store
     */
    public void queueMessage(Msg m) {
        boolean singleConnection = this.morphium.getDriver().getName().equals("SingleMongoConnectDriver");
        this.storeMsg(m, !singleConnection);
    }

    /**
     * Stores a message in the message collection without an async callback.
     *
     * @param m the message to send
     */
    public void sendMessage(Msg m) {
        this.storeMsg(m, false);
    }

    /**
     * Returns the number of pending messages; delegates to {@code getPendingMessagesCount()}.
     *
     * @return the pending message count
     */
    public long getNumberOfMessages() {
        return this.getPendingMessagesCount();
    }

    /**
     * Stamps the message with this instance's id and host name as sender and
     * inserts it into the message collection.
     *
     * @param m     the message to store
     * @param async when {@code true}, a callback that logs insert errors is
     *              passed to the insert; when {@code false}, {@code null} is passed
     */
    private void storeMsg(Msg m, boolean async) {
        m.setSender(this.id);
        m.setSenderHost(this.hostname);
        AsyncOperationCallback cb = !async ? null : new AsyncOperationCallback(){

            public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                // success needs no handling
            }

            public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                log.error("Error storing msg", t);
            }
        };
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    /**
     * Sends a message addressed to this instance itself, without an async callback.
     *
     * @param m the message to send
     */
    public void sendMessageToSelf(Msg m) {
        this.sendMessageToSelf(m, false);
    }

    /**
     * Sends a message addressed to this instance itself, using an async callback for the insert.
     *
     * @param m the message to send
     */
    public void queueMessagetoSelf(Msg m) {
        this.sendMessageToSelf(m, true);
    }

    /**
     * Inserts a message that is addressed to this instance: the sender is set
     * to {@code "self"}, this instance's id is added as recipient and the host
     * name is recorded as sender host.
     *
     * @param m     the message to store
     * @param async when {@code true}, a default callback adapter is passed to
     *              the insert; when {@code false}, {@code null} is passed
     */
    private void sendMessageToSelf(Msg m, boolean async) {
        AsyncCallbackAdapter cb = async ? new AsyncCallbackAdapter() : null;
        m.setSender("self");
        m.addRecipient(this.id);
        m.setSenderHost(this.hostname);
        String targetCollection = this.getCollectionName();
        this.morphium.insert(m, targetCollection, cb);
    }

    /**
     * Returns the auto-answer flag. When it is set and a listener returned no
     * answer, message processing creates a {@code "received"} answer itself.
     *
     * @return {@code true} when auto-answer is enabled
     */
    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    /**
     * Sets the auto-answer flag.
     *
     * @param autoAnswer the new flag value
     * @return this instance, for call chaining
     */
    public Messaging setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
        return this;
    }

    /**
     * Shutdown hook: stops message processing when Morphium shuts down.
     *
     * <p>Clears the running flag, asks the worker pool to finish, waits up to
     * 200&nbsp;ms for it and then forces it down, and finally terminates the
     * change stream monitor. Every step is best effort: a failure or an
     * interrupt in one step is logged or remembered and the remaining steps
     * still run. Previously any exception, including an interrupt during the
     * wait, silently skipped the forced pool shutdown and the change stream
     * termination.
     *
     * @param m the Morphium instance that is shutting down (not used)
     */
    @Override
    public void onShutdown(Morphium m) {
        this.running = false;
        boolean interrupted = false;
        // Snapshot the field: other methods may set it to null concurrently.
        java.util.concurrent.ExecutorService pool = this.threadPool;
        if (pool != null) {
            try {
                pool.shutdown();
                // Returns early once all workers are done instead of always sleeping 200 ms.
                pool.awaitTermination(200L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // Restored at the end so the remaining cleanup is not cut short.
                interrupted = true;
            }
            catch (Exception e) {
                log.warn("Exception when shutting down threadpool", (Throwable)e);
            }
            try {
                pool.shutdownNow();
            }
            catch (Exception e) {
                log.warn("Exception when shutting down threadpool", (Throwable)e);
            }
            this.threadPool = null;
        }
        if (this.changeStreamMonitor != null) {
            try {
                this.changeStreamMonitor.terminate();
            }
            catch (Exception e) {
                log.warn("Exception when terminating change stream monitor", (Throwable)e);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a message and waits for the first answer, throwing on timeout.
     * Same as the three-argument overload with {@code throwExceptionOnTimeout} set to {@code true}.
     *
     * @param theMessage  the message to send
     * @param timeoutInMs maximum time to wait for an answer, in milliseconds
     * @return the first answer received
     */
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        return this.sendAndAwaitFirstAnswer(theMessage, timeoutInMs, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a message and blocks until the first answer arrives or the timeout elapses.
     *
     * <p>A message id is assigned when the message has none. The answer queue
     * registered for that id is always removed again before returning.
     *
     * <p>If the waiting thread is interrupted, the error is logged, the thread's
     * interrupt status is restored and {@code null} is returned.
     *
     * @param theMessage              the message to send
     * @param timeoutInMs             maximum time to wait for an answer, in milliseconds
     * @param throwExceptionOnTimeout whether a timeout raises an exception instead of returning {@code null}
     * @return the first answer, or {@code null} on timeout (when not throwing) or interruption
     * @throws SystemShutdownException if messaging is not running
     * @throws MessageTimeoutException if no answer arrived in time and {@code throwExceptionOnTimeout} is set
     */
    @SuppressWarnings("unchecked") // the answer is handed back as the caller's message type, as before
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs, boolean throwExceptionOnTimeout) {
        if (!this.running) {
            throw new SystemShutdownException("Messaging shutting down - abort sending!");
        }
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        LinkedBlockingDeque<Msg> blockingQueue = new LinkedBlockingDeque<>();
        // Register before sending so an answer cannot arrive unnoticed.
        this.waitingForAnswers.put(requestMsgId, blockingQueue);
        try {
            this.sendMessage(theMessage);
            Msg firstAnswer = blockingQueue.poll(timeoutInMs, TimeUnit.MILLISECONDS);
            if (firstAnswer == null && throwExceptionOnTimeout) {
                throw new MessageTimeoutException("Did not receive answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + " in time (" + timeoutInMs + "ms)");
            }
            return (T)firstAnswer;
        }
        catch (InterruptedException e) {
            // Do not lose the interrupt: callers further up may depend on it.
            Thread.currentThread().interrupt();
            log.error("Did not receive answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + " interrupted.", (Throwable)e);
            return null;
        }
        finally {
            this.waitingForAnswers.remove(requestMsgId);
        }
    }

    /**
     * Sends a message and collects answers without throwing on timeout.
     * Same as the four-argument overload with {@code throwExceptionOnTimeout} set to {@code false}.
     *
     * @param theMessage      the message to send
     * @param numberOfAnswers number of answers to wait for
     * @param timeout         maximum time to wait, compared against {@code System.currentTimeMillis()} differences
     * @return the answers received
     */
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        return this.sendAndAwaitAnswers(theMessage, numberOfAnswers, timeout, false);
    }

    /**
     * Sends a message and collects the answers that arrive for it.
     *
     * <p>Waiting ends when the requested number of answers has arrived, the
     * timeout has elapsed, or messaging stops running. With
     * {@code numberOfAnswers <= 0} the call always waits for the full timeout.
     * A message id is assigned when the message has none.
     *
     * <p>The answer queue registered for the request is now removed on every
     * exit path, including exceptions. The result is built from the local queue,
     * so a concurrent {@code terminate()} that clears the registry no longer
     * causes a {@code NullPointerException}. An interrupt received while waiting
     * does not end the wait, but the interrupt status is restored before returning.
     *
     * @param theMessage              the message to send
     * @param numberOfAnswers         number of answers to wait for; values {@code <= 0} wait until the timeout
     * @param timeout                 maximum time to wait, in milliseconds
     * @param throwExceptionOnTimeout whether a timeout with no answer at all raises an exception
     * @return the answers received so far (possibly empty)
     * @throws MessageTimeoutException if the timeout elapsed without any answer and {@code throwExceptionOnTimeout} is set
     * @throws SystemShutdownException if messaging stops running while waiting
     */
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout, boolean throwExceptionOnTimeout) {
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        LinkedBlockingDeque<Msg> answerList = new LinkedBlockingDeque<>();
        this.waitingForAnswers.put(requestMsgId, answerList);
        boolean interrupted = false;
        try {
            this.sendMessage(theMessage);
            long start = System.currentTimeMillis();
            // An empty queue is always "not enough", so the former size() <= 0 check is covered.
            while (this.running && (numberOfAnswers <= 0 || answerList.size() < numberOfAnswers)) {
                if (System.currentTimeMillis() - start > timeout) {
                    if (throwExceptionOnTimeout && answerList.isEmpty()) {
                        throw new MessageTimeoutException("Did not receive any answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + "in time (" + timeout + ")");
                    }
                    break;
                }
                if (!this.running) {
                    throw new SystemShutdownException("Messaging shutting down - abort waiting!");
                }
                try {
                    Thread.sleep(this.morphium.getConfig().getIdleSleepTime());
                }
                catch (InterruptedException e) {
                    // Keep waiting as before, but remember the interrupt for the caller.
                    interrupted = true;
                }
            }
        }
        finally {
            this.waitingForAnswers.remove(requestMsgId);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        @SuppressWarnings("unchecked") // answers are handed back as the caller's message type, as before
        List<T> answers = (List<T>)(List<?>)new ArrayList<Msg>(answerList);
        return answers;
    }

    /**
     * Returns the process-multiple flag.
     *
     * @return the current value of the flag
     */
    public boolean isProcessMultiple() {
        return this.processMultiple;
    }

    /**
     * Sets the process-multiple flag.
     *
     * @param processMultiple the new flag value
     * @return this instance, for call chaining
     */
    public Messaging setProcessMultiple(boolean processMultiple) {
        this.processMultiple = processMultiple;
        return this;
    }

    /**
     * Returns the queue name; the message collection name is derived from it.
     *
     * @return the queue name, possibly {@code null}
     */
    public String getQueueName() {
        return this.queueName;
    }

    /**
     * Sets the queue name and resets the cached collection names.
     *
     * @param queueName the new queue name
     * @return this instance, for call chaining
     */
    public Messaging setQueueName(String queueName) {
        this.queueName = queueName;
        // Cleared so getCollectionName() recomputes the name from the new queue name.
        this.collectionName = null;
        // NOTE(review): presumably recomputed the same way by getLockCollectionName() - confirm.
        this.lockCollectionName = null;
        return this;
    }

    /**
     * Returns whether tasks are handed to the worker pool instead of running on the calling thread.
     *
     * @return {@code true} when multithreading is enabled
     */
    public boolean isMultithreadded() {
        return this.multithreadded;
    }

    /**
     * Switches multithreaded processing on or off.
     *
     * <p>Enabling creates the worker pool when none exists; disabling shuts an
     * existing pool down immediately and discards it.
     *
     * @param multithreadded the new setting
     * @return this instance, for call chaining
     */
    public Messaging setMultithreadded(boolean multithreadded) {
        if (multithreadded) {
            if (this.threadPool == null) {
                this.initThreadPool();
            }
        } else if (this.threadPool != null) {
            this.threadPool.shutdownNow();
            this.threadPool = null;
        }
        this.multithreadded = multithreadded;
        return this;
    }

    /**
     * Returns the window size. In multithreaded mode, new tasks are held back
     * while the pool's active count exceeds this value.
     *
     * @return the window size
     */
    public int getWindowSize() {
        return this.windowSize;
    }

    /**
     * Sets the window size.
     *
     * @param windowSize the new window size
     * @return this instance, for call chaining
     */
    public Messaging setWindowSize(int windowSize) {
        this.windowSize = windowSize;
        return this;
    }

    /**
     * Returns whether change streams are used.
     *
     * @return {@code true} when change streams are enabled
     */
    public boolean isUseChangeStream() {
        return this.useChangeStream;
    }

    /**
     * Returns the number of tasks the worker pool reports as active.
     *
     * @return the pool's active count, or {@code 0} when there is no pool
     */
    public int getRunningTasks() {
        return this.threadPool != null ? this.threadPool.getActiveCount() : 0;
    }

    /**
     * Returns the Morphium instance this messaging instance works with.
     *
     * @return the Morphium instance
     */
    public Morphium getMorphium() {
        return this.morphium;
    }

    /**
     * Chooses between polling and change streams: the change stream flag is set
     * to the negation of the argument.
     *
     * @param doPolling {@code true} to disable change streams, {@code false} to enable them
     * @return this instance, for call chaining
     */
    public Messaging setPolling(boolean doPolling) {
        this.useChangeStream = !doPolling;
        return this;
    }

    /**
     * Sets whether change streams are used.
     *
     * @param useChangeStream the new flag value
     * @return this instance, for call chaining
     */
    public Messaging setUseChangeStream(boolean useChangeStream) {
        this.useChangeStream = useChangeStream;
        return this;
    }

    /**
     * Queue entry that pairs a message id with the priority and timestamp used to order it.
     *
     * <p>Ordering ({@link #compareTo}) is by priority, then timestamp, then id.
     * Equality ({@link #equals}) is by id only, and {@link #hashCode} is derived
     * from the id alone so that equal elements always share a hash code.
     * Previously the hash also included priority and timestamp, so two equal
     * elements could land in different hash buckets.
     *
     * <p>Note that ordering is therefore not consistent with equals: two
     * elements with the same id but a different priority or timestamp are equal
     * yet do not compare as 0.
     */
    public static class ProcessingQueueElement
    implements Comparable<ProcessingQueueElement> {
        private int priority;
        private MorphiumId id;
        private long timestamp;

        /** Creates an element with priority 0, timestamp 0 and no id. */
        public ProcessingQueueElement() {
        }

        /**
         * Creates a fully initialised element.
         *
         * @param priority  ordering priority; smaller values sort first
         * @param timestamp ordering timestamp; smaller values sort first within one priority
         * @param id        id of the message this element stands for
         */
        public ProcessingQueueElement(int priority, long timestamp, MorphiumId id) {
            this.priority = priority;
            this.id = id;
            this.timestamp = timestamp;
        }

        /** @return the ordering timestamp */
        public long getTimestamp() {
            return this.timestamp;
        }

        /**
         * @param timestamp the new ordering timestamp
         * @return this element, for call chaining
         */
        public ProcessingQueueElement setTimestamp(long timestamp) {
            this.timestamp = timestamp;
            return this;
        }

        /** @return the ordering priority */
        public int getPriority() {
            return this.priority;
        }

        /**
         * @param priority the new ordering priority
         * @return this element, for call chaining
         */
        public ProcessingQueueElement setPriority(int priority) {
            this.priority = priority;
            return this;
        }

        /** @return the message id */
        public MorphiumId getId() {
            return this.id;
        }

        /**
         * @param id the new message id
         * @return this element, for call chaining
         */
        public ProcessingQueueElement setId(MorphiumId id) {
            this.id = id;
            return this;
        }

        /**
         * Orders by ascending priority, then ascending timestamp. Ties are broken
         * by comparing the other element's id against this one's.
         *
         * @param o the element to compare with; its id must not be {@code null} when a tie-break is needed
         * @return a negative, zero or positive value as this element sorts before, with or after {@code o}
         */
        @Override
        public int compareTo(ProcessingQueueElement o) {
            int byPriority = Integer.compare(this.priority, o.getPriority());
            if (byPriority != 0) {
                return byPriority;
            }
            int byTimestamp = Long.compare(this.timestamp, o.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            // Orientation kept as before: the other id is the receiver of the comparison.
            return o.getId().compareTo(this.id);
        }

        /** Two elements are equal when they are of the same class and carry equal ids. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProcessingQueueElement that = (ProcessingQueueElement)o;
            return Objects.equals(this.id, that.id);
        }

        /** Hash of the id only, matching {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hashCode(this.id);
        }
    }

    /**
     * Thrown when a send or wait is attempted or still in progress while messaging is shutting down.
     */
    public static class SystemShutdownException
    extends RuntimeException {
        /**
         * @param msg the detail message
         */
        public SystemShutdownException(String msg) {
            super(msg);
        }
    }

    /**
     * Thrown when no answer to a sent message arrived within the given timeout.
     */
    public static class MessageTimeoutException
    extends RuntimeException {
        /**
         * @param msg the detail message
         */
        public MessageTimeoutException(String msg) {
            super(msg);
        }
    }
}

