/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.StatisticKeys;
import de.caluga.morphium.StatisticValue;
import de.caluga.morphium.Utils;
import de.caluga.morphium.UtilsMap;
import de.caluga.morphium.async.AsyncCallbackAdapter;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.FindCommand;
import de.caluga.morphium.driver.commands.InsertMongoCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.ReadMongoCommand;
import de.caluga.morphium.driver.commands.UpdateMongoCommand;
import de.caluga.morphium.driver.commands.WriteMongoCommand;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MsgLock;
import de.caluga.morphium.messaging.RemoveProcessTask;
import de.caluga.morphium.messaging.StatusInfoListener;
import de.caluga.morphium.query.Query;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(Messaging.class);
    private final StatusInfoListener statusInfoListener;
    private String statusInfoListenerName = "morphium.status_info";
    private boolean statusInfoListenerEnabled = true;
    private final Morphium morphium;
    private boolean running;
    private int pause;
    private String id;
    private boolean autoAnswer = false;
    private String hostname;
    private boolean processMultiple;
    private final List<MessageListener> listeners;
    private final Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    private Map<String, List<MessageListener>> listenerByName;
    private String queueName;
    private String lockCollectionName = null;
    private String collectionName = null;
    private ThreadPoolExecutor threadPool;
    private final ScheduledThreadPoolExecutor decouplePool;
    private boolean multithreadded;
    private int windowSize;
    private boolean useChangeStream;
    private ChangeStreamMonitor changeStreamMonitor;
    private final Map<MorphiumId, Queue<Msg>> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Queue<Msg>>();
    private final Set<MorphiumId> processing = ConcurrentHashMap.newKeySet();
    private final AtomicInteger skipped = new AtomicInteger(0);

    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    public Messaging(Morphium m) {
        this(m, null, 500, false, false, 100);
    }

    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, m.getConfig().getMessagingWindowSize());
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet() || m.getDriver().getName().equals("InMemDriver"));
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this.setWindowSize(windowSize);
        this.setUseChangeStream(useChangeStream);
        this.setQueueName(queueName);
        this.setPause(pause);
        this.setProcessMultiple(processMultiple);
        this.morphium = m;
        this.statusInfoListener = new StatusInfoListener();
        this.statusInfoListenerEnabled = m.getConfig().isMessagingStatusInfoListenerEnabled();
        if (m.getConfig().getMessagingStatusInfoListenerName() != null) {
            this.statusInfoListenerName = m.getConfig().getMessagingStatusInfoListenerName();
        }
        this.setMultithreadded(multithreadded);
        this.decouplePool = new ScheduledThreadPoolExecutor(1);
        this.decouplePool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "decouple_thr_" + String.valueOf(this.num));
                this.num.set(this.num.get() + 1);
                ret.setDaemon(true);
                return ret;
            }
        });
        this.morphium.addShutdownListener(this);
        this.running = true;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException unknownHostException) {
                // empty catch block
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        m.ensureIndicesFor(Msg.class, this.getCollectionName());
        m.ensureIndicesFor(MsgLock.class, this.getLockCollectionName());
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
        this.skipped.set(1);
    }

    public void enableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(true);
    }

    public void disableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(false);
    }

    public String getStatusInfoListenerName() {
        return this.statusInfoListenerName;
    }

    public void setStatusInfoListenerName(String statusInfoListenerName) {
        this.listenerByName.remove(this.statusInfoListenerName);
        this.statusInfoListenerName = statusInfoListenerName;
        this.listenerByName.put(statusInfoListenerName, Arrays.asList(this.statusInfoListener));
    }

    public boolean isStatusInfoListenerEnabled() {
        return this.statusInfoListenerEnabled;
    }

    public void setStatusInfoListenerEnabled(boolean statusInfoListenerEnabled) {
        this.statusInfoListenerEnabled = statusInfoListenerEnabled;
        if (statusInfoListenerEnabled && !this.listenerByName.containsKey(this.statusInfoListenerName)) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        } else if (!statusInfoListenerEnabled) {
            this.listenerByName.remove(this.statusInfoListenerName);
        }
    }

    public Map<String, List<String>> getListenerNames() {
        HashMap<String, List<String>> ret = new HashMap<String, List<String>>();
        HashMap<String, List<MessageListener>> localCopy = new HashMap<String, List<MessageListener>>(this.listenerByName);
        for (Map.Entry e : localCopy.entrySet()) {
            ArrayList<String> classes = new ArrayList<String>();
            for (MessageListener lst : (List)e.getValue()) {
                classes.add(lst.getClass().getName());
            }
            ret.put((String)e.getKey(), classes);
        }
        return ret;
    }

    public List<String> getGlobalListeners() {
        ArrayList<MessageListener> localCopy = new ArrayList<MessageListener>(this.listeners);
        ArrayList<String> ret = new ArrayList<String>();
        for (MessageListener lst : localCopy) {
            ret.add(lst.getClass().getName());
        }
        return ret;
    }

    public Map<String, Long> getThreadPoolStats() {
        String prefix = "messaging.threadpool.";
        return UtilsMap.of(prefix + "largest_poolsize", Long.valueOf(this.threadPool.getLargestPoolSize())).add((CallSite)((Object)(prefix + "task_count")), this.threadPool.getTaskCount()).add((CallSite)((Object)(prefix + "core_size")), Long.valueOf(this.threadPool.getCorePoolSize())).add((CallSite)((Object)(prefix + "maximum_pool_size")), Long.valueOf(this.threadPool.getMaximumPoolSize())).add((CallSite)((Object)(prefix + "pool_size")), Long.valueOf(this.threadPool.getPoolSize())).add((CallSite)((Object)(prefix + "active_count")), Long.valueOf(this.threadPool.getActiveCount())).add((CallSite)((Object)(prefix + "completed_task_count")), this.threadPool.getCompletedTaskCount());
    }

    private void initThreadPool() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(){

            @Override
            public boolean offer(Runnable e) {
                int maximumPoolSize;
                int poolSize = Messaging.this.threadPool.getPoolSize();
                if (poolSize >= (maximumPoolSize = Messaging.this.threadPool.getMaximumPoolSize()) || poolSize > Messaging.this.threadPool.getActiveCount()) {
                    return super.offer(e);
                }
                return false;
            }
        };
        this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)queue);
        this.threadPool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        this.threadPool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "messaging " + String.valueOf(this.num));
                this.num.set(this.num.get() + 1);
                ret.setDaemon(true);
                return ret;
            }
        });
    }

    public long getPendingMessagesCount() {
        Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        q1.f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).eq(null);
        return q1.countAll();
    }

    public void removeMessage(Msg m) {
        this.morphium.delete(m, this.getCollectionName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Object pipeline2;
        this.setName("Msg " + this.id);
        if (this.statusInfoListenerEnabled) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
        if (this.useChangeStream) {
            try {
                this.findAndProcessMessages(this.processMultiple);
                if (this.multithreadded) {
                    while (this.threadPool != null && this.threadPool.getActiveCount() > 0) {
                        Thread.sleep(this.morphium.getConfig().getIdleSleepTime());
                    }
                }
            }
            catch (Exception e) {
                log.error("Error processing existing messages in queue", (Throwable)e);
            }
            pipeline2 = new ArrayList();
            LinkedHashMap match = new LinkedHashMap();
            LinkedHashMap<String, List<String>> in = new LinkedHashMap<String, List<String>>();
            in.put("$in", Arrays.asList("insert", "delete", "update"));
            match.put("operationType", in);
            pipeline2.add(UtilsMap.of("$match", match));
            ChangeStreamMonitor lockMonitor = new ChangeStreamMonitor(this.morphium, this.getLockCollectionName(), true, this.pause, List.of(Doc.of("$match", Doc.of("operationType", Doc.of("$eq", "delete")))));
            lockMonitor.addListener(evt -> {
                this.skipped.incrementAndGet();
                return this.running;
            });
            this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, this.getCollectionName(), true, this.pause, (List<Map<String, Object>>)pipeline2);
            this.changeStreamMonitor.addListener(evt -> {
                if (!this.running) {
                    return false;
                }
                Messaging messaging = this;
                synchronized (messaging) {
                    block156: {
                        try {
                            Msg obj;
                            if (evt == null || evt.getOperationType() == null) {
                                boolean bl = this.running;
                                return bl;
                            }
                            if (evt.getOperationType().equals("insert")) {
                                obj = this.morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                                if (obj.isExclusive() && obj.getProcessedBy().size() != 0) {
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (this.processing.contains(obj.getMsgId())) {
                                    boolean bl = this.running;
                                    return bl;
                                }
                                this.processing.add(obj.getMsgId());
                                if (obj.getRecipients() != null && !obj.getRecipients().contains(this.getSenderId())) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (obj.getSender().equals(this.id)) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (obj.getInAnswerTo() != null) {
                                    if (obj.isExclusive() && !this.lockMessage(obj, this.id)) {
                                        this.removeProcessingFor(obj);
                                        boolean bl = this.running;
                                        return bl;
                                    }
                                    this.handleAnswer(obj);
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (this.listenerByName.get(obj.getName()) == null && this.listeners.size() == 0) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (this.pauseMessages.containsKey(obj.getName())) {
                                    this.processing.remove(obj.getMsgId());
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (obj.getProcessedBy().contains(this.id)) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                if (obj.getSender().equals(this.id) || obj.getProcessedBy().contains(this.id) || obj.getRecipients() != null && !obj.getRecipients().contains(this.id)) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                try {
                                    if (obj.isExclusive() && (obj.getRecipients() == null || obj.getRecipients().contains(this.id)) && obj.getProcessedBy().size() == 0) {
                                        this.lockAndProcess(obj);
                                        break block156;
                                    } else {
                                        if (obj.isExclusive() && (obj.getRecipients() == null || !obj.getRecipients().contains(this.id))) return this.running;
                                        this.processMessage(obj);
                                    }
                                }
                                catch (Exception e) {
                                    log.error("Error during message processing ", (Throwable)e);
                                }
                                break block156;
                            }
                            if (evt.getOperationType().equals("delete")) {
                                MsgLock x = this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(((Map)evt.getDocumentKey()).get("_id")).get();
                                if (x != null) {
                                    this.morphium.delete(x, this.getLockCollectionName());
                                }
                                boolean bl = this.running;
                                return bl;
                            }
                            if (!evt.getOperationType().equals("update")) return this.running;
                            obj = this.morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                            if (obj == null) {
                                boolean bl = this.running;
                                return bl;
                            }
                            if (obj.isExclusive() && obj.getProcessedBy().size() > 0) {
                                boolean bl = this.running;
                                return bl;
                            }
                            if (obj.getProcessedBy().contains(this.id)) {
                                boolean bl = this.running;
                                return bl;
                            }
                            if (this.processing.contains(obj.getMsgId())) {
                                boolean bl = this.running;
                                return bl;
                            }
                            this.processing.add(obj.getMsgId());
                            if (obj.getSender().equals(this.id) || obj.getRecipients() != null && !obj.getRecipients().contains(this.id)) {
                                this.removeProcessingFor(obj);
                                boolean bl = this.running;
                                return bl;
                            }
                            if (obj.getInAnswerTo() != null) {
                                if (obj.isExclusive() && !this.lockMessage(obj, this.id)) {
                                    this.removeProcessingFor(obj);
                                    boolean bl = this.running;
                                    return bl;
                                }
                                this.handleAnswer(obj);
                                this.removeProcessingFor(obj);
                                boolean bl = this.running;
                                return bl;
                            }
                            if (this.listenerByName.get(obj.getName()) == null && this.listeners.size() == 0) {
                                this.removeProcessingFor(obj);
                                boolean bl = this.running;
                                return bl;
                            }
                            if (this.pauseMessages.containsKey(obj.getName())) {
                                this.processing.remove(obj.getMsgId());
                                boolean bl = this.running;
                                return bl;
                            }
                            if (obj.isExclusive()) {
                                this.lockAndProcess(obj);
                            } else {
                                this.processMessage(obj);
                            }
                        }
                        catch (Exception e) {
                            log.error("Error during event processing in changestream", (Throwable)e);
                        }
                        finally {
                            while (true) {
                                try {
                                    this.decouplePool.schedule(() -> this.triggerCheck(), 5000L, TimeUnit.MILLISECONDS);
                                }
                                catch (Exception exception) {
                                    try {
                                        Thread.sleep(1000L);
                                    }
                                    catch (InterruptedException interruptedException) {}
                                    continue;
                                }
                                break;
                            }
                        }
                    }
                    return this.running;
                }
            });
            this.changeStreamMonitor.start();
            lockMonitor.start();
        }
        this.skipped.incrementAndGet();
        while (this.running) {
            try {
                if (this.skipped.get() > 0 || !this.useChangeStream) {
                    pipeline2 = this;
                    synchronized (pipeline2) {
                        this.morphium.inc(StatisticKeys.PULL);
                        StatisticValue sk = this.morphium.getStats().get((Object)StatisticKeys.PULLSKIP);
                        sk.set(sk.get() + (long)this.skipped.get());
                        this.skipped.set(0);
                        this.findAndProcessMessages(this.processMultiple);
                        continue;
                    }
                }
                this.morphium.inc(StatisticKeys.SKIPPED_MSG_UPDATES);
            }
            catch (Throwable e) {
                if (!this.running) break;
                log.error("Unhandled exception " + e.getMessage(), e);
            }
            finally {
                try {
                    Thread.sleep((long)((double)this.pause / 2.0 * Math.random() + (double)this.pause * 0.75));
                }
                catch (InterruptedException pipeline2) {}
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        if (!this.running) {
            this.listeners.clear();
            this.listenerByName.clear();
        }
    }

    private void handleAnswer(Msg obj) {
        Queue<Msg> answers = this.waitingForAnswers.get(obj.getInAnswerTo());
        if (null != answers) {
            this.updateProcessedBy(obj);
            if (!answers.contains(obj)) {
                answers.add(obj);
            }
            if (obj.isDeleteAfterProcessing()) {
                if (obj.getDeleteAfterProcessingTime() == 0) {
                    this.morphium.delete(obj, this.getCollectionName());
                } else {
                    obj.setDeleteAt(new Date(System.currentTimeMillis() + (long)obj.getDeleteAfterProcessingTime()));
                    this.morphium.set(obj, this.getCollectionName(), Msg.Fields.deleteAt, obj.getDeleteAt());
                    if (obj.isExclusive()) {
                        this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(obj.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)obj.getDeleteAt());
                    }
                }
                this.removeProcessingFor(obj);
            }
        } else if (obj.isExclusive()) {
            this.lockAndProcess(obj);
        } else {
            this.processMessage(obj);
        }
    }

    public void pauseProcessingOfMessagesNamed(String name) {
        this.pauseMessages.putIfAbsent(name, System.currentTimeMillis());
    }

    public List<String> getPausedMessageNames() {
        return new ArrayList<String>(this.pauseMessages.keySet());
    }

    public Long unpauseProcessingOfMessagesNamed(String name) {
        this.skipped.incrementAndGet();
        if (!this.pauseMessages.containsKey(name)) {
            return 0L;
        }
        Long ret = this.pauseMessages.remove(name);
        if (ret != null) {
            ret = System.currentTimeMillis() - ret;
        }
        return ret;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<MorphiumId> getMessagesForProcessing(boolean multiple) {
        if (!this.running) {
            return new ArrayList<MorphiumId>();
        }
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        if (this.listenerByName.isEmpty() && this.listeners.isEmpty()) {
            return q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.inAnswerTo).in(this.waitingForAnswers.keySet()).limit(this.windowSize).idList();
        }
        List preLockedIds = this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getCollectionName() + "_lck").idList();
        preLockedIds.addAll(this.processing);
        Query<Msg> q1 = q.q().f("_id").nin(preLockedIds).f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id)).f((Enum)Msg.Fields.exclusive).eq(true).f("processed_by.0").notExists();
        Query<Msg> q2 = q.q().f("_id").nin(this.processing).f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id)).f((Enum)Msg.Fields.exclusive).ne(true).f((Enum)Msg.Fields.processedBy).ne(this.id);
        q.or(q1, q2);
        Set<String> pausedMessagesKeys = this.pauseMessages.keySet();
        if (!this.pauseMessages.isEmpty()) {
            q.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
        }
        q.setLimit(this.windowSize);
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        ArrayList<MorphiumId> lockedIds = new ArrayList<MorphiumId>();
        MongoCommand fnd = null;
        try {
            int ws = this.windowSize;
            if (!multiple) {
                ws = 1;
            }
            fnd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
            fnd.setDb(this.morphium.getDatabase());
            ((FindCommand)fnd).setFilter(q.toQueryObject());
            ((FindCommand)fnd).setProjection(Doc.of("_id", (Object)1, "ttl", (Object)1, "timing_out", (Object)1, "exclusive", (Object)1, "processed_by", (Object)1));
            ((FindCommand)fnd).setLimit(ws);
            ((FindCommand)fnd).setBatchSize(q.getBatchSize());
            ((FindCommand)fnd).setSort(q.getSort());
            ((FindCommand)fnd).setSkip(q.getSkip());
            fnd.setColl(this.getCollectionName());
            List<Map<String, Object>> result = ((ReadMongoCommand)fnd).execute();
            fnd.releaseConnection();
            fnd = null;
            if (!result.isEmpty()) {
                for (Map<String, Object> el : result) {
                    if (this.processing.contains(el.get("_id"))) continue;
                    if (el.get("exclusive") == null || el.get("exclusive").equals(Boolean.FALSE)) {
                        lockedIds.add((MorphiumId)el.get("_id"));
                        continue;
                    }
                    if (el.get("processed_by") != null && ((List)el.get("processed_by")).size() > 0) {
                        log.error("Should not get a processed exclusive message");
                        continue;
                    }
                    MsgLock l = this.morphium.findById(MsgLock.class, el.get("_id"), this.getLockCollectionName());
                    if (l != null && l.getLockId().equals(this.id)) {
                        lockedIds.add((MorphiumId)el.get("_id"));
                        continue;
                    }
                    l = new MsgLock((MorphiumId)el.get("_id"));
                    l.setLockId(this.id);
                    if (el.containsKey("timing_out") && el.get("timing_out").equals(Boolean.TRUE)) {
                        Long ttl = (Long)el.get("ttl");
                        l.setDeleteAt(new Date(System.currentTimeMillis() + ttl));
                    }
                    try {
                        this.morphium.insert(l, this.getCollectionName() + "_lck", null);
                        lockedIds.add(l.getId());
                    }
                    catch (Exception exception) {}
                }
            }
            if (q.countAll() != (long)lockedIds.size()) {
                this.skipped.incrementAndGet();
            }
            ArrayList<MorphiumId> arrayList = lockedIds;
            return arrayList;
        }
        catch (Exception e) {
            log.error("Error while processing", (Throwable)e);
            List<MorphiumId> list = null;
            return list;
        }
        finally {
            if (fnd != null) {
                fnd.releaseConnection();
            }
        }
    }

    public void triggerCheck() {
        this.skipped.incrementAndGet();
    }

    private void findAndProcessMessages(boolean multiple) {
        if (!this.running) {
            return;
        }
        List<MorphiumId> messages = this.getMessagesForProcessing(multiple);
        if (messages == null) {
            return;
        }
        if (messages.size() == 0) {
            return;
        }
        this.processMessages(messages);
    }

    private void lockAndProcess(Msg obj) {
        if (!this.lockMessage(obj, this.id)) {
            this.processing.remove(obj.getMsgId());
            this.skipped.incrementAndGet();
            return;
        }
        this.processMessage(obj);
    }

    public MsgLock getLock(Msg m) {
        return this.morphium.findById(MsgLock.class, m.getMsgId(), this.getLockCollectionName());
    }

    public String getLockCollectionName() {
        if (this.lockCollectionName == null) {
            this.lockCollectionName = this.getCollectionName() + "_lck";
        }
        return this.lockCollectionName;
    }

    public boolean lockMessage(Msg m, String lockId) {
        return this.lockMessage(m, lockId, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean lockMessage(Msg m, String lockId, Date delAt) {
        long start = System.currentTimeMillis();
        MsgLock lck = new MsgLock(m);
        lck.setLockId(lockId);
        if (delAt != null) {
            lck.setDeleteAt(delAt);
        }
        MongoCommand cmd = null;
        try {
            cmd = new InsertMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(MsgLock.class)));
            ((InsertMongoCommand)((InsertMongoCommand)cmd.setColl(this.getLockCollectionName())).setDb(this.morphium.getDatabase())).setDocuments(List.of(this.morphium.getMapper().serialize(lck)));
            ((InsertMongoCommand)cmd).execute();
            boolean bl = true;
            return bl;
        }
        catch (Exception e) {
            boolean bl = false;
            return bl;
        }
        finally {
            long dur = System.currentTimeMillis() - start;
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    private synchronized void processMessage(Msg ms) {
        if (ms == null) {
            return;
        }
        Msg msg = this.morphium.reread(ms, this.getCollectionName());
        if (msg == null) {
            if (ms.isExclusive()) {
                this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(ms.getMsgId()).remove();
            }
            this.processing.remove(ms.getMsgId());
            return;
        }
        if (msg.getSender().equals(this.getSenderId())) {
            log.error("This should have been filtered out before alreaday!!!");
            this.removeProcessingFor(msg);
            return;
        }
        if (msg.isTimingOut() && msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            log.debug(this.getSenderId() + ": Found outdated message - deleting it!");
            this.morphium.delete(msg, this.getCollectionName());
            this.processing.remove(msg.getMsgId());
            return;
        }
        if (msg.isExclusive() && msg.getProcessedBy().size() > 0) {
            this.removeProcessingFor(msg);
            return;
        }
        if (msg.getInAnswerTo() == null && msg.getProcessedBy().contains(this.id)) {
            this.removeProcessingFor(msg);
            return;
        }
        if (this.listeners.isEmpty() && this.listenerByName.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Not further processing - no listener for non answer message");
            }
            this.removeProcessingFor(msg);
            this.unlockIfExclusive(msg);
            return;
        }
        Runnable r = () -> {
            boolean wasProcessed = false;
            boolean wasRejected = false;
            ArrayList<MessageRejectedException> rejections = new ArrayList<MessageRejectedException>();
            ArrayList<MessageListener> lst = new ArrayList<MessageListener>(this.listeners);
            if (this.listenerByName.get(msg.getName()) != null) {
                lst.addAll((Collection<MessageListener>)this.listenerByName.get(msg.getName()));
            }
            if (lst.isEmpty()) {
                if (log.isDebugEnabled() && !msg.isAnswer()) {
                    log.debug(this.getSenderId() + ": Message did not have a listener registered: " + msg.getName());
                }
                this.unlockIfExclusive(msg);
                wasProcessed = false;
            }
            for (MessageListener l : lst) {
                try {
                    if (this.pauseMessages.containsKey(msg.getName())) {
                        this.processing.remove(msg.getMsgId());
                        wasProcessed = false;
                        this.skipped.incrementAndGet();
                        this.unlockIfExclusive(msg);
                        break;
                    }
                    if (l.markAsProcessedBeforeExec()) {
                        this.updateProcessedBy(msg);
                    }
                    Msg answer = l.onMessage(this, msg);
                    wasProcessed = true;
                    if (this.autoAnswer && answer == null) {
                        answer = new Msg(msg.getName(), "received", "");
                    }
                    if (answer == null) continue;
                    msg.sendAnswer(this, answer);
                    if (answer.getRecipients() != null) continue;
                    log.warn("Recipient of answer is null?!?!");
                }
                catch (MessageRejectedException mre) {
                    log.info(this.id + ": Message was rejected by listener: " + mre.getMessage());
                    wasRejected = true;
                    rejections.add(mre);
                    this.skipped.incrementAndGet();
                }
                catch (Exception e) {
                    log.error(this.id + ": listener Processing failed", (Throwable)e);
                    if (!msg.isDeleteAfterProcessing()) continue;
                    if (msg.getDeleteAfterProcessingTime() == 0) {
                        this.morphium.delete(msg, this.getCollectionName());
                        continue;
                    }
                    msg.setDeleteAt(new Date(System.currentTimeMillis() + (long)msg.getDeleteAfterProcessingTime()));
                    this.morphium.set(msg, this.getCollectionName(), Msg.Fields.deleteAt, msg.getDeleteAt());
                    if (!msg.isExclusive()) continue;
                    this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(msg.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)msg.getDeleteAt());
                }
            }
            if (wasRejected) {
                for (MessageRejectedException mre : rejections) {
                    if (mre.getRejectionHandler() != null) {
                        try {
                            mre.getRejectionHandler().handleRejection(this, msg);
                        }
                        catch (Exception e) {
                            log.error("Error in rejection handling", (Throwable)e);
                        }
                        continue;
                    }
                    log.error("No rejection  handler defined!!!");
                }
            } else if (wasProcessed || !wasRejected) {
                // empty if block
            }
            if (wasProcessed && !msg.getProcessedBy().contains(this.id)) {
                this.updateProcessedBy(msg);
            }
            this.removeProcessingFor(msg);
            if (!wasRejected && wasProcessed && msg.isDeleteAfterProcessing()) {
                if (msg.getDeleteAfterProcessingTime() == 0) {
                    this.morphium.delete(msg, this.getCollectionName());
                } else {
                    msg.setDeleteAt(new Date(System.currentTimeMillis() + (long)msg.getDeleteAfterProcessingTime()));
                    this.morphium.set(msg, this.getCollectionName(), Msg.Fields.deleteAt, msg.getDeleteAt());
                    if (msg.isExclusive()) {
                        this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName()).f("_id").eq(msg.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)msg.getDeleteAt());
                    }
                }
            }
        };
        this.queueOrRun(r);
    }

    private void unlockIfExclusive(Msg msg) {
        if (msg.isExclusive()) {
            this.deleteLock(msg.getMsgId());
        }
    }

    private void deleteLock(MorphiumId msgId) {
        this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getLockCollectionName()).f("_id").eq(msgId).f("lock_id").eq(this.id).remove();
    }

    private void processMessages(List<MorphiumId> messages) {
        for (MorphiumId mId : messages) {
            if (!this.running) {
                return;
            }
            this.processing.add(mId);
            try {
                FindCommand cmd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(null));
                cmd.setFilter(Doc.of("_id", mId));
                cmd.setLimit(1);
                cmd.setBatchSize(1);
                cmd.setColl(this.getCollectionName());
                cmd.setDb(this.morphium.getDatabase());
                List<Map<String, Object>> result = cmd.execute();
                cmd.releaseConnection();
                if (result.isEmpty()) {
                    this.processing.remove(mId);
                    continue;
                }
                Msg msg = this.morphium.getMapper().deserialize(Msg.class, result.get(0));
                if (msg == null) continue;
                if (msg.getInAnswerTo() != null) {
                    this.handleAnswer(msg);
                    continue;
                }
                this.processMessage(msg);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void removeProcessingFor(Msg msg) {
        RemoveProcessTask rb = new RemoveProcessTask(this.processing, msg.getMsgId());
        long timeout = msg.getTtl() / 2L;
        if (msg.getTtl() == 0L || !msg.isTimingOut()) {
            timeout = 1000L;
        }
        if (msg.isDeleteAfterProcessing()) {
            if (msg.getDeleteAfterProcessingTime() == 0) {
                timeout = 1000L;
            } else if ((long)(msg.getDeleteAfterProcessingTime() / 2) < timeout) {
                timeout = msg.getDeleteAfterProcessingTime() / 2;
            }
        }
        while (true) {
            try {
                if (this.decouplePool.isTerminated() || this.decouplePool.isTerminating() || this.decouplePool.isShutdown()) break;
                this.decouplePool.schedule(() -> {
                    rb.run();
                    this.skipped.incrementAndGet();
                }, timeout, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException ex) {
                try {
                    Thread.sleep(this.pause);
                }
                catch (InterruptedException interruptedException) {}
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateProcessedBy(Msg msg) {
        if (msg == null) {
            return;
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        msg.getProcessedBy().add(this.id);
        Map<String, Object> qobj = idq.toQueryObject();
        Doc set = Doc.of("processed_by", this.id);
        Doc update = Doc.of("$addToSet", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            cmd.releaseConnection();
            cmd = null;
            if ((ret.get("nModified") == null && ret.get("modified") == null || Integer.valueOf(0).equals(ret.get("nModified"))) && this.morphium.reread(msg, this.getCollectionName()) != null && !msg.getProcessedBy().contains(this.id)) {
                log.warn(this.id + ": Could not update processed_by in msg " + String.valueOf(msg.getMsgId()));
                log.warn(this.id + ": " + Utils.toJsonString(ret));
                log.warn(this.id + ": msg: " + msg.toString());
            }
        }
        catch (MorphiumDriverException e) {
            log.error("Error updating processed by - this might lead to duplicate execution!", (Throwable)e);
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    private void queueOrRun(Runnable r) {
        if (this.multithreadded) {
            boolean queued = false;
            while (!queued) {
                try {
                    while (this.threadPool.getActiveCount() > this.windowSize) {
                        Thread.sleep(this.morphium.getConfig().getIdleSleepTime());
                    }
                    this.threadPool.execute(r);
                    queued = true;
                }
                catch (Throwable throwable) {}
            }
        } else {
            r.run();
        }
    }

    public String getCollectionName() {
        if (this.collectionName == null) {
            this.collectionName = this.queueName == null || this.queueName.isEmpty() ? "msg" : "mmsg_" + this.queueName;
        }
        return this.collectionName;
    }

    public void addListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
            c.put(n, new ArrayList());
            this.listenerByName = c;
        }
        if (this.listenerByName.get(n).contains(l)) {
            log.error("cowardly refusing to add already registered listener for name " + n);
        } else {
            this.listenerByName.get(n).add(l);
        }
        this.skipped.incrementAndGet();
    }

    public void removeListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            return;
        }
        HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
        ((List)c.get(n)).remove(l);
        if (((List)c.get(n)).isEmpty()) {
            c.remove(n);
        }
        this.listenerByName = c;
    }

    public String getSenderId() {
        return this.id;
    }

    public Messaging setSenderId(String id) {
        this.id = id;
        return this;
    }

    public int getPause() {
        return this.pause;
    }

    public Messaging setPause(int pause) {
        this.pause = pause;
        return this;
    }

    public boolean isRunning() {
        if (this.useChangeStream) {
            return this.changeStreamMonitor != null && this.changeStreamMonitor.isRunning();
        }
        return this.running;
    }

    public void terminate() {
        int sz;
        this.running = false;
        this.listenerByName.clear();
        this.listeners.clear();
        this.waitingForAnswers.clear();
        this.processing.clear();
        this.skipped.set(0);
        if (this.decouplePool != null) {
            try {
                sz = this.decouplePool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still scheduled");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down decouple-pool", (Throwable)e);
            }
        }
        this.morphium.removeShutdownListener(this);
        if (this.threadPool != null) {
            try {
                sz = this.threadPool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still pending in pool");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down threadpool");
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.terminate();
        }
        if (this.isAlive()) {
            try {
                this.interrupt();
            }
            catch (Exception e) {
                log.warn("Exception when interrupint messaging thread", (Throwable)e);
            }
        }
        int retry = 0;
        while (this.isAlive()) {
            try {
                Messaging.sleep(150L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (++retry > this.morphium.getConfig().getMaxWaitTime() / 150) {
                log.warn("Force stopping messaging!");
                this.stop();
            }
            if (retry <= 2 * this.morphium.getConfig().getMaxWaitTime() / 150) continue;
            throw new RuntimeException("Could not terminate Messaging!");
        }
    }

    public void addMessageListener(MessageListener l) {
        if (this.listeners.contains(l)) {
            log.error("Cowardly refusing to add already registered listener");
        } else {
            this.listeners.add(l);
        }
        this.skipped.incrementAndGet();
    }

    public void removeMessageListener(MessageListener l) {
        this.listeners.remove(l);
    }

    public void queueMessage(Msg m) {
        if (this.morphium.getDriver().equals("SingleMongoConnectDriver")) {
            this.storeMsg(m, false);
        } else {
            this.storeMsg(m, true);
        }
    }

    @Override
    public synchronized void start() {
        super.start();
    }

    public void sendMessage(Msg m) {
        this.storeMsg(m, false);
    }

    public long getNumberOfMessages() {
        return this.getPendingMessagesCount();
    }

    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                    log.error("Error storing msg", t);
                }
            };
        }
        m.setSender(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    public void sendMessageToSelf(Msg m) {
        this.sendMessageToSelf(m, false);
    }

    public void queueMessagetoSelf(Msg m) {
        this.sendMessageToSelf(m, true);
    }

    private void sendMessageToSelf(Msg m, boolean async) {
        AsyncCallbackAdapter cb = null;
        if (async) {
            cb = new AsyncCallbackAdapter();
        }
        m.setSender("self");
        m.addRecipient(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    public Messaging setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
        return this;
    }

    @Override
    public void onShutdown(Morphium m) {
        try {
            this.running = false;
            if (this.threadPool != null) {
                this.threadPool.shutdown();
                Thread.sleep(200L);
                if (this.threadPool != null) {
                    this.threadPool.shutdownNow();
                }
                this.threadPool = null;
            }
            if (this.changeStreamMonitor != null) {
                this.changeStreamMonitor.terminate();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        return this.sendAndAwaitFirstAnswer(theMessage, timeoutInMs, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs, boolean throwExceptionOnTimeout) {
        if (!this.running) {
            throw new SystemShutdownException("Messaging shutting down - abort sending!");
        }
        theMessage.setMsgId(new MorphiumId());
        MorphiumId requestMsgId = theMessage.getMsgId();
        LinkedBlockingDeque blockingQueue = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, blockingQueue);
        try {
            this.sendMessage(theMessage);
            Msg firstAnswer = (Msg)blockingQueue.poll(timeoutInMs, TimeUnit.MILLISECONDS);
            if (null == firstAnswer && throwExceptionOnTimeout) {
                throw new MessageTimeoutException("Did not receive answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + " in time (" + timeoutInMs + "ms)");
            }
            Msg msg = firstAnswer;
            return (T)msg;
        }
        catch (InterruptedException e) {
            log.error("Did not receive answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + " interrupted.", (Throwable)e);
        }
        finally {
            this.waitingForAnswers.remove(requestMsgId);
        }
        return null;
    }

    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        return this.sendAndAwaitAnswers(theMessage, numberOfAnswers, timeout, false);
    }

    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout, boolean throwExceptionOnTimeout) {
        MorphiumId requestMsgId = theMessage.getMsgId();
        if (requestMsgId == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        LinkedBlockingDeque answerList = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, answerList);
        this.sendMessage(theMessage);
        long start = System.currentTimeMillis();
        while (this.running && (answerList.size() <= 0 || numberOfAnswers <= 0 || answerList.size() < numberOfAnswers)) {
            if (throwExceptionOnTimeout && System.currentTimeMillis() - start > timeout && answerList.isEmpty()) {
                throw new MessageTimeoutException("Did not receive any answer for message " + theMessage.getName() + "/" + String.valueOf(requestMsgId) + "in time (" + timeout + ")");
            }
            if (System.currentTimeMillis() - start > timeout) break;
            if (!this.running) {
                throw new SystemShutdownException("Messaging shutting down - abort waiting!");
            }
            try {
                Thread.sleep(this.morphium.getConfig().getIdleSleepTime());
            }
            catch (InterruptedException interruptedException) {}
        }
        return new ArrayList(this.waitingForAnswers.remove(requestMsgId));
    }

    public boolean isProcessMultiple() {
        return this.processMultiple;
    }

    public Messaging setProcessMultiple(boolean processMultiple) {
        this.processMultiple = processMultiple;
        return this;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public Messaging setQueueName(String queueName) {
        this.queueName = queueName;
        this.collectionName = null;
        this.lockCollectionName = null;
        return this;
    }

    public boolean isMultithreadded() {
        return this.multithreadded;
    }

    public Messaging setMultithreadded(boolean multithreadded) {
        if (!multithreadded && this.threadPool != null) {
            this.threadPool.shutdownNow();
            this.threadPool = null;
        } else if (multithreadded && this.threadPool == null) {
            this.initThreadPool();
        }
        this.multithreadded = multithreadded;
        return this;
    }

    public int getWindowSize() {
        return this.windowSize;
    }

    public Messaging setWindowSize(int windowSize) {
        this.windowSize = windowSize;
        return this;
    }

    public boolean isUseChangeStream() {
        return this.useChangeStream;
    }

    public int getRunningTasks() {
        if (this.threadPool != null) {
            return this.threadPool.getActiveCount();
        }
        return 0;
    }

    Morphium getMorphium() {
        return this.morphium;
    }

    public Messaging setPolling(boolean doPolling) {
        this.useChangeStream = !doPolling;
        return this;
    }

    public Messaging setUseChangeStream(boolean useChangeStream) {
        this.useChangeStream = useChangeStream;
        return this;
    }

    public static class SystemShutdownException
    extends RuntimeException {
        public SystemShutdownException(String msg) {
            super(msg);
        }
    }

    public static class MessageTimeoutException
    extends RuntimeException {
        public MessageTimeoutException(String msg) {
            super(msg);
        }
    }
}

