/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.StatisticKeys;
import de.caluga.morphium.StatisticValue;
import de.caluga.morphium.UtilsMap;
import de.caluga.morphium.async.AsyncCallbackAdapter;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.MorphiumDriver;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.CountMongoCommand;
import de.caluga.morphium.driver.commands.FindCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.ReadMongoCommand;
import de.caluga.morphium.driver.commands.UpdateMongoCommand;
import de.caluga.morphium.driver.commands.WriteMongoCommand;
import de.caluga.morphium.driver.wire.ConnectionType;
import de.caluga.morphium.driver.wire.SingleMongoConnectDriver;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.RemoveProcessTask;
import de.caluga.morphium.messaging.StatusInfoListener;
import de.caluga.morphium.query.Query;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.bson.BsonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(Messaging.class);
    private final StatusInfoListener statusInfoListener;
    private String statusInfoListenerName = "morphium.status_info";
    private boolean statusInfoListenerEnabled = true;
    private final Morphium morphium;
    private boolean running;
    private int pause;
    private String id;
    private boolean autoAnswer = false;
    private String hostname;
    private boolean processMultiple;
    private ReceiveAnswers receiveAnswers = ReceiveAnswers.ONLY_MINE;
    private final List<MessageListener> listeners;
    private final Map<String, Long> pauseMessages = new ConcurrentHashMap<String, Long>();
    private Map<String, List<MessageListener>> listenerByName;
    private String queueName;
    private ThreadPoolExecutor threadPool;
    private final ScheduledThreadPoolExecutor decouplePool;
    private boolean multithreadded;
    private int windowSize;
    private boolean useChangeStream;
    private ChangeStreamMonitor changeStreamMonitor;
    private final Map<MorphiumId, List<Msg>> waitingForAnswers = new ConcurrentHashMap<MorphiumId, List<Msg>>();
    private final Map<MorphiumId, Msg> waitingForMessages = new ConcurrentHashMap<MorphiumId, Msg>();
    private final List<MorphiumId> processing = new CopyOnWriteArrayList<MorphiumId>();
    private final AtomicInteger skipped = new AtomicInteger(0);
    private MorphiumDriver watchConnection;

    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    public Messaging(Morphium m) {
        this(m, null, 500, false, false, 100);
    }

    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, m.getConfig().getMessagingWindowSize());
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet());
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, useChangeStream, ReceiveAnswers.ONLY_MINE);
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useChangeStream, ReceiveAnswers recieveAnswers) {
        this.setWindowSize(windowSize);
        this.setUseChangeStream(useChangeStream);
        this.setReceiveAnswers(recieveAnswers);
        this.setQueueName(queueName);
        this.setPause(pause);
        this.setProcessMultiple(processMultiple);
        this.morphium = m;
        this.statusInfoListener = new StatusInfoListener();
        this.setMultithreadded(multithreadded);
        this.decouplePool = new ScheduledThreadPoolExecutor(1);
        this.decouplePool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "decouple_thr_" + String.valueOf(this.num));
                this.num.set(this.num.get() + 1);
                ret.setDaemon(true);
                return ret;
            }
        });
        this.morphium.addShutdownListener(this);
        this.running = true;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException unknownHostException) {
                // empty catch block
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        m.ensureIndicesFor(Msg.class, this.getCollectionName());
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
        if (m.getDriver().getName().equals("InMemDriver")) {
            this.watchConnection = m.getDriver();
        } else {
            this.watchConnection = new SingleMongoConnectDriver().setConnectionType(ConnectionType.PRIMARY);
            this.watchConnection.setHostSeed(this.morphium.getConfig().getHostSeed());
            this.watchConnection.setMaxWaitTime(this.morphium.getConfig().getMaxWaitTime());
            this.watchConnection.setDefaultBatchSize(this.morphium.getConfig().getCursorBatchSize());
            try {
                this.watchConnection.connect();
            }
            catch (MorphiumDriverException e) {
                log.error("Could not connect", (Throwable)e);
            }
        }
    }

    public void enableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(true);
    }

    public void disableStatusInfoListener() {
        this.setStatusInfoListenerEnabled(false);
    }

    public String getStatusInfoListenerName() {
        return this.statusInfoListenerName;
    }

    public void setStatusInfoListenerName(String statusInfoListenerName) {
        this.listenerByName.remove(this.statusInfoListenerName);
        this.statusInfoListenerName = statusInfoListenerName;
        this.listenerByName.put(statusInfoListenerName, Arrays.asList(this.statusInfoListener));
    }

    public boolean isStatusInfoListenerEnabled() {
        return this.statusInfoListenerEnabled;
    }

    public void setStatusInfoListenerEnabled(boolean statusInfoListenerEnabled) {
        this.statusInfoListenerEnabled = statusInfoListenerEnabled;
        if (statusInfoListenerEnabled && !this.listenerByName.containsKey(this.statusInfoListenerName)) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        } else if (!statusInfoListenerEnabled) {
            this.listenerByName.remove(this.statusInfoListenerName);
        }
    }

    public Map<String, List<String>> getListenerNames() {
        HashMap<String, List<String>> ret = new HashMap<String, List<String>>();
        HashMap<String, List<MessageListener>> localCopy = new HashMap<String, List<MessageListener>>(this.listenerByName);
        for (Map.Entry e : localCopy.entrySet()) {
            ArrayList<String> classes = new ArrayList<String>();
            for (MessageListener lst : (List)e.getValue()) {
                classes.add(lst.getClass().getName());
            }
            ret.put((String)e.getKey(), classes);
        }
        return ret;
    }

    public List<String> getGlobalListeners() {
        ArrayList<MessageListener> localCopy = new ArrayList<MessageListener>(this.listeners);
        ArrayList<String> ret = new ArrayList<String>();
        for (MessageListener lst : localCopy) {
            ret.add(lst.getClass().getName());
        }
        return ret;
    }

    public Map<String, Long> getThreadPoolStats() {
        String prefix = "messaging.threadpool.";
        return UtilsMap.of(prefix + "largest_poolsize", Long.valueOf(this.threadPool.getLargestPoolSize())).add((CallSite)((Object)(prefix + "task_count")), this.threadPool.getTaskCount()).add((CallSite)((Object)(prefix + "core_size")), Long.valueOf(this.threadPool.getCorePoolSize())).add((CallSite)((Object)(prefix + "maximum_pool_size")), Long.valueOf(this.threadPool.getMaximumPoolSize())).add((CallSite)((Object)(prefix + "pool_size")), Long.valueOf(this.threadPool.getPoolSize())).add((CallSite)((Object)(prefix + "active_count")), Long.valueOf(this.threadPool.getActiveCount())).add((CallSite)((Object)(prefix + "completed_task_count")), this.threadPool.getCompletedTaskCount());
    }

    private void initThreadPool() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(){

            @Override
            public boolean offer(Runnable e) {
                int maximumPoolSize;
                int poolSize = Messaging.this.threadPool.getPoolSize();
                if (poolSize >= (maximumPoolSize = Messaging.this.threadPool.getMaximumPoolSize()) || poolSize > Messaging.this.threadPool.getActiveCount()) {
                    return super.offer(e);
                }
                return false;
            }
        };
        this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)queue);
        this.threadPool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        this.threadPool.setThreadFactory(new ThreadFactory(){
            private final AtomicInteger num = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "messaging " + String.valueOf(this.num));
                this.num.set(this.num.get() + 1);
                ret.setDaemon(true);
                return ret;
            }
        });
    }

    public long getPendingMessagesCount() {
        Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        Query<Msg> or1 = q1.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).in(Arrays.asList(null, this.id)).f((Enum)Msg.Fields.processedBy).eq(null);
        Query<Msg> or2 = q1.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id);
        q1.or(or1, or2);
        return q1.countAll();
    }

    public void removeMessage(Msg m) {
        this.morphium.delete(m, this.getCollectionName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.setName("Msg " + this.id);
        if (this.statusInfoListenerEnabled) {
            this.listenerByName.put(this.statusInfoListenerName, Arrays.asList(this.statusInfoListener));
        }
        if (this.useChangeStream) {
            try {
                this.findAndProcessPendingMessages(null);
                if (this.multithreadded) {
                    while (this.threadPool != null && this.threadPool.getActiveCount() > 0) {
                        Thread.yield();
                    }
                }
            }
            catch (Exception e) {
                log.error("Error processing existing messages in queue", (Throwable)e);
            }
            ArrayList<Map<String, Object>> pipeline = new ArrayList<Map<String, Object>>();
            LinkedHashMap match = new LinkedHashMap();
            LinkedHashMap<String, List<String>> in = new LinkedHashMap<String, List<String>>();
            in.put("$in", Arrays.asList("insert", "update"));
            match.put("operationType", in);
            pipeline.add(UtilsMap.of("$match", match));
            this.changeStreamMonitor = new ChangeStreamMonitor(this.morphium, this.getCollectionName(), true, this.pause, pipeline);
            this.changeStreamMonitor.addListener(evt -> {
                if (!this.running) {
                    return false;
                }
                try {
                    if (evt == null || evt.getOperationType() == null) {
                        return this.running;
                    }
                    if (evt.getOperationType().equals("insert")) {
                        Msg obj = this.morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                        if (obj.getRecipients() != null && !obj.getRecipients().contains(this.getSenderId())) {
                            return this.running;
                        }
                        if (obj.getSender().equals(this.id)) {
                            return this.running;
                        }
                        if (obj.getInAnswerTo() != null) {
                            this.handleAnswer(obj);
                            return this.running;
                        }
                        if (this.listenerByName.get(obj.getName()) == null && this.listeners.size() == 0) {
                            return this.running;
                        }
                        if (this.pauseMessages.containsKey(obj.getName())) {
                            this.skipped.incrementAndGet();
                            return this.running;
                        }
                        if (obj.getSender().equals(this.id) || obj.getProcessedBy().contains(this.id) || obj.getRecipients() != null && !obj.getRecipients().contains(this.id)) {
                            return this.running;
                        }
                        if (obj.isExclusive() && obj.getLockedBy() == null && (obj.getRecipients() == null || obj.getRecipients().contains(this.id)) && obj.getProcessedBy().size() == 0) {
                            this.lockAndProcess(obj);
                        } else if (!obj.isExclusive() || obj.getRecipients() != null && obj.getRecipients().contains(this.id)) {
                            if (this.processing.contains(obj.getMsgId())) {
                                return this.running;
                            }
                            try {
                                this.processMessage(obj);
                            }
                            catch (Exception e) {
                                log.error("Error during message processing ", (Throwable)e);
                            }
                        }
                    } else if (evt.getOperationType().equals("update")) {
                        if (evt.getUpdatedFields() != null && evt.getUpdatedFields().containsKey("locked_by") && !(evt.getUpdatedFields().get("locked_by") instanceof BsonNull)) {
                            return this.running;
                        }
                        if (evt.getUpdatedFields() != null && evt.getUpdatedFields().containsKey("processed_by")) {
                            return this.running;
                        }
                        Msg obj = this.morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
                        if (obj == null) {
                            return this.running;
                        }
                        if (obj.getProcessedBy().contains(this.id)) {
                            return this.running;
                        }
                        if (obj.getSender().equals(this.id) || obj.getRecipients() != null && !obj.getRecipients().contains(this.id)) {
                            return this.running;
                        }
                        if (obj.getInAnswerTo() != null) {
                            this.handleAnswer(obj);
                            return this.running;
                        }
                        if (this.listenerByName.get(obj.getName()) == null && this.listeners.size() == 0) {
                            return this.running;
                        }
                        if (this.pauseMessages.containsKey(obj.getName())) {
                            return this.running;
                        }
                        if (obj != null && obj.isExclusive() && (obj.getLockedBy() == null || obj.getLockedBy().equals(this.id)) && (obj.getRecipients() == null || obj.getRecipients().contains(this.id))) {
                            this.lockAndProcess(obj);
                        } else if (!obj.isExclusive() && !obj.getProcessedBy().contains(this.id)) {
                            this.processMessage(obj);
                        }
                    }
                }
                catch (Exception e) {
                    log.error("Error during event processing in changestream", (Throwable)e);
                }
                return this.running;
            });
            this.changeStreamMonitor.start();
        }
        this.findAndProcessMessages(this.processMultiple);
        while (this.running) {
            try {
                if (this.skipped.get() > 0 || !this.useChangeStream) {
                    this.morphium.inc(StatisticKeys.PULL);
                    StatisticValue sk = this.morphium.getStats().get((Object)StatisticKeys.PULLSKIP);
                    sk.set(sk.get() + (long)this.skipped.get());
                    this.skipped.set(0);
                    this.findAndProcessMessages(this.processMultiple);
                    continue;
                }
                this.morphium.inc(StatisticKeys.SKIPPED_MSG_UPDATES);
            }
            catch (Throwable e) {
                if (!this.running) break;
                log.error("Unhandled exception " + e.getMessage(), e);
            }
            finally {
                try {
                    Messaging.sleep(this.pause);
                }
                catch (InterruptedException sk) {}
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " stopped!");
        }
        if (!this.running) {
            this.listeners.clear();
            this.listenerByName.clear();
        }
    }

    private void handleAnswer(Msg obj) {
        if (this.waitingForMessages.containsKey(obj.getInAnswerTo())) {
            this.updateProcessedBy(obj);
            if (!this.waitingForAnswers.get(obj.getInAnswerTo()).contains(obj)) {
                this.waitingForAnswers.get(obj.getInAnswerTo()).add(obj);
            }
        }
        if (!this.receiveAnswers.equals((Object)ReceiveAnswers.NONE) && (this.receiveAnswers.equals((Object)ReceiveAnswers.ALL) || obj.getRecipients() != null && obj.getRecipients().contains(this.id))) {
            try {
                if (obj.isExclusive()) {
                    this.lockAndProcess(obj);
                } else {
                    this.processMessage(obj);
                }
            }
            catch (Exception e) {
                log.error("Error during message processing ", (Throwable)e);
            }
        }
    }

    public void pauseProcessingOfMessagesNamed(String name) {
        this.pauseMessages.putIfAbsent(name, System.currentTimeMillis());
    }

    public Long unpauseProcessingOfMessagesNamed(String name) {
        if (!this.pauseMessages.containsKey(name)) {
            return 0L;
        }
        Long ret = this.pauseMessages.remove(name);
        if (ret != null) {
            ret = System.currentTimeMillis() - ret;
        }
        this.skipped.incrementAndGet();
        return ret;
    }

    public void findAndProcessPendingMessages(String name) {
        Runnable r = () -> {
            List<MorphiumId> messages;
            while ((messages = this.lockAndGetMessages(name, this.processMultiple)) != null && messages.size() != 0) {
                this.processMessages(messages);
                try {
                    Thread.sleep(this.pause);
                }
                catch (InterruptedException interruptedException) {}
            }
        };
        this.queueOrRun(r);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<MorphiumId> lockAndGetMessages(String name, boolean multiple) {
        ArrayList<MorphiumId> arrayList;
        if (!this.running) {
            return new ArrayList<MorphiumId>();
        }
        HashMap<String, Object> values = new HashMap<String, Object>();
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        if (this.listenerByName.isEmpty() && this.listeners.isEmpty()) {
            return q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.inAnswerTo).in(this.waitingForMessages.keySet()).idList();
        }
        if (!this.useChangeStream) {
            q.f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
        } else {
            q.f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).in(Arrays.asList(this.id, null, "ALL")).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
        }
        Set<String> pausedMessagesKeys = this.pauseMessages.keySet();
        if (name != null) {
            if (pausedMessagesKeys.contains(name)) {
                log.error("Cannot lock messages, if message type is paused");
                return new ArrayList<MorphiumId>();
            }
            q.f((Enum)Msg.Fields.name).eq(name);
        } else {
            if (!this.pauseMessages.isEmpty()) {
                q.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
            }
            if (this.listeners.isEmpty() && !this.listenerByName.isEmpty() && !this.listenerByName.keySet().isEmpty()) {
                q.f((Enum)Msg.Fields.name).in(this.listenerByName.keySet());
            }
        }
        ArrayList<MorphiumId> processingIds = new ArrayList<MorphiumId>(this.processing);
        if (!this.processing.isEmpty()) {
            q.f("_id").nin(processingIds);
        }
        q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
        int locked = (int)this.morphium.createQueryFor(Msg.class, this.getCollectionName()).f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).countAll();
        if (!multiple) {
            q.limit(1);
            if (locked > 1) {
                this.skipped.incrementAndGet();
            }
        } else {
            if (locked >= this.windowSize) {
                return new ArrayList<MorphiumId>();
            }
            q.limit(this.windowSize - locked);
        }
        if (!this.useChangeStream) {
            values.put("locked_by", this.id);
        }
        values.put("locked", System.currentTimeMillis());
        MongoCommand fnd = null;
        List lst = null;
        try {
            fnd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
            fnd.setDb(this.morphium.getDatabase());
            ((FindCommand)fnd).setFilter(q.toQueryObject());
            ((FindCommand)fnd).setProjection(Doc.of("_id", (Object)1));
            ((FindCommand)fnd).setLimit(q.getLimit());
            ((FindCommand)fnd).setBatchSize(q.getBatchSize());
            ((FindCommand)fnd).setSort(q.getSort());
            ((FindCommand)fnd).setSkip(q.getSkip());
            fnd.setColl(this.getCollectionName());
            List<Map<String, Object>> result = ((ReadMongoCommand)fnd).execute();
            lst = result.stream().map(e -> e.get("_id")).collect(Collectors.toList());
            if (locked > lst.size()) {
                this.skipped.incrementAndGet();
            } else {
                CountMongoCommand cnt = new CountMongoCommand(fnd.getConnection());
                cnt.setDb(this.morphium.getDatabase());
                cnt.setColl(this.getCollectionName());
                cnt.setQuery(q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).in(Arrays.asList("ALL", this.id, null)).f((Enum)Msg.Fields.processedBy).ne(this.id).toQueryObject());
                if (cnt.getCount() > 0) {
                    this.skipped.incrementAndGet();
                }
            }
            if (!lst.isEmpty()) {
                UpdateMongoCommand cmd = null;
                HashMap toSet = new HashMap();
                for (Map.Entry ef : values.entrySet()) {
                    String fieldName = this.morphium.getARHelper().getMongoFieldName(q.getType(), (String)ef.getKey());
                    toSet.put(fieldName, ef.getValue());
                }
                cmd = new UpdateMongoCommand(fnd.getConnection());
                ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                cmd.addUpdate(q.q().f("_id").in(lst).toQueryObject(), Doc.of("$set", toSet), null, false, multiple, null, null, null);
                cmd.execute();
            }
            if (!this.useChangeStream) {
                long num = 0L;
                Query<Msg> cntQuery = q.q().f((Enum)Msg.Fields.lockedBy).eq(this.id);
                MongoCommand cnt = null;
                try {
                    cnt = new CountMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
                    ((CountMongoCommand)cnt.setDb(this.morphium.getDatabase())).setColl(this.getCollectionName());
                    ((CountMongoCommand)cnt).setQuery(cntQuery.toQueryObject());
                    while (true) {
                        try {
                            Thread.sleep(150L);
                        }
                        catch (InterruptedException fieldName) {
                            // empty catch block
                        }
                        if (cntQuery.countAll() == num) {
                            break;
                        }
                        num = ((CountMongoCommand)cnt).getCount();
                    }
                }
                catch (Exception e2) {
                    log.error("Error waiting for concurrent messaging systems");
                }
                finally {
                    if (cnt != null) {
                        cnt.releaseConnection();
                    }
                }
                q = q.q();
                Query<Msg> q1 = q.q().f((Enum)Msg.Fields.sender).ne(this.id);
                if (!processingIds.isEmpty()) {
                    q1.f("_id").nin(processingIds);
                }
                q1.f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
                if (name != null) {
                    q1.f((Enum)Msg.Fields.name).eq(name);
                } else if (!this.pauseMessages.isEmpty()) {
                    q1.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
                }
                Query<Msg> q2 = q.q().f((Enum)Msg.Fields.sender).ne(this.id);
                if (!processingIds.isEmpty()) {
                    q2.f("_id").nin(processingIds);
                }
                q2.f((Enum)Msg.Fields.lockedBy).eq(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipients).in(Arrays.asList(null, this.id));
                if (name != null) {
                    q2.f((Enum)Msg.Fields.name).eq(name);
                } else if (!this.pauseMessages.isEmpty()) {
                    q2.f((Enum)Msg.Fields.name).nin(pausedMessagesKeys);
                }
                q.or(q1, q2);
                q.sort(Msg.Fields.priority, Msg.Fields.timestamp);
                if (!multiple) {
                    q.limit(1);
                } else {
                    q.limit(this.windowSize);
                }
                fnd.clear();
                ((FindCommand)fnd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                ((FindCommand)fnd).setFilter(q.toQueryObject());
                ((FindCommand)fnd).setSort(q.getSort());
                ((FindCommand)fnd).setLimit(q.getLimit());
                ((FindCommand)fnd).setProjection(Doc.of("_id", (Object)1));
                List<MorphiumId> idList = null;
                List<Map<String, Object>> res = ((ReadMongoCommand)fnd).execute();
                if (res == null || res.size() == 0) {
                    List<MorphiumId> list = null;
                    return list;
                }
                List<MorphiumId> list = idList = res.stream().map(m -> (MorphiumId)m.get("_id")).collect(Collectors.toList());
                return list;
            }
            arrayList = new ArrayList<MorphiumId>();
            return arrayList;
        }
        catch (Exception e3) {
            log.error("Error while processing", (Throwable)e3);
            arrayList = null;
            return arrayList;
        }
        finally {
            if (fnd != null) {
                fnd.releaseConnection();
            }
        }
    }

    private void findAndProcessMessages(boolean multiple) {
        if (!this.running) {
            return;
        }
        List<MorphiumId> messages = this.lockAndGetMessages(null, multiple);
        if (messages == null) {
            return;
        }
        if (messages.size() == 0) {
            return;
        }
        this.processMessages(messages);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void lockAndProcess(Msg obj) {
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        q.f((Enum)Msg.Fields.sender).ne(this.id);
        q.f((Enum)Msg.Fields.lockedBy).eq(this.id).f((Enum)Msg.Fields.processedBy).ne(this.id);
        if (this.processMultiple && q.countAll() >= (long)this.windowSize) {
            this.skipped.incrementAndGet();
            return;
        }
        q = q.q();
        q.f("_id").eq(obj.getMsgId());
        q.f((Enum)Msg.Fields.processedBy).ne(this.id);
        q.f((Enum)Msg.Fields.lockedBy).in(Arrays.asList(this.id, null));
        HashMap<String, Object> values = new HashMap<String, Object>();
        values.put("locked_by", this.id);
        values.put("locked", System.currentTimeMillis());
        Doc update = Doc.of("$set", values);
        Map<String, Object> qobj = q.toQueryObject();
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, this.processMultiple, null, null, null);
            Map<String, Object> result = ((WriteMongoCommand)cmd).execute();
            if (result.get("nModified") != null && result.get("nModified").equals(1)) {
                obj.setLocked((Long)values.get("locked"));
                obj.setLockedBy((String)values.get("locked_by"));
                this.processMessage(obj);
            }
        }
        catch (Exception e) {
            log.error("Exception during update", (Throwable)e);
        }
        finally {
            if (cmd != null) {
                cmd.getConnection().release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void processMessage(Msg msg) {
        MongoCommand cmd;
        if (msg == null) {
            if (log.isDebugEnabled()) {
                log.debug("Message was deleted before processing could happen!");
            }
            return;
        }
        if (msg.isExclusive() && !this.getSenderId().equals(msg.getLockedBy())) {
            if (log.isDebugEnabled()) {
                log.debug("Not processing " + String.valueOf(msg.getMsgId()) + " - senderID does not match locked by");
            }
            this.processing.remove(msg.getMsgId());
            return;
        }
        if (msg.getSender().equals(this.getSenderId())) {
            if (log.isDebugEnabled()) {
                log.debug("Not processing - msg was sent by me");
            }
            this.processing.remove(msg.getMsgId());
            return;
        }
        if (msg.isExclusive() && !msg.getLockedBy().equals(null) && !msg.getLockedBy().equals(this.getSenderId())) {
            if (log.isDebugEnabled()) {
                log.debug("Not processing " + String.valueOf(msg.getMsgId()) + " - locked by someone else");
            }
            this.processing.remove(msg.getMsgId());
            return;
        }
        if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
            if (log.isDebugEnabled()) {
                log.debug(this.getSenderId() + ": Found outdated message - deleting it!");
            }
            this.morphium.delete(msg, this.getCollectionName());
            this.processing.remove(msg.getMsgId());
            return;
        }
        if (msg.getInAnswerTo() != null) {
            if (this.waitingForMessages.containsKey(msg.getInAnswerTo())) {
                this.updateProcessedBy(msg);
                List<Msg> lst = this.waitingForAnswers.get(msg.getInAnswerTo());
                if (lst != null && !lst.contains(msg)) {
                    this.waitingForAnswers.get(msg.getInAnswerTo()).add(msg);
                }
            }
            if (this.receiveAnswers.equals((Object)ReceiveAnswers.NONE) || this.receiveAnswers.equals((Object)ReceiveAnswers.ONLY_MINE) && msg.getRecipients() != null && !msg.getRecipients().contains(this.id)) {
                if (msg.isExclusive() && msg.getLockedBy() != null && msg.getLockedBy().equals(this.id)) {
                    cmd = null;
                    try {
                        cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
                        ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                        ((UpdateMongoCommand)cmd).addUpdate(Doc.of("_id", msg.getMsgId()), Doc.of("$set", Doc.of("locked_by", null)), null, false, false, null, null, null);
                        ((WriteMongoCommand)cmd).execute();
                    }
                    catch (MorphiumDriverException e) {
                        log.error("Error unlocking message", (Throwable)e);
                    }
                    finally {
                        if (cmd != null) {
                            cmd.getConnection().release();
                        }
                    }
                }
                this.removeProcessingFor(msg);
                return;
            }
        }
        if (this.listeners.isEmpty() && this.listenerByName.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Not further processing - no listener for non answer message");
            }
            this.updateProcessedBy(msg);
            if (msg.isExclusive()) {
                cmd = null;
                try {
                    cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
                    ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                    ((UpdateMongoCommand)cmd).addUpdate(Doc.of("_id", msg.getMsgId()), Doc.of("$set", Doc.of("locked_by", null)), null, false, false, null, null, null);
                    ((WriteMongoCommand)cmd).execute();
                }
                catch (MorphiumDriverException e) {
                    log.error("Error unlocking message", (Throwable)e);
                }
                finally {
                    if (cmd != null) {
                        cmd.getConnection().release();
                    }
                }
            }
            this.removeProcessingFor(msg);
            return;
        }
        if (this.processing.contains(msg.getMsgId())) {
            return;
        }
        this.processing.add(msg.getMsgId());
        Runnable r = () -> {
            Msg answer;
            Msg msg1;
            boolean wasProcessed = false;
            boolean wasRejected = false;
            ArrayList<MessageRejectedException> rejections = new ArrayList<MessageRejectedException>();
            ArrayList<MessageListener> lst = new ArrayList<MessageListener>(this.listeners);
            if (this.listenerByName.get(msg.getName()) != null) {
                lst.addAll((Collection<MessageListener>)this.listenerByName.get(msg.getName()));
            }
            if (lst.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug(this.getSenderId() + ": Message did not have a listener registered: " + msg.getName());
                }
                wasProcessed = true;
            }
            if ((msg1 = msg) == null) {
                log.debug("Message was deleted");
                this.removeProcessingFor(msg);
                return;
            }
            if (msg1.isExclusive() && msg1.getLockedBy() != null && !msg1.getLockedBy().equals(this.id) || msg1.getLockedBy() == null) {
                if (log.isDebugEnabled()) {
                    log.debug(String.valueOf(msg1.getMsgId()) + " was overlocked by " + msg1.getLockedBy());
                }
                this.removeProcessingFor(msg1);
                return;
            }
            for (MessageListener l : lst) {
                try {
                    if (this.pauseMessages.containsKey(msg1.getName())) {
                        this.processing.remove(msg1.getMsgId());
                        this.skipped.incrementAndGet();
                        return;
                    }
                    if (l.markAsProcessedBeforeExec()) {
                        this.updateProcessedBy(msg1);
                    }
                    answer = l.onMessage(this, msg1);
                    wasProcessed = true;
                    if (this.autoAnswer && answer == null) {
                        answer = new Msg(msg1.getName(), "received", "");
                    }
                    if (answer == null) continue;
                    msg1.sendAnswer(this, answer);
                    if (answer.getRecipients() != null) continue;
                    log.warn("Recipient of answer is null?!?!");
                }
                catch (MessageRejectedException mre) {
                    log.warn("Message was rejected by listener", (Throwable)mre);
                    wasRejected = true;
                    rejections.add(mre);
                }
                catch (Exception e) {
                    log.error("listener Processing failed", (Throwable)e);
                }
            }
            if (wasRejected) {
                for (MessageRejectedException mre : rejections) {
                    if (mre.isSendAnswer()) {
                        answer = new Msg(msg.getName(), "message rejected by listener", mre.getMessage());
                        msg.sendAnswer(this, answer);
                    }
                    if (!mre.isContinueProcessing()) continue;
                    this.updateProcessedBy(msg);
                    if (msg.isExclusive()) {
                        MongoCommand cmd = null;
                        try {
                            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
                            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                            ((UpdateMongoCommand)cmd).addUpdate(Doc.of("_id", msg.getMsgId()), Doc.of("$set", Doc.of("locked_by", null)), null, false, false, null, null, null);
                            ((WriteMongoCommand)cmd).execute();
                        }
                        catch (MorphiumDriverException e) {
                            log.error("Error unlocking message", (Throwable)e);
                        }
                        finally {
                            if (cmd != null) {
                                cmd.getConnection().release();
                            }
                        }
                    }
                    this.processing.remove(msg.getMsgId());
                    log.debug(this.id + ": Message will be re-processed by others");
                }
            }
            if (!wasProcessed && !wasRejected) {
                log.error("message was not processed");
                if (msg.isExclusive()) {
                    msg.setLocked(0L);
                    msg.setLockedBy(null);
                    MongoCommand cmd = null;
                    try {
                        cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
                        ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
                        ((UpdateMongoCommand)cmd).addUpdate(Doc.of("_id", msg.getMsgId()), Doc.of("$set", Doc.of("locked_by", null, "locked", (Object)0)), null, false, false, null, null, null);
                        ((WriteMongoCommand)cmd).execute();
                    }
                    catch (MorphiumDriverException e) {
                        log.error("Error unlocking message", (Throwable)e);
                    }
                    finally {
                        if (cmd != null) {
                            cmd.getConnection().release();
                        }
                    }
                }
            } else if (wasRejected) {
                log.debug("Message rejected");
            }
            if (wasProcessed) {
                this.updateProcessedBy(msg);
            }
            this.removeProcessingFor(msg);
        };
        this.queueOrRun(r);
    }

    private synchronized void processMessages(List<MorphiumId> messages) {
        for (MorphiumId mId : messages) {
            if (!this.running) {
                return;
            }
            try {
                FindCommand cmd = new FindCommand(this.morphium.getDriver().getPrimaryConnection(null));
                cmd.setFilter(Doc.of("_id", mId));
                cmd.setLimit(1);
                cmd.setBatchSize(1);
                cmd.setColl(this.getCollectionName());
                cmd.setDb(this.morphium.getDatabase());
                List<Map<String, Object>> result = cmd.execute();
                cmd.releaseConnection();
                if (result.isEmpty()) {
                    this.processing.remove(mId);
                    continue;
                }
                Msg msg = this.morphium.getMapper().deserialize(Msg.class, result.get(0));
                if (msg == null) continue;
                this.processMessage(msg);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void removeProcessingFor(Msg msg) {
        RemoveProcessTask rb = new RemoveProcessTask(this.processing, msg.getMsgId());
        while (true) {
            try {
                if (this.decouplePool.isTerminated() || this.decouplePool.isTerminating() || this.decouplePool.isShutdown()) break;
                this.decouplePool.schedule(rb, msg.getTtl(), TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException ex) {
                try {
                    Thread.sleep(this.pause);
                }
                catch (InterruptedException interruptedException) {}
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateProcessedBy(Msg msg) {
        if (msg == null) {
            return;
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        if ((msg = this.morphium.reread(msg, this.getCollectionName())) == null) {
            return;
        }
        if (msg.getProcessedBy() == null) {
            msg.setProcessedBy(new ArrayList<String>());
        }
        if (msg.getProcessedBy().contains(this.id)) {
            return;
        }
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        msg.getProcessedBy().add(this.id);
        Map<String, Object> qobj = idq.toQueryObject();
        String fieldName = this.morphium.getARHelper().getMongoFieldName(msg.getClass(), "processed_by");
        Doc set = Doc.of(fieldName, this.id);
        Doc update = Doc.of("$push", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName())).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            if (ret.get("nModified") == null && ret.get("modified") == null || Integer.valueOf(0).equals(ret.get("nModified")) || Integer.valueOf(0).equals(ret.get("modified"))) {
                log.warn("Could not update processed_by in msg " + String.valueOf(msg.getMsgId()));
            }
        }
        catch (MorphiumDriverException e) {
            e.printStackTrace();
        }
        finally {
            if (cmd != null) {
                cmd.getConnection().release();
            }
        }
    }

    private void queueOrRun(Runnable r) {
        if (this.multithreadded) {
            boolean queued = false;
            while (!queued) {
                try {
                    while (this.threadPool.getActiveCount() > this.windowSize) {
                        Thread.yield();
                    }
                    this.threadPool.execute(r);
                    queued = true;
                }
                catch (Throwable throwable) {}
            }
        } else {
            r.run();
        }
    }

    public String getCollectionName() {
        if (this.queueName == null || this.queueName.isEmpty()) {
            return "msg";
        }
        return "mmsg_" + this.queueName;
    }

    public void addListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
            c.put(n, new ArrayList());
            this.listenerByName = c;
        }
        if (this.listenerByName.get(n).contains(l)) {
            log.error("cowardly refusing to add already registered listener for name " + n);
        } else {
            this.listenerByName.get(n).add(l);
        }
    }

    public void removeListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            return;
        }
        HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
        ((List)c.get(n)).remove(l);
        if (((List)c.get(n)).isEmpty()) {
            c.remove(n);
        }
        this.listenerByName = c;
    }

    public String getSenderId() {
        return this.id;
    }

    public Messaging setSenderId(String id) {
        this.id = id;
        return this;
    }

    public int getPause() {
        return this.pause;
    }

    public Messaging setPause(int pause) {
        this.pause = pause;
        return this;
    }

    public boolean isRunning() {
        if (this.useChangeStream) {
            return this.changeStreamMonitor != null && this.changeStreamMonitor.isRunning();
        }
        return this.running;
    }

    public void terminate() {
        this.running = false;
        this.listenerByName.clear();
        this.listeners.clear();
        this.waitingForMessages.clear();
        this.waitingForMessages.clear();
        this.processing.clear();
        this.skipped.set(0);
        if (!this.morphium.getDriver().getName().equals("InMemDriver")) {
            try {
                this.watchConnection.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        if (this.decouplePool != null) {
            try {
                int sz = this.decouplePool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still scheduled");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down decouple-pool", (Throwable)e);
            }
        }
        this.morphium.removeShutdownListener(this);
        if (this.threadPool != null) {
            try {
                int sz = this.threadPool.shutdownNow().size();
                if (log.isDebugEnabled()) {
                    log.debug("Shutting down with " + sz + " runnables still pending in pool");
                }
            }
            catch (Exception e) {
                log.warn("Exception when shutting down threadpool");
            }
        }
        if (this.changeStreamMonitor != null) {
            this.changeStreamMonitor.terminate();
        }
        if (this.isAlive()) {
            try {
                this.interrupt();
            }
            catch (Exception e) {
                log.warn("Exception when interrupint messaging thread", (Throwable)e);
            }
        }
        int retry = 0;
        while (this.isAlive()) {
            try {
                Messaging.sleep(150L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (++retry > this.morphium.getConfig().getMaxWaitTime() / 150) {
                log.warn("Force stopping messaging!");
                this.stop();
            }
            if (retry <= 2 * this.morphium.getConfig().getMaxWaitTime() / 150) continue;
            throw new RuntimeException("Could not terminate Messaging!");
        }
    }

    public void addMessageListener(MessageListener l) {
        if (this.listeners.contains(l)) {
            log.error("Cowardly refusing to add already registered listener");
        } else {
            this.listeners.add(l);
        }
    }

    public void removeMessageListener(MessageListener l) {
        this.listeners.remove(l);
    }

    public void queueMessage(Msg m) {
        this.storeMsg(m, true);
    }

    @Override
    public synchronized void start() {
        super.start();
        if (this.useChangeStream) {
            try {
                Thread.sleep(250L);
            }
            catch (Exception e) {
                log.error("error:" + e.getMessage());
            }
        }
    }

    public void sendMessage(Msg m) {
        this.storeMsg(m, false);
    }

    public long getNumberOfMessages() {
        return this.getPendingMessagesCount();
    }

    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                    log.error("Error storing msg", t);
                }
            };
        }
        m.setSender(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    public void sendMessageToSelf(Msg m) {
        this.sendMessageToSelf(m, false);
    }

    public void queueMessagetoSelf(Msg m) {
        this.sendMessageToSelf(m, true);
    }

    private void sendMessageToSelf(Msg m, boolean async) {
        AsyncCallbackAdapter cb = null;
        if (async) {
            cb = new AsyncCallbackAdapter();
        }
        m.setSender("self");
        m.addRecipient(this.id);
        m.setSenderHost(this.hostname);
        this.morphium.insert(m, this.getCollectionName(), cb);
    }

    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    public Messaging setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
        return this;
    }

    @Override
    public void onShutdown(Morphium m) {
        try {
            this.running = false;
            if (this.threadPool != null) {
                this.threadPool.shutdown();
                Thread.sleep(200L);
                if (this.threadPool != null) {
                    this.threadPool.shutdownNow();
                }
                this.threadPool = null;
            }
            if (this.changeStreamMonitor != null) {
                this.changeStreamMonitor.terminate();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        return this.sendAndAwaitFirstAnswer(theMessage, timeoutInMs, true);
    }

    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs, boolean throwExceptionOnTimeout) {
        theMessage.setMsgId(new MorphiumId());
        this.waitingForAnswers.put(theMessage.getMsgId(), new ArrayList());
        this.waitingForMessages.put(theMessage.getMsgId(), theMessage);
        this.sendMessage(theMessage);
        long start = System.currentTimeMillis();
        while (this.waitingForAnswers.get(theMessage.getMsgId()).size() == 0) {
            if (!this.running) {
                throw new SystemShutdownException("Messaging shutting down - abort waiting!");
            }
            if (System.currentTimeMillis() - start > timeoutInMs) {
                log.error("Did not receive answer " + theMessage.getName() + "/" + String.valueOf(theMessage.getMsgId()) + " in time (" + timeoutInMs + "ms)");
                this.waitingForMessages.remove(theMessage.getMsgId());
                if (throwExceptionOnTimeout) {
                    throw new MessageTimeoutException("Did not receive answer for message " + theMessage.getName() + "/" + String.valueOf(theMessage.getMsgId()) + " in time (" + timeoutInMs + "ms)");
                }
                return null;
            }
            Thread.yield();
        }
        if (log.isDebugEnabled()) {
            log.debug("got message after: " + (System.currentTimeMillis() - start) + "ms");
        }
        this.waitingForMessages.remove(theMessage.getMsgId());
        return (T)this.waitingForAnswers.remove(theMessage.getMsgId()).get(0);
    }

    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        return this.sendAndAwaitAnswers(theMessage, numberOfAnswers, timeout, false);
    }

    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout, boolean throwExceptionOnTimeout) {
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        this.waitingForAnswers.put(theMessage.getMsgId(), new ArrayList());
        this.waitingForMessages.put(theMessage.getMsgId(), theMessage);
        this.sendMessage(theMessage);
        long start = System.currentTimeMillis();
        while (!(!this.running || this.waitingForAnswers.get(theMessage.getMsgId()).size() > 0 && (numberOfAnswers > 0 && this.waitingForAnswers.get(theMessage.getMsgId()).size() >= numberOfAnswers || numberOfAnswers > 0 && this.waitingForAnswers.get(theMessage.getMsgId()).size() >= numberOfAnswers) || System.currentTimeMillis() - start > timeout)) {
            if (throwExceptionOnTimeout && System.currentTimeMillis() - start > timeout && this.waitingForAnswers.get(theMessage.getMsgId()).isEmpty()) {
                throw new MessageTimeoutException("Did not receive any answer for message " + theMessage.getName() + "/" + String.valueOf(theMessage.getMsgId()) + "in time (" + timeout + ")");
            }
            if (!this.running) {
                throw new SystemShutdownException("Messaging shutting down - abort waiting!");
            }
            Thread.yield();
        }
        this.waitingForMessages.remove(theMessage.getMsgId());
        return this.waitingForAnswers.remove(theMessage.getMsgId());
    }

    public boolean isProcessMultiple() {
        return this.processMultiple;
    }

    public Messaging setProcessMultiple(boolean processMultiple) {
        this.processMultiple = processMultiple;
        return this;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public Messaging setQueueName(String queueName) {
        this.queueName = queueName;
        return this;
    }

    public boolean isMultithreadded() {
        return this.multithreadded;
    }

    public Messaging setMultithreadded(boolean multithreadded) {
        if (!multithreadded && this.threadPool != null) {
            this.threadPool.shutdownNow();
            this.threadPool = null;
        } else if (multithreadded && this.threadPool == null) {
            this.initThreadPool();
        }
        this.multithreadded = multithreadded;
        return this;
    }

    public int getWindowSize() {
        return this.windowSize;
    }

    public Messaging setWindowSize(int windowSize) {
        this.windowSize = windowSize;
        return this;
    }

    public boolean isReceiveAnswers() {
        return !this.receiveAnswers.equals((Object)ReceiveAnswers.NONE);
    }

    public ReceiveAnswers getReceiveAnswers() {
        return this.receiveAnswers;
    }

    public void setReceiveAnswers(ReceiveAnswers receiveAnswers) {
        this.receiveAnswers = receiveAnswers;
    }

    public boolean isUseChangeStream() {
        return this.useChangeStream;
    }

    public int getRunningTasks() {
        if (this.threadPool != null) {
            return this.threadPool.getActiveCount();
        }
        return 0;
    }

    Morphium getMorphium() {
        return this.morphium;
    }

    public Messaging setUseChangeStream(boolean useChangeStream) {
        this.useChangeStream = useChangeStream;
        return this;
    }

    public static enum ReceiveAnswers {
        NONE,
        ONLY_MINE,
        ALL;

    }

    public static class SystemShutdownException
    extends RuntimeException {
        public SystemShutdownException(String msg) {
            super(msg);
        }
    }

    public static class MessageTimeoutException
    extends RuntimeException {
        public MessageTimeoutException(String msg) {
            super(msg);
        }
    }
}

