/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.MorphiumConfig;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.StatisticKeys;
import de.caluga.morphium.StatisticValue;
import de.caluga.morphium.Utils;
import de.caluga.morphium.UtilsMap;
import de.caluga.morphium.annotations.Messaging;
import de.caluga.morphium.annotations.ReadPreferenceLevel;
import de.caluga.morphium.async.AsyncCallbackAdapter;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamEvent;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.config.MessagingSettings;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.FindCommand;
import de.caluga.morphium.driver.commands.InsertMongoCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.ReadMongoCommand;
import de.caluga.morphium.driver.commands.UpdateMongoCommand;
import de.caluga.morphium.driver.commands.WriteMongoCommand;
import de.caluga.morphium.driver.inmem.InMemoryDriver;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.MorphiumMessaging;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MsgLock;
import de.caluga.morphium.messaging.StatusInfoListener;
import de.caluga.morphium.query.Query;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Messaging(name="StandardMessaging", description="Standard message queueing implementation")
public class SingleCollectionMessaging
extends Thread
implements ShutdownListener,
MorphiumMessaging {
    public static final String NAME = "StandardMessaging";
    private static final Logger log = LoggerFactory.getLogger(SingleCollectionMessaging.class);
    private final StatusInfoListener statusInfoListener = new StatusInfoListener();
    private String statusInfoListenerName = "morphium.status_info";
    private boolean statusInfoListenerEnabled = true;
    private Morphium morphium;
    private boolean running = true;
    private int pause = 100;
    private String id;
    private boolean autoAnswer = false;
    private String hostname;
    private final Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    private Map<String, List<MessageListener>> listenerByName = new HashMap<String, List<MessageListener>>();
    private String queueName;
    private String lockCollectionName = null;
    private String collectionName = null;
    private ThreadPoolExecutor threadPool;
    private ScheduledThreadPoolExecutor decouplePool;
    private boolean multithreadded = true;
    private int windowSize = 100;
    private boolean useChangeStream = true;
    private ChangeStreamMonitor changeStreamMonitor;
    private static Vector<SingleCollectionMessaging> allMessagings = new Vector();
    private final Map<MorphiumId, Queue<Msg>> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Queue<Msg>>();
    private final Map<MorphiumId, CallbackRequest> waitingForCallbacks = new ConcurrentHashMap<MorphiumId, CallbackRequest>();
    private final BlockingQueue<ProcessingQueueElement> processing = new PriorityBlockingQueue<ProcessingQueueElement>();
    private final AtomicInteger requestPoll = new AtomicInteger(0);
    private final List<MorphiumId> idsInProgress = new Vector<MorphiumId>();
    private final AtomicInteger changeStreamEventsReceived = new AtomicInteger(0);
    private MessagingSettings settings = null;
    private Vector<Object> docIdsFromChangestream = new Vector();

    public SingleCollectionMessaging() {
        allMessagings.add(this);
        this.id = UUID.randomUUID().toString();
        this.running = true;
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException unknownHostException) {
                // empty catch block
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        this.requestPoll.set(1);
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
        if (!processMultiple) {
            this.setWindowSize(1);
        }
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, int pause) {
        this(m, null, pause, true, 10);
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m) {
        this(m, null, 500, false, 100);
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, multithreadded, windowSize);
        if (!processMultiple) {
            this.setWindowSize(1);
        }
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, int pause, boolean multithreadded, int windowSize) {
        this(m, null, pause, multithreadded, windowSize);
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, false, m.getConfig().getMessagingWindowSize());
        if (!processMultiple) {
            this.setWindowSize(1);
        }
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, String queueName, int pause, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, multithreadded, windowSize, m.isReplicaSet() || m.getDriver().getName().equals("InMemDriver"));
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, multithreadded, windowSize, m.isReplicaSet() || m.getDriver().getName().equals("InMemDriver"));
        if (!processMultiple) {
            this.setWindowSize(1);
        }
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this(m, queueName, pause, multithreadded, windowSize, useChangeStream);
        if (!processMultiple) {
            this.setWindowSize(1);
        }
    }

    @Deprecated
    public SingleCollectionMessaging(Morphium m, String queueName, int pause, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this();
        MorphiumConfig cfg = m.getConfig().createCopy();
        cfg.messagingSettings().setMessageQueueName(queueName);
        cfg.messagingSettings().setMessagingPollPause(pause);
        cfg.messagingSettings().setMessagingMultithreadded(multithreadded);
        cfg.messagingSettings().setMessagingWindowSize(windowSize);
        cfg.messagingSettings().setUseChangeStream(useChangeStream);
        this.init(m, cfg.messagingSettings());
    }

    @Override
    public void init(Morphium m) {
        this.init(m, m.getConfig().createCopy().messagingSettings());
    }

    @Override
    public void init(Morphium m, MessagingSettings settings) {
        this.morphium = m;
        this.settings = settings;
        this.statusInfoListenerEnabled = settings.isMessagingStatusInfoListenerEnabled();
        this.decouplePool = new ScheduledThreadPoolExecutor(this.windowSize, Thread.ofVirtual().name("decouple_thr-", 0L).factory());
        if (settings.getMessagingStatusInfoListenerName() != null) {
            this.statusInfoListenerName = settings.getMessagingStatusInfoListenerName();
        }
        this.setWindowSize(settings.getMessagingWindowSize());
        this.setUseChangeStream(settings.isUseChangeStream());
        this.setQueueName(settings.getMessageQueueName());
        this.setPause(settings.getMessagingPollPause());
        this.setMultithreadded(settings.isMessagingMultithreadded());
        this.morphium.ensureIndicesFor(Msg.class, this.getCollectionName());
        this.morphium.ensureIndicesFor(MsgLock.class, this.getLockCollectionName());
    }

    @Override
    public List<MorphiumMessaging> getAlternativeMessagings() {
        ArrayList<MorphiumMessaging> ret = new ArrayList<MorphiumMessaging>(allMessagings);
        ret.remove(this);
        return ret;
    }

    @Override
    public void enableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(true);
    }

    @Override
    public void disableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(false);
    }

    @Override
    public String getStatusInfoListenerName() {
        return this.statusInfoListenerName;
    }

    @Override
    public void setStatusInfoListenerName(String statusInfoListenerName) {
        this.listenerByName.remove(this.statusInfoListenerName);
        this.statusInfoListenerName = statusInfoListenerName;
        this.listenerByName.put(statusInfoListenerName, Arrays.asList(this.statusInfoListener));
    }

    @Override
    public int getProcessingCount() {
        return this.processing.size();
    }

    @Override
    public int getInProgressCount() {
        return this.idsInProgress.size();
    }

    @Override
    public int waitingForAnswersCount() {
        return this.waitingForAnswers.size();
    }

    @Override
    public int waitingForAnswersTotalCount() {
        int cnt = 0;
        for (Queue<Msg> l : this.waitingForAnswers.values()) {
            cnt += l.size();
        }
        return cnt;
    }

    @Override
    public boolean isStatusInfoListenerEnabled() {
        return this.statusInfoListenerEnabled;
    }

    @Override
    public void setStatusInfoListenerEnabled(boolean statusInfoListenerEnabled) {
        this.statusInfoListenerEnabled = statusInfoListenerEnabled;
        if (statusInfoListenerEnabled && !this.listenerByName.containsKey(this.statusInfoListenerName)) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        } else if (!statusInfoListenerEnabled) {
            this.listenerByName.remove(this.statusInfoListenerName);
        }
    }

    @Override
    public Map<String, List<String>> getListenerNames() {
        HashMap<String, List<String>> ret = new HashMap<String, List<String>>();
        HashMap<String, List<MessageListener>> localCopy = new HashMap<String, List<MessageListener>>(this.listenerByName);
        for (Map.Entry e : localCopy.entrySet()) {
            ArrayList<String> classes = new ArrayList<String>();
            for (MessageListener lst : (List)e.getValue()) {
                classes.add(lst.getClass().getName());
            }
            ret.put((String)e.getKey(), classes);
        }
        return ret;
    }

    @Override
    public Map<String, Long> getThreadPoolStats() {
        if (this.threadPool == null) {
            return Map.of();
        }
        String prefix = "messaging.threadpool.";
        return UtilsMap.of(prefix + "largest_poolsize", Long.valueOf(this.threadPool.getLargestPoolSize())).add((CallSite)((Object)(prefix + "task_count")), this.threadPool.getTaskCount()).add((CallSite)((Object)(prefix + "core_size")), Long.valueOf(this.threadPool.getCorePoolSize())).add((CallSite)((Object)(prefix + "maximum_pool_size")), Long.valueOf(this.threadPool.getMaximumPoolSize())).add((CallSite)((Object)(prefix + "pool_size")), Long.valueOf(this.threadPool.getPoolSize())).add((CallSite)((Object)(prefix + "active_count")), Long.valueOf(this.threadPool.getActiveCount())).add((CallSite)((Object)(prefix + "completed_task_count")), this.threadPool.getCompletedTaskCount());
    }

    private void initThreadPool() {
        this.threadPool = new ThreadPoolExecutor(this.settings.getThreadPoolMessagingCoreSize(), this.settings.getThreadPoolMessagingMaxSize(), this.settings.getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Thread.ofVirtual().name("msg-thr-", 0L).factory());
        this.threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(SingleCollectionMessaging.this.morphium.getConfig().driverSettings().getIdleSleepTime()));
                log.info("Recursion!");
                executor.execute(r);
            }
        });
    }

    @Override
    public long getPendingMessagesCount() {
        Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        q1.f((Enum)Msg.Fields.sender).ne(this.id).f("processed_by.0").notExists();
        return q1.countAll();
    }

    @Override
    public void removeMessage(Msg m) {
        this.morphium.delete(m, this.getCollectionName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean handleChangeStreamEvent(ChangeStreamEvent evt) {
        block16: {
            if (!this.running) {
                return false;
            }
            try {
                int totalEvents;
                Object id = ((Map)evt.getDocumentKey()).get("_id");
                if (this.morphium.getDriver().getName().contains("InMem") && ((totalEvents = this.changeStreamEventsReceived.incrementAndGet()) == 1 || totalEvents % 50 == 0 || totalEvents == 200)) {
                    log.info("{}: Change stream event #{} received", (Object)this.id, (Object)totalEvents);
                }
                if (this.docIdsFromChangestream.contains(id) && log.isDebugEnabled()) {
                    log.debug("Duplicate change stream event for id: {}", id);
                }
                this.docIdsFromChangestream.add(id);
                while (this.docIdsFromChangestream.size() > 1000) {
                    this.docIdsFromChangestream.remove(0);
                }
                if (id instanceof MorphiumId) {
                    List processedBy;
                    Map<String, Object> msg = evt.getFullDocument();
                    if (msg == null) {
                        log.error("Msg is null from change stream fullDocument");
                        return this.running;
                    }
                    if (msg.get("sender").equals(this.id)) {
                        return this.running;
                    }
                    MorphiumId messageId = (MorphiumId)msg.get("_id");
                    Boolean exclusive = (Boolean)msg.get("exclusive");
                    if (exclusive != null && exclusive.booleanValue() && (processedBy = (List)msg.get("processed_by")) != null && !processedBy.isEmpty()) {
                        log.error("Got already processed exclusive message");
                        return this.running;
                    }
                    BlockingQueue<ProcessingQueueElement> blockingQueue = this.processing;
                    synchronized (blockingQueue) {
                        if (this.idsInProgress.contains(messageId)) {
                            log.warn("CHANGESTREAM DUPLICATE CAUGHT: message {} already in idsInProgress", (Object)messageId);
                            return this.running;
                        }
                        ProcessingQueueElement el = new ProcessingQueueElement();
                        el.setPriority((Integer)msg.get("priority"));
                        el.setId(messageId);
                        el.setTimestamp((Long)msg.get("timestamp"));
                        if (!this.processing.contains(el)) {
                            this.processing.add(el);
                            this.idsInProgress.add(messageId);
                            log.debug("CHANGESTREAM: Queued message {} for processing", (Object)messageId);
                        } else {
                            log.warn("CHANGESTREAM DUPLICATE CAUGHT: Message {} already in processing queue", (Object)messageId);
                        }
                        break block16;
                    }
                }
                log.error("Some other id?!?!?" + id.getClass().getName());
            }
            catch (Exception e) {
                log.error("Error during event processing in changestream", (Throwable)e);
            }
        }
        return this.running;
    }

    private void initChangeStreams() {
        ArrayList<Map<String, Object>> pipeline = new ArrayList<Map<String, Object>>();
        LinkedHashMap match = new LinkedHashMap();
        LinkedHashMap<String, List<String>> in = new LinkedHashMap<String, List<String>>();
        in.put("$in", Arrays.asList("insert"));
        match.put("operationType", in);
        pipeline.add(UtilsMap.of("$match", match));
        ChangeStreamMonitor lockMonitor = new ChangeStreamMonitor(this.morphium, this.getLockCollectionName(), false, this.pause, List.of(Doc.of("$match", Doc.of("operationType", Doc.of("$eq", "delete")))));
        lockMonitor.addListener(evt -> {
            if (this.morphium.createQueryFor(Msg.class, this.getCollectionName()).f("_id").eq(evt.getDocumentKey()).countAll() != 0L) {
                this.requestPoll.incrementAndGet();
            }
            return this.running;
        });
        this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, this.getCollectionName(), false, this.pause, pipeline);
        this.changeStreamMonitor.addListener(evt -> this.handleChangeStreamEvent(evt));
        this.changeStreamMonitor.start();
        lockMonitor.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.setName("Msg " + this.id);
        if (this.statusInfoListenerEnabled) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
        if (this.useChangeStream) {
            log.info("Changestream init");
            this.initChangeStreams();
        }
        try {
            AtomicLong lastRun = new AtomicLong(System.currentTimeMillis());
            this.decouplePool.scheduleWithFixedDelay(() -> {
                try {
                    boolean forcePolling;
                    boolean bl = forcePolling = this.morphium.getDriver() != null && this.morphium.getDriver().getName().contains("InMem");
                    if (this.requestPoll.get() > 0 || !this.useChangeStream || forcePolling) {
                        if (forcePolling || this.requestPoll.get() > 0) {
                            log.info("Polling (forced={}, requested={})", (Object)forcePolling, (Object)(this.requestPoll.get() > 0 ? 1 : 0));
                        }
                        lastRun.set(System.currentTimeMillis());
                        this.morphium.inc(StatisticKeys.PULL);
                        StatisticValue sk = this.morphium.getStats().get((Object)StatisticKeys.PULLSKIP);
                        sk.set(sk.get() + (long)this.requestPoll.get());
                        this.requestPoll.set(0);
                        this.findMessages();
                    } else {
                        this.morphium.inc(StatisticKeys.SKIPPED_MSG_UPDATES);
                    }
                }
                catch (Throwable e) {
                    if (this.running) {
                        log.error("Unhandled exception " + e.getMessage(), e);
                    }
                    throw e;
                }
            }, this.pause, this.pause, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException ex) {
            log.error("Executor died?!?!");
        }
        while (this.running) {
            try {
                ProcessingQueueElement prEl = this.processing.poll(1000L, TimeUnit.MILLISECONDS);
                if (prEl == null) continue;
                BlockingQueue<ProcessingQueueElement> blockingQueue = this.processing;
                synchronized (blockingQueue) {
                    if (!this.idsInProgress.contains(prEl.getId())) {
                        this.idsInProgress.add(prEl.getId());
                        log.debug("PROCESSING: Added {} to idsInProgress (from queue)", (Object)prEl.getId());
                    } else {
                        log.debug("PROCESSING: {} already in idsInProgress (from changestream)", (Object)prEl.getId());
                    }
                }
                ProcessingQueueElement finalPrEl = prEl;
                Runnable r = () -> {
                    boolean wasProcessed = false;
                    Msg msg = null;
                    try {
                        msg = this.morphium.findById(Msg.class, finalPrEl.getId(), this.getCollectionName());
                        if (msg == null) {
                            BlockingQueue<ProcessingQueueElement> q = this.morphium.createQueryFor(Msg.class).setReadPreferenceLevel(ReadPreferenceLevel.PRIMARY).f("_id").eq(finalPrEl.getId());
                            ((Query)((Object)q)).setCollectionName(this.getCollectionName());
                            msg = (Msg)((Query)((Object)q)).get();
                            if (msg == null) {
                                return;
                            }
                        }
                        if (!msg.isAnswer() && !this.getListenerNames().containsKey(msg.getTopic())) {
                            return;
                        }
                        if (msg.getSender().equals(this.id)) {
                            return;
                        }
                        if (msg.isExclusive() && msg.getProcessedBy() != null && msg.getProcessedBy().size() != 0) {
                            return;
                        }
                        if (msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id)) {
                            return;
                        }
                        if (msg.getRecipients() != null && !msg.getRecipients().contains(this.id)) {
                            return;
                        }
                        if (msg.isAnswer()) {
                            Queue<Msg> answersForMessage = this.waitingForAnswers.get(msg.getInAnswerTo());
                            if (null != answersForMessage) {
                                this.updateProcessedBy(msg);
                                if (!answersForMessage.contains(msg)) {
                                    answersForMessage.add(msg);
                                }
                                this.checkDeleteAfterProcessing(msg);
                                return;
                            }
                            CallbackRequest cbr = this.waitingForCallbacks.get(msg.getInAnswerTo());
                            Msg theMessage = msg;
                            if (cbr != null) {
                                AsyncMessageCallback cb = cbr.callback;
                                Runnable cbRunnable = () -> cb.incomingMessage(theMessage);
                                this.updateProcessedBy(theMessage);
                                this.queueOrRun(cbRunnable);
                                if (cbr.theMessage.isExclusive()) {
                                    this.waitingForCallbacks.remove(msg.getInAnswerTo());
                                }
                                this.checkDeleteAfterProcessing(msg);
                                return;
                            }
                        }
                        if (!this.getListenerNames().containsKey(msg.getTopic())) {
                            return;
                        }
                        if (msg.isExclusive()) {
                            this.lockAndProcess(msg);
                            wasProcessed = true;
                        } else {
                            this.processMessage(msg);
                            wasProcessed = true;
                        }
                    }
                    finally {
                        BlockingQueue<ProcessingQueueElement> blockingQueue = this.processing;
                        synchronized (blockingQueue) {
                            this.idsInProgress.remove(finalPrEl.getId());
                        }
                    }
                };
                this.queueOrRun(r);
            }
            catch (Exception exception) {}
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        this.listenerByName.clear();
    }

    @Override
    public int getAsyncMessagesPending() {
        return this.waitingForCallbacks.size();
    }

    private void checkDeleteAfterProcessing(Msg obj) {
        if (obj.isDeleteAfterProcessing()) {
            if (obj.getDeleteAfterProcessingTime() == 0) {
                this.morphium.delete(obj, this.getCollectionName());
            } else {
                obj.setDeleteAt(new Date(System.currentTimeMillis() + (long)obj.getDeleteAfterProcessingTime()));
                this.morphium.setInEntity(obj, this.getCollectionName(), Msg.Fields.deleteAt, obj.getDeleteAt());
                if (obj.getDeleteAfterProcessingTime() > 0 && this.morphium.getDriver() instanceof InMemoryDriver) {
                    long delay = Math.max(obj.getDeleteAfterProcessingTime(), 10000);
                    CompletableFuture.runAsync(() -> {
                        try {
                            this.morphium.createQueryFor(Msg.class, this.getCollectionName()).f((Enum)Msg.Fields.msgId).eq(obj.getMsgId()).remove();
                        }
                        catch (Exception e) {
                            log.warn("Failed to remove message after processing", (Throwable)e);
                        }
                    }, CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS));
                }
                if (obj.isExclusive()) {
                    this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(obj.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)obj.getDeleteAt());
                }
            }
        }
    }

    @Override
    public void pauseTopicProcessing(String name) {
        this.pauseMessages.putIfAbsent(name, System.currentTimeMillis());
    }

    @Override
    public List<String> getPausedTopics() {
        return new ArrayList<String>(this.pauseMessages.keySet());
    }

    @Override
    public Long unpauseTopicProcessing(String name) {
        if (!this.pauseMessages.containsKey(name)) {
            return 0L;
        }
        Long ret = this.pauseMessages.remove(name);
        if (ret != null) {
            ret = System.currentTimeMillis() - ret;
        }
        this.requestPoll.incrementAndGet();
        return ret;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<ProcessingQueueElement> getMessagesForProcessing() {
        if (!this.running) {
            return new ArrayList<ProcessingQueueElement>();
        }
        if (this.morphium.getDriver() == null) {
            log.debug(this.id + ": Morphium driver is null, messaging system is shutting down");
            return new ArrayList<ProcessingQueueElement>();
        }
        MongoCommand fnd = null;
        try {
            Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
            if (this.listenerByName.isEmpty()) {
                List<ProcessingQueueElement> list = q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.inAnswerTo).in(this.waitingForAnswers.keySet()).limit(this.windowSize).idList();
                return list;
            }
            List idsToIgnore = this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getCollectionName() + "_lck").idList();
            BlockingQueue<ProcessingQueueElement> blockingQueue = this.processing;
            synchronized (blockingQueue) {
                for (ProcessingQueueElement p : this.processing) {
                    idsToIgnore.add(p.getId());
                }
                idsToIgnore.addAll(this.idsInProgress);
                if (this.morphium.getDriver().getName().contains("InMem")) {
                    log.info("POLLING DEBUG {}: processing.size={}, idsInProgress.size={}, idsToIgnore.size={}", new Object[]{this.id, this.processing.size(), this.idsInProgress.size(), idsToIgnore.size()});
                }
            }
            Query<Msg> q1 = q.q().f((Enum)Msg.Fields.exclusive).eq(true).f("processed_by.0").notExists();
            Query<Msg> q2 = q.q().f((Enum)Msg.Fields.exclusive).ne(true).f((Enum)Msg.Fields.processedBy).ne(this.id);
            q.f("_id").nin(idsToIgnore).f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
            Set<String> pausedMessagesKeys = this.pauseMessages.keySet();
            if (!this.pauseMessages.isEmpty()) {
                q.f((Enum)Msg.Fields.topic).nin(pausedMessagesKeys);
                q.f("name").nin(pausedMessagesKeys);
            }
            q.or(q1, q2);
            q.setLimit(this.windowSize);
            q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
            ArrayList<ProcessingQueueElement> queueElements = new ArrayList<ProcessingQueueElement>();
            int ws = this.windowSize;
            fnd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
            fnd.setDb(this.morphium.getDatabase());
            ((FindCommand)fnd).setFilter(q.toQueryObject());
            ((FindCommand)fnd).setProjection(Doc.of("_id", (Object)1, "priority", (Object)1, "timestamp", (Object)1));
            ((FindCommand)fnd).setLimit(ws);
            ((FindCommand)fnd).setBatchSize(q.getBatchSize());
            ((FindCommand)fnd).setSort(q.getSort());
            ((FindCommand)fnd).setSkip(q.getSkip());
            fnd.setColl(this.getCollectionName());
            List<Map<String, Object>> result = ((ReadMongoCommand)fnd).execute();
            fnd.releaseConnection();
            fnd = null;
            if (!result.isEmpty()) {
                for (Map<String, Object> el : result) {
                    el.putIfAbsent("priority", 100);
                    el.putIfAbsent("timestamp", System.currentTimeMillis());
                    queueElements.add(new ProcessingQueueElement((Integer)el.get("priority"), (Long)el.get("timestamp"), (MorphiumId)el.get("_id")));
                }
            }
            long totalCount = q.countAll();
            if (this.morphium.getDriver().getName().contains("InMem")) {
                log.info("POLLING RESULT {}: found {} messages, total in DB={}, idsToIgnore.size={}", new Object[]{this.id, queueElements.size(), totalCount, idsToIgnore.size()});
            }
            if (totalCount != (long)queueElements.size()) {
                log.debug("{}: Found {} messages in queue, {} total in DB, {} in idsToIgnore", new Object[]{this.id, queueElements.size(), totalCount, idsToIgnore.size()});
                this.requestPoll.incrementAndGet();
            }
            ArrayList<ProcessingQueueElement> arrayList = queueElements;
            return arrayList;
        }
        catch (Exception e) {
            if (this.running) {
                log.error(this.id + ": Error while processing", (Throwable)e);
            }
            List<ProcessingQueueElement> list = null;
            return list;
        }
        finally {
            if (fnd != null) {
                fnd.releaseConnection();
            }
        }
    }

    public void triggerCheck() {
        log.debug("Triggercheck called");
        this.requestPoll.incrementAndGet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void findMessages() {
        if (!this.running) {
            return;
        }
        List<ProcessingQueueElement> messages = this.getMessagesForProcessing();
        if (messages == null) {
            return;
        }
        if (messages.size() == 0) {
            return;
        }
        for (ProcessingQueueElement el : messages) {
            BlockingQueue<ProcessingQueueElement> blockingQueue = this.processing;
            synchronized (blockingQueue) {
                if (!this.processing.contains(el) && !this.idsInProgress.contains(el.getId())) {
                    this.processing.add(el);
                }
            }
        }
    }

    private void lockAndProcess(Msg obj) {
        if (this.lockMessage(obj, this.id, obj.getDeleteAt())) {
            this.processMessage(obj);
        }
    }

    public MsgLock getLock(Msg m) {
        return this.morphium.findById(MsgLock.class, m.getMsgId(), this.getLockCollectionName());
    }

    @Override
    public String getLockCollectionName() {
        if (this.lockCollectionName == null) {
            this.lockCollectionName = this.getCollectionName() + "_lck";
        }
        return this.lockCollectionName;
    }

    public boolean lockMessage(Msg m, String lockId) {
        return this.lockMessage(m, lockId, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean lockMessage(Msg m, String lockId, Date delAt) {
        long start = System.currentTimeMillis();
        MsgLock lck = new MsgLock(m);
        lck.setLockId(lockId);
        if (delAt != null) {
            lck.setDeleteAt(delAt);
        } else {
            lck.setDeleteAt(new Date(System.currentTimeMillis() + m.getTtl() * 2L));
        }
        MongoCommand cmd = null;
        try {
            cmd = new InsertMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(MsgLock.class)));
            ((InsertMongoCommand)((InsertMongoCommand)cmd.setColl(this.getLockCollectionName())).setDb(this.morphium.getDatabase())).setDocuments(List.of(this.morphium.getMapper().serialize(lck)));
            ((InsertMongoCommand)cmd).execute();
            boolean bl = true;
            return bl;
        }
        catch (Exception e) {
            boolean bl = false;
            return bl;
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processMessage(Msg msg) {
        if (msg == null) {
            return;
        }
        if (msg.getSender().equals(this.getSenderId())) {
            log.error("This should have been filtered out before alreaday!!!");
            return;
        }
        if (msg.isTimingOut() && msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            log.debug(this.getSenderId() + ": Found outdated message - deleting it!");
            this.morphium.delete(msg, this.getCollectionName());
            this.unlockIfExclusive(msg);
            return;
        }
        if (msg.isExclusive() && msg.getProcessedBy().size() > 0) {
            return;
        }
        boolean alreadyUpdatedProcessedBy = false;
        if (this.morphium.getDriver().getName().contains("InMem") && !msg.isExclusive()) {
            String lockKey = msg.getMsgId().toString().intern();
            log.debug("SYNC: Instance {} acquiring lock for message {}", (Object)this.id, (Object)msg.getMsgId());
            String string = lockKey;
            synchronized (string) {
                log.debug("SYNC: Instance {} ACQUIRED lock for message {}", (Object)this.id, (Object)msg.getMsgId());
                Msg freshMsg = this.morphium.findById(Msg.class, msg.getMsgId(), this.getCollectionName());
                if (freshMsg == null) {
                    log.debug("SYNC: Message {} was deleted, skipping", (Object)msg.getMsgId());
                    return;
                }
                if (freshMsg.getProcessedBy() != null && freshMsg.getProcessedBy().contains(this.id)) {
                    log.info("DEDUP: Duplicate PREVENTED for instance {} on message {} - already in processed_by", (Object)this.id, (Object)msg.getMsgId());
                    return;
                }
                this.updateProcessedBy(msg);
                alreadyUpdatedProcessedBy = true;
            }
            log.debug("SYNC: Instance {} RELEASED lock for message {}, proceeding to process", (Object)this.id, (Object)msg.getMsgId());
        } else if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        if (this.listenerByName.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Not further processing - no listener for non answer message");
            }
            this.unlockIfExclusive(msg);
            return;
        }
        boolean wasProcessed = false;
        boolean wasRejected = false;
        ArrayList<MessageRejectedException> rejections = new ArrayList<MessageRejectedException>();
        ArrayList lst = new ArrayList();
        if (this.listenerByName.get(msg.getTopic()) != null) {
            lst.addAll(this.listenerByName.get(msg.getTopic()));
        }
        for (MessageListener l : lst) {
            try {
                if (this.pauseMessages.containsKey(msg.getTopic())) {
                    wasProcessed = false;
                    this.unlockIfExclusive(msg);
                    break;
                }
                if (l.markAsProcessedBeforeExec() && !alreadyUpdatedProcessedBy) {
                    this.updateProcessedBy(msg);
                }
                Msg answer = l.onMessage(this, msg);
                wasProcessed = true;
                if (this.autoAnswer && answer == null) {
                    answer = new Msg(msg.getTopic(), "received", "");
                }
                if (answer == null) continue;
                msg.sendAnswer(this, answer);
                if (answer.getRecipients() != null) continue;
                log.warn("Recipient of answer is null?!?!");
            }
            catch (MessageRejectedException mre) {
                log.info(this.id + ": Message was rejected by listener: " + mre.getMessage());
                wasRejected = true;
                rejections.add(mre);
                this.requestPoll.incrementAndGet();
            }
            catch (Exception e) {
                log.error(this.id + ": listener Processing failed", (Throwable)e);
                this.checkDeleteAfterProcessing(msg);
            }
        }
        if (wasRejected) {
            for (MessageRejectedException mre : rejections) {
                if (mre.getRejectionHandler() != null) {
                    try {
                        mre.getRejectionHandler().handleRejection(this, msg);
                    }
                    catch (Exception e) {
                        log.error("Error in rejection handling", (Throwable)e);
                    }
                    continue;
                }
                log.error("No rejection  handler defined!!!");
            }
        }
        if (wasProcessed && !msg.getProcessedBy().contains(this.id) && !alreadyUpdatedProcessedBy) {
            this.updateProcessedBy(msg);
        }
        if (!wasRejected && wasProcessed) {
            this.checkDeleteAfterProcessing(msg);
        }
        this.unlockIfExclusive(msg);
    }

    private void unlockIfExclusive(Msg msg) {
        if (msg.isExclusive()) {
            this.deleteLock(msg.getMsgId());
        }
    }

    private void deleteLock(MorphiumId msgId) {
        this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getLockCollectionName()).f("_id").eq(msgId).f("lock_id").eq(this.id).remove();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean tryClaimMessageForThisInstance(Msg msg) {
        if (msg == null) {
            return false;
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return false;
        }
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        Map<String, Object> qobj = idq.toQueryObject();
        Doc set = Doc.of("processed_by", this.id);
        Doc update = Doc.of("$addToSet", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            cmd.releaseConnection();
            cmd = null;
            Object nModified = ret.get("nModified");
            if (nModified != null && ((Number)nModified).intValue() > 0) {
                msg.getProcessedBy().add(this.id);
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        catch (MorphiumDriverException e) {
            log.error("Error claiming message - will skip to avoid duplicates", (Throwable)e);
            boolean bl = false;
            return bl;
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean updateProcessedBy(Msg msg) {
        if (msg == null) {
            return false;
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return true;
        }
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        Map<String, Object> qobj = idq.toQueryObject();
        Doc set = Doc.of("processed_by", this.id);
        Doc update = Doc.of("$addToSet", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            cmd.releaseConnection();
            cmd = null;
            if (ret.get("nModified") == null && ret.get("modified") == null || Integer.valueOf(0).equals(ret.get("nModified"))) {
                if (this.morphium.reread(msg, this.getCollectionName()) != null && !msg.getProcessedBy().contains(this.id)) {
                    log.warn(this.id + ": Could not update processed_by in msg " + String.valueOf(msg.getMsgId()));
                    log.warn(this.id + ": " + Utils.toJsonString(ret));
                    log.warn(this.id + ": msg: " + msg.toString());
                    boolean bl = false;
                    return bl;
                }
                boolean bl = true;
                return bl;
            }
            msg.getProcessedBy().add(this.id);
            boolean bl = true;
            return bl;
        }
        catch (MorphiumDriverException e) {
            log.error("Error updating processed by - this might lead to duplicate execution!", (Throwable)e);
            boolean bl = false;
            return bl;
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    private void queueOrRun(Runnable r) {
        if (this.multithreadded) {
            try {
                this.threadPool.execute(r);
            }
            catch (Throwable ignored) {
                ignored.printStackTrace();
            }
        } else {
            r.run();
        }
    }

    @Override
    public String getCollectionName() {
        if (this.collectionName == null) {
            this.collectionName = this.queueName == null || this.queueName.isEmpty() ? "msg" : "mmsg_" + this.queueName;
        }
        return this.collectionName;
    }

    @Override
    public String getCollectionName(String msgName) {
        return this.getCollectionName();
    }

    public String getCollectionName(Msg msg) {
        return this.getCollectionName();
    }

    @Override
    public void addListenerForTopic(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
            c.put(n, new ArrayList());
            this.listenerByName = c;
        }
        if (this.listenerByName.get(n).contains(l)) {
            log.error("cowardly refusing to add already registered listener for name " + n);
        } else {
            this.listenerByName.get(n).add(l);
        }
        this.requestPoll.incrementAndGet();
    }

    @Override
    public void removeListenerForTopic(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            return;
        }
        HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
        ((List)c.get(n)).remove(l);
        if (((List)c.get(n)).isEmpty()) {
            c.remove(n);
        }
        this.listenerByName = c;
    }

    @Override
    public String getSenderId() {
        return this.id;
    }

    @Override
    public SingleCollectionMessaging setSenderId(String id) {
        this.id = id;
        return this;
    }

    @Override
    public int getPause() {
        return this.pause;
    }

    @Override
    public SingleCollectionMessaging setPause(int pause) {
        this.pause = pause;
        return this;
    }

    @Override
    public boolean isRunning() {
        if (this.useChangeStream) {
            return this.changeStreamMonitor != null && this.changeStreamMonitor.isRunning();
        }
        return this.running;
    }

    @Override
    public void close() {
        this.terminate();
    }

    @Override
    public void terminate() {
        int sz;
        log.info("Terminate messaging");
        this.running = false;
        this.listenerByName.clear();
        this.waitingForAnswers.clear();
        this.processing.clear();
        this.requestPoll.set(0);
        allMessagings.remove(this);
        if (this.decouplePool != null) {
            try {
                sz = this.decouplePool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still scheduled");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down decouple-pool", (Throwable)e);
            }
        }
        this.morphium.removeShutdownListener(this);
        if (this.threadPool != null) {
            try {
                sz = this.threadPool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still pending in pool");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down threadpool");
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.terminate();
        }
    }

    @Override
    public void queueMessage(Msg m) {
        if (this.morphium.getDriver().getName().equals("SingleMongoConnectDriver")) {
            this.storeMsg(m, false);
        } else {
            this.storeMsg(m, true);
        }
    }

    @Override
    public void sendMessage(Msg m) {
        this.storeMsg(m, false);
    }

    @Override
    public long getNumberOfMessages() {
        return this.getPendingMessagesCount();
    }

    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(this){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                    log.error("Error storing msg", t);
                }
            };
        }
        m.setSender(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    @Override
    public void sendMessageToSelf(Msg m) {
        this.sendMessageToSelf(m, false);
    }

    @Override
    public void queueMessagetoSelf(Msg m) {
        this.sendMessageToSelf(m, true);
    }

    private void sendMessageToSelf(Msg m, boolean async) {
        AsyncCallbackAdapter cb = null;
        if (async) {
            cb = new AsyncCallbackAdapter();
        }
        m.setSender("self");
        m.addRecipient(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    @Override
    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    @Override
    public MorphiumMessaging setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
        return this;
    }

    @Override
    public void onShutdown(Morphium m) {
        log.warn("Got shutdown event...");
        try {
            this.running = false;
            if (this.threadPool != null) {
                this.threadPool.shutdown();
                LockSupport.parkNanos(2000000L);
                if (this.threadPool != null) {
                    this.threadPool.shutdownNow();
                }
                this.threadPool = null;
            }
            if (this.changeStreamMonitor != null) {
                this.changeStreamMonitor.terminate();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        return this.sendAndAwaitFirstAnswer(theMessage, timeoutInMs, true);
    }

    @Override
    public <T extends Msg> void sendAndAwaitAsync(T theMessage, long timeoutInMs, AsyncMessageCallback cb) {
        if (!this.running) {
            throw new SystemShutdownException("Messaging shutting down - abort sending!");
        }
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        CallbackRequest cbr = new CallbackRequest(this);
        cbr.timestamp = System.currentTimeMillis();
        cbr.theMessage = theMessage;
        cbr.callback = cb;
        cbr.ttl = timeoutInMs;
        this.waitingForCallbacks.put(requestMsgId, cbr);
        this.sendMessage(theMessage);
        this.decouplePool.schedule(() -> this.waitingForCallbacks.remove(requestMsgId), timeoutInMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs, boolean throwExceptionOnTimeout) {
        if (!this.running) {
            throw new SystemShutdownException("Messaging shutting down - abort sending!");
        }
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        LinkedBlockingDeque blockingQueue = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, blockingQueue);
        try {
            this.sendMessage(theMessage);
            Msg firstAnswer = (Msg)blockingQueue.poll(timeoutInMs, TimeUnit.MILLISECONDS);
            if (null == firstAnswer && throwExceptionOnTimeout) {
                throw new MessageTimeoutException("Did not receive answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + " in time (" + timeoutInMs + "ms)");
            }
            Msg msg = firstAnswer;
            return (T)msg;
        }
        catch (InterruptedException e) {
            log.error("Did not receive answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + " interrupted.", (Throwable)e);
        }
        finally {
            this.waitingForAnswers.remove(requestMsgId);
        }
        return null;
    }

    @Override
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        return this.sendAndAwaitAnswers(theMessage, numberOfAnswers, timeout, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout, boolean throwExceptionOnTimeout) {
        MorphiumId requestMsgId = theMessage.getMsgId();
        if (requestMsgId == null) {
            theMessage.setMsgId(new MorphiumId());
            requestMsgId = theMessage.getMsgId();
        }
        LinkedBlockingDeque answerList = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, answerList);
        this.sendMessage(theMessage);
        long start = System.currentTimeMillis();
        ArrayList returnValue = null;
        try {
            while (this.running) {
                if (answerList.size() > 0 && numberOfAnswers > 0 && answerList.size() >= numberOfAnswers) {
                    break;
                }
                if (throwExceptionOnTimeout && System.currentTimeMillis() - start > timeout && answerList.isEmpty()) {
                    throw new MessageTimeoutException("Did not receive any answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + "in time (" + timeout + ")");
                }
                if (System.currentTimeMillis() - start > timeout) {
                    break;
                }
                if (!this.running) {
                    throw new SystemShutdownException("Messaging shutting down - abort waiting!");
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(this.morphium.getConfig().driverSettings().getIdleSleepTime()));
            }
        }
        finally {
            returnValue = new ArrayList(this.waitingForAnswers.remove(requestMsgId));
        }
        return returnValue;
    }

    @Override
    public boolean isProcessMultiple() {
        return this.windowSize == 1;
    }

    @Override
    @Deprecated
    public MorphiumMessaging setProcessMultiple(boolean processMultiple) {
        this.windowSize = processMultiple ? 10 : 1;
        return this;
    }

    @Override
    public String getQueueName() {
        return this.queueName;
    }

    @Override
    public MorphiumMessaging setQueueName(String queueName) {
        this.queueName = queueName;
        this.collectionName = null;
        this.lockCollectionName = null;
        return this;
    }

    @Override
    public boolean isMultithreadded() {
        return this.multithreadded;
    }

    @Override
    public MorphiumMessaging setMultithreadded(boolean multithreadded) {
        if (!multithreadded && this.threadPool != null) {
            this.threadPool.shutdownNow();
            this.threadPool = null;
        } else if (multithreadded && this.threadPool == null) {
            this.initThreadPool();
        }
        this.multithreadded = multithreadded;
        return this;
    }

    @Override
    public int getWindowSize() {
        return this.windowSize;
    }

    @Override
    public MorphiumMessaging setWindowSize(int windowSize) {
        this.windowSize = windowSize;
        return this;
    }

    @Override
    public boolean isUseChangeStream() {
        return this.useChangeStream;
    }

    @Override
    public int getRunningTasks() {
        if (this.threadPool != null) {
            return this.threadPool.getActiveCount();
        }
        return 0;
    }

    @Override
    public Morphium getMorphium() {
        return this.morphium;
    }

    @Override
    public MorphiumMessaging setPolling(boolean doPolling) {
        this.useChangeStream = !doPolling;
        return this;
    }

    @Override
    public MorphiumMessaging setUseChangeStream(boolean useChangeStream) {
        this.useChangeStream = useChangeStream;
        return this;
    }

    @Override
    public <T extends Msg> String getLockCollectionName(T topic) {
        return this.getLockCollectionName();
    }

    @Override
    public String getLockCollectionName(String topic) {
        return this.getLockCollectionName();
    }

    @Override
    public String getDMCollectionName(String sender) {
        return this.getCollectionName();
    }

    public static class ProcessingQueueElement
    implements Comparable<ProcessingQueueElement> {
        private int priority;
        private MorphiumId id;
        private long timestamp;

        public ProcessingQueueElement() {
        }

        public ProcessingQueueElement(int priority, long timestamp, MorphiumId id) {
            this.priority = priority;
            this.id = id;
            this.timestamp = timestamp;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public ProcessingQueueElement setTimestamp(long timestamp) {
            this.timestamp = timestamp;
            return this;
        }

        public int getPriority() {
            return this.priority;
        }

        public ProcessingQueueElement setPriority(int priority) {
            this.priority = priority;
            return this;
        }

        public MorphiumId getId() {
            return this.id;
        }

        public ProcessingQueueElement setId(MorphiumId id) {
            this.id = id;
            return this;
        }

        @Override
        public int compareTo(ProcessingQueueElement o) {
            if (o.getPriority() < this.priority) {
                return 1;
            }
            if (o.getPriority() > this.priority) {
                return -1;
            }
            if (o.getTimestamp() < this.timestamp) {
                return 1;
            }
            if (o.getTimestamp() > this.timestamp) {
                return -1;
            }
            return o.getId().compareTo(this.id);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProcessingQueueElement that = (ProcessingQueueElement)o;
            return Objects.equals(this.id, that.id);
        }

        public int hashCode() {
            return Objects.hash(this.id);
        }
    }

    public static class SystemShutdownException
    extends RuntimeException {
        public SystemShutdownException(String msg) {
            super(msg);
        }
    }

    private class CallbackRequest {
        Msg theMessage;
        AsyncMessageCallback callback;
        long ttl;
        long timestamp;

        private CallbackRequest(SingleCollectionMessaging singleCollectionMessaging) {
        }
    }

    public static interface AsyncMessageCallback {
        public void incomingMessage(Msg var1);
    }

    public static class MessageTimeoutException
    extends RuntimeException {
        public MessageTimeoutException(String msg) {
            super(msg);
        }
    }
}

