/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.UtilsMap;
import de.caluga.morphium.annotations.Messaging;
import de.caluga.morphium.async.AsyncCallbackAdapter;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.changestream.ChangeStreamMonitor;
import de.caluga.morphium.config.MessagingSettings;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.MorphiumDriverException;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.InsertMongoCommand;
import de.caluga.morphium.driver.commands.MongoCommand;
import de.caluga.morphium.driver.commands.UpdateMongoCommand;
import de.caluga.morphium.driver.commands.WriteMongoCommand;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.MorphiumMessaging;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MsgLock;
import de.caluga.morphium.messaging.SingleCollectionMessaging;
import de.caluga.morphium.messaging.StatusInfoListener;
import de.caluga.morphium.query.Query;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Messaging(name="MultiCollectionMessaging", description="Advanced multi-collection messaging implementation")
public class MultiCollectionMessaging
implements MorphiumMessaging {
    public static final String NAME = "MultiCollectionMessaging";
    private Logger log = LoggerFactory.getLogger(MultiCollectionMessaging.class);
    private Morphium morphium;
    private MessagingSettings effectiveSettings;
    private ThreadPoolExecutor threadPool;
    private Set<MorphiumId> processingMessages = ConcurrentHashMap.newKeySet();
    private final Map<MorphiumId, Queue<Msg>> waitingForAnswers = new ConcurrentHashMap<MorphiumId, Queue<Msg>>();
    private final Map<MorphiumId, CallbackRequest> waitingForCallbacks = new ConcurrentHashMap<MorphiumId, CallbackRequest>();
    private Map<String, List<Map<MType, Object>>> monitorsByTopic = new ConcurrentHashMap<String, List<Map<MType, Object>>>();
    private AtomicBoolean running = new AtomicBoolean(false);
    private Map<String, String> lockCollectionNames = new ConcurrentHashMap<String, String>();
    private static Vector<MultiCollectionMessaging> allMessagings = new Vector();
    private Set<String> pausedTopics = ConcurrentHashMap.newKeySet();
    private StatusInfoListener statusInfoListener = new StatusInfoListener();
    private String hostname = null;
    private String senderId;
    private Map<String, AtomicInteger> pollTrigger = new ConcurrentHashMap<String, AtomicInteger>();
    private final Map<String, Long> pausedAt = new ConcurrentHashMap<String, Long>();
    private ScheduledThreadPoolExecutor decouplePool;
    private AsyncOperationCallback aCallback = new AsyncOperationCallback<Msg>(){

        @Override
        public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Msg entity, Object ... param) {
            MultiCollectionMessaging.this.log.error("Could not store {}", (Object)error, (Object)t);
        }

        @Override
        public void onOperationSucceeded(AsyncOperationType type, Query<Msg> q, long duration, List<Msg> result, Msg entity, Object ... param) {
        }
    };
    private ChangeStreamMonitor directMessagesMonitor;

    public MultiCollectionMessaging() {
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException unknownHostException) {
                // empty catch block
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        this.decouplePool = new ScheduledThreadPoolExecutor(1, Thread.ofVirtual().name("decouple_thr-", 0L).factory());
        allMessagings.add(this);
    }

    @Override
    public List<MorphiumMessaging> getAlternativeMessagings() {
        return new Vector<MorphiumMessaging>(allMessagings);
    }

    public String getDMCollectionName() {
        return this.getDMCollectionName(this.getSenderId());
    }

    @Override
    public String getDMCollectionName(String sender) {
        return "dm_" + this.morphium.getARHelper().createCamelCase(sender, false);
    }

    @Override
    public void start() {
        this.running.set(true);
        this.decouplePool.scheduleWithFixedDelay(() -> {
            for (String name : this.pollTrigger.keySet()) {
                if (this.pollTrigger.get(name).get() == 0) continue;
                if (name.startsWith("dm_")) {
                    this.pollAndProcessDms(name.substring(3));
                } else if (!this.isUseChangeStream()) {
                    this.pollAndProcess(name);
                }
                this.pollTrigger.get(name).set(0);
            }
            HashMap<MorphiumId, CallbackRequest> cp = new HashMap<MorphiumId, CallbackRequest>(this.waitingForCallbacks);
            for (Map.Entry entry : cp.entrySet()) {
                if (System.currentTimeMillis() - ((CallbackRequest)entry.getValue()).timestamp <= ((CallbackRequest)entry.getValue()).ttl) continue;
                this.decouplePool.schedule(() -> this.waitingForCallbacks.remove(entry.getKey()), 1000L, TimeUnit.MILLISECONDS);
            }
        }, 1000L, this.effectiveSettings.getMessagingPollPause(), TimeUnit.MILLISECONDS);
        String dmCollectionName = this.getDMCollectionName();
        this.morphium.ensureIndicesFor(Msg.class, dmCollectionName);
        List<Map<String, Object>> pipeline = List.of(Doc.of("$match", Doc.of("operationType", Doc.of("$eq", "insert"))));
        this.directMessagesMonitor = new ChangeStreamMonitor(this.morphium, dmCollectionName, true, this.morphium.getConfig().connectionSettings().getMaxWaitTime(), pipeline);
        this.directMessagesMonitor.addListener(evt -> {
            Map<String, Object> doc = evt.getFullDocument();
            Msg msg = this.morphium.getMapper().deserialize(Msg.class, doc);
            if (msg.isAnswer()) {
                this.handleAnswer(msg);
            } else {
                if (this.pausedTopics.contains(msg.getTopic())) {
                    return this.running.get();
                }
                if (this.monitorsByTopic.containsKey(msg.getTopic())) {
                    if (!this.processingMessages.add(msg.getMsgId())) {
                        return this.running.get();
                    }
                    List<Map<MType, Object>> listeners = this.monitorsByTopic.get(msg.getTopic());
                    int listenerCount = (int)listeners.stream().filter(map -> map.get((Object)MType.listener) != null).count();
                    AtomicInteger remainingListeners = new AtomicInteger(listenerCount);
                    for (Map<MType, Object> map2 : listeners) {
                        MessageListener l = (MessageListener)map2.get((Object)MType.listener);
                        if (l == null) continue;
                        this.queueOrRun(() -> {
                            try {
                                Msg ret;
                                block15: {
                                    if (l.markAsProcessedBeforeExec()) {
                                        this.updateProcessedBy(msg);
                                    }
                                    ret = l.onMessage(this, msg);
                                    if (this.running.get()) break block15;
                                    return;
                                }
                                try {
                                    if (ret == null && this.effectiveSettings.isAutoAnswer()) {
                                        ret = new Msg(msg.getTopic(), "received", "");
                                    }
                                    if (ret != null) {
                                        ret.setRecipient(msg.getSender());
                                        ret.setInAnswerTo(msg.getMsgId());
                                        this.queueMessage(ret);
                                    }
                                }
                                catch (Exception e) {
                                    this.log.error("Error processinig message");
                                }
                                boolean deleted = false;
                                if (msg.isDeleteAfterProcessing()) {
                                    if (msg.getDeleteAfterProcessingTime() == 0) {
                                        this.morphium.remove(msg, dmCollectionName);
                                        deleted = true;
                                    } else {
                                        msg.setDeleteAt(new Date(System.currentTimeMillis() + (long)msg.getDeleteAfterProcessingTime()));
                                        msg.addProcessedId(this.getSenderId());
                                        this.morphium.updateUsingFields(msg, dmCollectionName, new AsyncCallbackAdapter(), Msg.Fields.deleteAt, Msg.Fields.processedBy);
                                    }
                                }
                                if (!deleted && !l.markAsProcessedBeforeExec()) {
                                    this.updateProcessedBy(msg);
                                }
                            }
                            finally {
                                if (remainingListeners.decrementAndGet() == 0) {
                                    this.processingMessages.remove(msg.getMsgId());
                                }
                            }
                        });
                    }
                } else {
                    this.log.warn("incoming direct message for topic {} - no listener registered", (Object)msg.getTopic());
                    this.morphium.remove(msg, dmCollectionName);
                }
            }
            return this.running.get();
        });
        this.directMessagesMonitor.start();
        if (!this.isUseChangeStream()) {
            this.log.info("Start polling as changestreams are disabled");
            this.decouplePool.scheduleWithFixedDelay(() -> {
                try {
                    this.pollAndProcess();
                }
                catch (Throwable e) {
                    this.log.info("Error in polling thread", e);
                }
            }, 1000L, this.getPause(), TimeUnit.MILLISECONDS);
        } else {
            this.pollAndProcess();
        }
    }

    @Override
    public void enableStatusInfoListener() {
        this.effectiveSettings.setMessagingStatusInfoListenerEnabled(true);
        this.addListenerForTopic(this.effectiveSettings.getMessagingStatusInfoListenerName(), this.statusInfoListener);
    }

    @Override
    public void disableStatusInfoListener() {
        this.effectiveSettings.setMessagingStatusInfoListenerEnabled(false);
        this.removeListenerForTopic(this.effectiveSettings.getMessagingStatusInfoListenerName(), this.statusInfoListener);
    }

    @Override
    public String getStatusInfoListenerName() {
        return this.effectiveSettings.getMessagingStatusInfoListenerName();
    }

    @Override
    public void setStatusInfoListenerName(String statusInfoListenerName) {
        this.effectiveSettings.setMessagingStatusInfoListenerName(statusInfoListenerName);
    }

    @Override
    public int getProcessingCount() {
        return this.threadPool.getActiveCount();
    }

    @Override
    public int getInProgressCount() {
        return this.getProcessingCount();
    }

    @Override
    public int waitingForAnswersCount() {
        return this.waitingForAnswers.size();
    }

    @Override
    public int waitingForAnswersTotalCount() {
        int cnt = 0;
        for (Queue<Msg> l : this.waitingForAnswers.values()) {
            cnt += l.size();
        }
        return cnt;
    }

    @Override
    public boolean isStatusInfoListenerEnabled() {
        return this.effectiveSettings.isMessagingStatusInfoListenerEnabled();
    }

    @Override
    public void setStatusInfoListenerEnabled(boolean statusInfoListenerEnabled) {
        if (statusInfoListenerEnabled) {
            if (!this.monitorsByTopic.containsKey(this.effectiveSettings.getMessagingStatusInfoListenerName()) || !this.monitorsByTopic.get(this.effectiveSettings.getMessagingStatusInfoListenerName()).contains(this.statusInfoListener)) {
                this.addListenerForTopic(this.effectiveSettings.getMessagingStatusInfoListenerName(), this.statusInfoListener);
            }
        } else {
            this.removeListenerForTopic(this.effectiveSettings.getMessagingStatusInfoListenerName(), this.statusInfoListener);
        }
    }

    @Override
    public Map<String, List<String>> getListenerNames() {
        HashMap<String, List<String>> ret = new HashMap<String, List<String>>();
        for (Map.Entry<String, List<Map<MType, Object>>> e : this.monitorsByTopic.entrySet()) {
            ArrayList<String> lst = new ArrayList<String>();
            for (Map<MType, Object> l : e.getValue()) {
                MessageListener listener = (MessageListener)l.get((Object)MType.listener);
                lst.add(listener.getClass().getName());
            }
            ret.put(e.getKey(), lst);
        }
        return ret;
    }

    @Override
    public Map<String, Long> getThreadPoolStats() {
        if (this.threadPool == null) {
            return Map.of();
        }
        String prefix = "messaging.threadpool.";
        return UtilsMap.of(prefix + "largest_poolsize", Long.valueOf(this.threadPool.getLargestPoolSize())).add((CallSite)((Object)(prefix + "task_count")), this.threadPool.getTaskCount()).add((CallSite)((Object)(prefix + "core_size")), Long.valueOf(this.threadPool.getCorePoolSize())).add((CallSite)((Object)(prefix + "maximum_pool_size")), Long.valueOf(this.threadPool.getMaximumPoolSize())).add((CallSite)((Object)(prefix + "pool_size")), Long.valueOf(this.threadPool.getPoolSize())).add((CallSite)((Object)(prefix + "active_count")), Long.valueOf(this.threadPool.getActiveCount())).add((CallSite)((Object)(prefix + "completed_task_count")), this.threadPool.getCompletedTaskCount());
    }

    @Override
    public long getPendingMessagesCount() {
        long sum = 0L;
        for (String msgName : this.monitorsByTopic.keySet()) {
            Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msgName));
            q1.f((Enum)Msg.Fields.sender).ne(this.getSenderId()).f("processed_by.0").notExists();
            sum += q1.countAll();
        }
        return sum;
    }

    @Override
    public void removeMessage(Msg m) {
        this.morphium.delete(m, this.getCollectionName(m));
    }

    @Override
    public int getAsyncMessagesPending() {
        return this.waitingForCallbacks.size();
    }

    @Override
    public void pauseTopicProcessing(String name) {
        this.pausedTopics.add(name);
        this.pausedAt.putIfAbsent(name, System.currentTimeMillis());
    }

    @Override
    public List<String> getPausedTopics() {
        return new ArrayList<String>(this.pausedTopics);
    }

    @Override
    public Long unpauseTopicProcessing(String name) {
        this.pausedTopics.remove(name);
        Long started = this.pausedAt.remove(name);
        this.decouplePool.execute(() -> {
            if (!this.isUseChangeStream()) {
                this.pollAndProcess(name);
            }
            this.pollAndProcessDms(name);
        });
        if (started == null) {
            return 0L;
        }
        return System.currentTimeMillis() - started;
    }

    @Override
    public String getCollectionName() {
        return this.effectiveSettings.getMessageQueueName();
    }

    public String getCollectionName(Msg m) {
        return this.getCollectionName(m.getTopic());
    }

    @Override
    public String getCollectionName(String n) {
        return (this.getCollectionName() + "_" + n).replaceAll(" ", "").replaceAll("-", "").replaceAll("/", "");
    }

    private MsgLock getLock(Msg m) {
        return this.morphium.findById(MsgLock.class, m.getMsgId(), this.getLockCollectionName() + "_" + m.getTopic());
    }

    @Override
    public String getLockCollectionName() {
        if (this.lockCollectionNames.get(".") == null) {
            this.lockCollectionNames.put(".", this.getCollectionName() + "_lck");
        }
        return this.lockCollectionNames.get(".");
    }

    public String getLockCollectionName(Msg m) {
        return this.getLockCollectionName(m.getTopic());
    }

    @Override
    public String getLockCollectionName(String name) {
        if (this.lockCollectionNames.get(name) == null) {
            this.lockCollectionNames.put(name, this.getLockCollectionName() + "_" + name).replaceAll(" ", "").replaceAll("-", "").replaceAll("/", "");
        }
        return this.lockCollectionNames.get(name);
    }

    private boolean lockMessage(Msg m, String lockId) {
        return this.lockMessage(m, lockId, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean lockMessage(Msg m, String lockId, Date delAt) {
        MsgLock lck = new MsgLock(m);
        lck.setLockId(lockId);
        if (delAt != null) {
            lck.setDeleteAt(delAt);
        } else {
            lck.setDeleteAt(new Date(System.currentTimeMillis() + m.getTtl() * 2L));
        }
        MongoCommand cmd = null;
        try {
            cmd = new InsertMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.morphium.getWriteConcernForClass(MsgLock.class)));
            ((InsertMongoCommand)((InsertMongoCommand)cmd.setColl(this.getLockCollectionName(m))).setDb(this.morphium.getDatabase())).setDocuments(List.of(this.morphium.getMapper().serialize(lck)));
            ((InsertMongoCommand)cmd).execute();
            boolean bl = true;
            return bl;
        }
        catch (Exception e) {
            boolean bl = false;
            return bl;
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    private void handleAnswer(Msg m) {
        Queue<Msg> answersForMessage = this.waitingForAnswers.get(m.getInAnswerTo());
        if (null != answersForMessage) {
            this.updateProcessedBy(m);
            if (!answersForMessage.contains(m)) {
                answersForMessage.add(m);
            }
            this.checkDeleteAfterProcessing(m);
            return;
        }
        if (this.pausedTopics.contains(m.getTopic())) {
            return;
        }
        CallbackRequest cbr = this.waitingForCallbacks.get(m.getInAnswerTo());
        Msg theMessage = m;
        if (cbr != null) {
            SingleCollectionMessaging.AsyncMessageCallback cb = cbr.callback;
            Runnable cbRunnable = () -> {
                cb.incomingMessage(theMessage);
                this.waitingForCallbacks.remove(m.getInAnswerTo());
            };
            this.queueOrRun(cbRunnable);
        } else {
            this.processMessage(theMessage);
        }
    }

    private void processMessage(Msg m) {
        if (!this.monitorsByTopic.containsKey(m.getTopic())) {
            return;
        }
        if (!this.processingMessages.add(m.getMsgId())) {
            return;
        }
        List<Map<MType, Object>> listeners = this.monitorsByTopic.get(m.getTopic());
        int listenerCount = (int)listeners.stream().filter(e -> e.get((Object)MType.listener) != null).count();
        AtomicInteger remainingListeners = new AtomicInteger(listenerCount);
        for (Map<MType, Object> e2 : listeners) {
            MessageListener l = (MessageListener)e2.get((Object)MType.listener);
            if (l == null) continue;
            Runnable r = () -> {
                Msg current = m;
                try {
                    if (current == null) {
                        this.log.info("Reread failed");
                        return;
                    }
                    if (current.getProcessedBy().contains(this.getSenderId())) {
                        return;
                    }
                    if (this.pausedTopics.contains(current.getTopic())) {
                        return;
                    }
                    if (l.markAsProcessedBeforeExec()) {
                        this.updateProcessedBy(current);
                    }
                    Msg ret = l.onMessage(this, current);
                    if (!this.running.get()) {
                        return;
                    }
                    if (this.effectiveSettings.isAutoAnswer() && ret == null) {
                        ret = new Msg(current.getTopic(), "received", "");
                    }
                    if (!this.checkDeleteAfterProcessing(current) && !l.markAsProcessedBeforeExec()) {
                        this.updateProcessedBy(current);
                    }
                    if (ret != null) {
                        ret.setSender(this.getSenderId());
                        ret.setRecipient(current.getSender());
                        ret.setInAnswerTo(current.getMsgId());
                        this.sendMessage(ret);
                    }
                }
                catch (MessageRejectedException mre) {
                    this.log.warn("Message rejected");
                    this.updateProcessedBy(current);
                    this.unlock(current);
                }
                catch (Throwable err) {
                    this.log.error("Error during message processing", err);
                    this.unlock(current);
                }
                finally {
                    if (remainingListeners.decrementAndGet() == 0) {
                        this.processingMessages.remove(m.getMsgId());
                    }
                }
            };
            this.queueOrRun(r);
        }
    }

    private void pollAndProcessDms(String name) {
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getDMCollectionName()).f((Enum)Msg.Fields.processedBy).eq(null).f((Enum)Msg.Fields.topic).nin(this.getPausedTopics()).f((Enum)Msg.Fields.topic).eq(name).f((Enum)Msg.Fields.msgId).nin(new ArrayList<MorphiumId>(this.processingMessages));
        int window = this.getWindowSize();
        q.limit(window + 1);
        int seen = 0;
        boolean more = false;
        for (Msg m : q.asIterable(window + 1)) {
            if (seen >= window) {
                more = true;
                break;
            }
            for (Map<MType, Object> e : this.monitorsByTopic.get(m.getTopic())) {
                this.queueOrRun(() -> {
                    if (m.isAnswer()) {
                        this.handleAnswer(m);
                    } else {
                        this.processMessage(m);
                    }
                });
            }
            ++seen;
        }
        if (more) {
            this.pollTrigger.putIfAbsent("dm_" + name, new AtomicInteger(0));
            this.pollTrigger.get("dm_" + name).incrementAndGet();
        }
    }

    private void pollAndProcess(String msgName) {
        if (!this.running.get()) {
            return;
        }
        ArrayList<MorphiumId> processingIds = new ArrayList<MorphiumId>(this.processingMessages);
        Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msgName));
        q1.f((Enum)Msg.Fields.exclusive).eq(true).f("processed_by.0").notExists().f((Enum)Msg.Fields.sender).ne(this.getSenderId());
        if (!processingIds.isEmpty()) {
            q1.f((Enum)Msg.Fields.msgId).nin(processingIds);
        }
        Query<Msg> q2 = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msgName));
        q2.f((Enum)Msg.Fields.exclusive).eq(false).f((Enum)Msg.Fields.processedBy).ne(this.getSenderId()).f((Enum)Msg.Fields.sender).ne(this.getSenderId());
        if (!processingIds.isEmpty()) {
            q2.f((Enum)Msg.Fields.msgId).nin(processingIds);
        }
        Query<Msg> q = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msgName));
        q.sort(Msg.Fields.priority);
        q.or(q1, q2);
        int window = this.getWindowSize();
        q.limit(window + 1);
        if (!this.running.get()) {
            return;
        }
        int seen = 0;
        boolean more = false;
        for (Msg m : q.asIterable(window + 1)) {
            if (seen >= window) {
                more = true;
                break;
            }
            if (m.isTimingOut() && System.currentTimeMillis() - m.getTimestamp() > m.getTtl()) {
                this.log.debug("deleting outdated message");
                this.morphium.delete(m);
                return;
            }
            if (this.pausedTopics.contains(m.getTopic())) {
                return;
            }
            if (m.isAnswer()) {
                this.handleAnswer(m);
            } else {
                if (this.pausedTopics.contains(m.getTopic())) continue;
                if (m.getRecipients() != null && !m.getRecipients().isEmpty() && !m.getRecipients().contains(this.getSenderId())) {
                    this.log.warn("Got direct message not for me? Recipients: {}", m.getRecipients());
                    return;
                }
                if (this.processingMessages.contains(m.getMsgId())) continue;
                if (m.isExclusive() && !this.lockMessage(m, this.getSenderId())) {
                    return;
                }
                this.processMessage(m);
            }
            ++seen;
        }
        if (more) {
            this.pollTrigger.putIfAbsent(msgName, new AtomicInteger(0));
            this.pollTrigger.get(msgName).incrementAndGet();
        }
    }

    private boolean checkDeleteAfterProcessing(Msg message) {
        if (message.isDeleteAfterProcessing()) {
            if (message.getDeleteAfterProcessingTime() == 0) {
                this.morphium.delete(message, this.getCollectionName(message));
                return true;
            }
            message.setDeleteAt(new Date(System.currentTimeMillis() + (long)message.getDeleteAfterProcessingTime()));
            this.morphium.setInEntity(message, this.getCollectionName(message), Msg.Fields.deleteAt, new Date(System.currentTimeMillis() + (long)message.getDeleteAfterProcessingTime()));
            if (message.isExclusive()) {
                this.morphium.createQueryFor(MsgLock.class, this.getLockCollectionName(message)).f("_id").eq(message.getMsgId()).set(MsgLock.Fields.deleteAt, (Object)new Date(System.currentTimeMillis() + (long)message.getDeleteAfterProcessingTime()));
            }
        }
        return false;
    }

    private void queueOrRun(Runnable r) {
        if (this.effectiveSettings.isMessagingMultithreadded()) {
            try {
                this.threadPool.execute(r);
            }
            catch (Throwable throwable) {}
        } else {
            r.run();
        }
    }

    private void pollAndProcess() {
        for (String name : this.monitorsByTopic.keySet()) {
            this.pollAndProcess(name);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateProcessedBy(Msg msg) {
        if (msg == null) {
            return;
        }
        if (msg.getProcessedBy().contains(this.getSenderId())) {
            return;
        }
        String id = this.getSenderId();
        msg.getProcessedBy().add(this.getSenderId());
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msg));
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        Map<String, Object> qobj = idq.toQueryObject();
        Doc set = Doc.of("processed_by", this.getSenderId());
        Doc update = Doc.of("$addToSet", set);
        MongoCommand cmd = null;
        try {
            cmd = new UpdateMongoCommand(this.morphium.getDriver().getPrimaryConnection(this.getMorphium().getWriteConcernForClass(Msg.class)));
            ((UpdateMongoCommand)cmd.setColl(this.getCollectionName(msg))).setDb(this.morphium.getDatabase());
            ((UpdateMongoCommand)cmd).addUpdate(qobj, update, null, false, false, null, null, null);
            if (!this.running.get()) {
                return;
            }
            Map<String, Object> ret = ((WriteMongoCommand)cmd).execute();
            cmd.releaseConnection();
            cmd = null;
        }
        catch (MorphiumDriverException e) {
            this.log.error("Error updating processed by - this might lead to duplicate execution!", (Throwable)e);
        }
        finally {
            if (cmd != null) {
                cmd.releaseConnection();
            }
        }
    }

    @Override
    public void addListenerForTopic(String n, MessageListener l) {
        LinkedHashMap<String, Map<Object, Object>> match = new LinkedHashMap<String, Map<Object, Object>>();
        LinkedHashMap<String, List<String>> in = new LinkedHashMap<String, List<String>>();
        in.put("$in", Arrays.asList("insert"));
        match.put("operationType", in);
        match.put("full_document.sender", Map.of("$ne", this.getSenderId()));
        ArrayList<Map<String, Object>> pipeline = new ArrayList<Map<String, Object>>();
        pipeline.add(UtilsMap.of("$match", match));
        this.log.debug("Adding changestream for collection {}", (Object)this.getCollectionName(n));
        this.morphium.ensureIndicesFor(Msg.class, this.getCollectionName(n));
        ChangeStreamMonitor cm = new ChangeStreamMonitor(this.morphium, this.getCollectionName(n), true, this.morphium.getConfig().connectionSettings().getMaxWaitTime(), pipeline);
        cm.addListener(evt -> {
            Map<String, Object> map = evt.getFullDocument();
            if (this.pausedTopics.contains(map.get(Msg.Fields.topic.name()))) {
                return this.running.get();
            }
            Msg doc = this.morphium.getMapper().deserialize(Msg.class, map);
            if (doc.getSender().equals(this.getSenderId())) {
                return this.running.get();
            }
            if (!this.processingMessages.add(doc.getMsgId())) {
                this.log.info("could not add to processingMessages - already processing");
                return this.running.get();
            }
            Runnable r = () -> {
                Msg ret;
                Msg current;
                block20: {
                    block19: {
                        block17: {
                            block18: {
                                if (!doc.isExclusive()) break block17;
                                if (doc.getProcessedBy() == null || doc.getProcessedBy().isEmpty()) break block18;
                                this.processingMessages.remove(doc.getMsgId());
                                return;
                            }
                            if (this.lockMessage(doc, this.getSenderId())) break block17;
                            this.processingMessages.remove(doc.getMsgId());
                            return;
                        }
                        current = doc;
                        if (current != null) break block19;
                        this.log.info("Reread failed");
                        this.processingMessages.remove(doc.getMsgId());
                        return;
                    }
                    if (l.markAsProcessedBeforeExec()) {
                        this.updateProcessedBy(current);
                    }
                    ret = l.onMessage(this, current);
                    if (this.running.get()) break block20;
                    this.processingMessages.remove(doc.getMsgId());
                    return;
                }
                try {
                    try {
                        if (ret == null && this.effectiveSettings.isAutoAnswer()) {
                            ret = new Msg(current.getTopic(), "received", "");
                        }
                        boolean deleted = false;
                        if (current.isDeleteAfterProcessing()) {
                            deleted = this.checkDeleteAfterProcessing(current);
                        }
                        if (!deleted && !l.markAsProcessedBeforeExec()) {
                            this.updateProcessedBy(current);
                        }
                        if (ret != null) {
                            ret.setInAnswerTo(current.getMsgId());
                            ret.setRecipients(List.of(current.getSender()));
                            this.sendMessage(ret);
                        }
                    }
                    catch (MessageRejectedException mre) {
                        this.unlock(current);
                        this.log.warn("Message rejected", (Throwable)mre);
                    }
                    catch (Exception e) {
                        this.unlock(current);
                        this.log.error("Error processing message", (Throwable)e);
                    }
                }
                catch (Exception e) {
                    this.log.error("Error during change event processing", (Throwable)e);
                }
                catch (Throwable throwable) {
                    throw throwable;
                }
                finally {
                    this.processingMessages.remove(doc.getMsgId());
                }
            };
            this.queueOrRun(r);
            return this.running.get();
        });
        ChangeStreamMonitor lockMonitor = new ChangeStreamMonitor(this.morphium, this.getLockCollectionName(n), false, this.effectiveSettings.getMessagingPollPause(), List.of(Doc.of("$match", Doc.of("operationType", Doc.of("$eq", "delete")))));
        lockMonitor.addListener(evt -> {
            Object id = evt.getId();
            if (this.morphium.createQueryFor(Msg.class).setCollectionName(this.getCollectionName(n)).f((Enum)Msg.Fields.msgId).eq(id).countAll() != 0L) {
                this.pollTrigger.putIfAbsent(n, new AtomicInteger());
                this.pollTrigger.get(n).incrementAndGet();
            }
            return this.running.get();
        });
        this.monitorsByTopic.putIfAbsent(n, new ArrayList());
        this.monitorsByTopic.get(n).add(Map.of(MType.monitor, cm, MType.listener, l, MType.lockMonitor, lockMonitor));
        cm.start();
        this.pollAndProcess(n);
    }

    private void unlock(Msg msg) {
        if (msg.isExclusive()) {
            this.morphium.createQueryFor(MsgLock.class).setCollectionName(this.getLockCollectionName(msg)).f("_id").eq(msg.getMsgId()).delete();
        }
    }

    @Override
    public void removeListenerForTopic(String topic, MessageListener l) {
        int idx = -1;
        for (Map<MType, Object> cm : this.monitorsByTopic.get(topic)) {
            ++idx;
            if (cm.get((Object)MType.listener) != l) continue;
            break;
        }
        if (idx >= 0) {
            ((ChangeStreamMonitor)this.monitorsByTopic.get(topic).get(idx).get((Object)MType.monitor)).terminate();
            ((ChangeStreamMonitor)this.monitorsByTopic.get(topic).get(idx).get((Object)MType.lockMonitor)).terminate();
            this.monitorsByTopic.get(topic).remove(idx);
        }
        if (this.monitorsByTopic.get(topic).isEmpty()) {
            this.monitorsByTopic.remove(topic);
        }
    }

    @Override
    public String getSenderId() {
        return this.senderId;
    }

    @Override
    public MorphiumMessaging setSenderId(String id) {
        this.senderId = id;
        return this;
    }

    @Override
    public int getPause() {
        return this.effectiveSettings.getMessagingPollPause();
    }

    @Override
    public MorphiumMessaging setPause(int pause) {
        this.effectiveSettings.setMessagingPollPause(pause);
        return this;
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    @Override
    public void close() {
        this.terminate();
    }

    @Override
    public void terminate() {
        this.running.set(false);
        for (Map.Entry<String, List<Map<MType, Object>>> e : this.monitorsByTopic.entrySet()) {
            for (Map<MType, Object> m : e.getValue()) {
                ((ChangeStreamMonitor)m.get((Object)MType.monitor)).terminate();
                ((ChangeStreamMonitor)m.get((Object)MType.lockMonitor)).terminate();
            }
        }
        if (this.directMessagesMonitor != null) {
            this.directMessagesMonitor.terminate();
        }
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        if (this.decouplePool != null) {
            this.decouplePool.shutdownNow();
        }
        if (this.monitorsByTopic != null) {
            this.monitorsByTopic.clear();
        }
        if (this.waitingForAnswers != null) {
            this.waitingForAnswers.clear();
        }
        if (this.waitingForCallbacks != null) {
            this.waitingForCallbacks.clear();
        }
        if (allMessagings != null) {
            allMessagings.remove(this);
        }
    }

    private void persistMessage(Msg m, boolean async) {
        m.setSenderHost(this.hostname);
        m.setSender(this.getSenderId());
        if (m.getRecipients() == null || m.getRecipients().isEmpty()) {
            if (async) {
                this.morphium.store(m, this.getCollectionName(m), this.aCallback);
            } else {
                this.morphium.store(m, this.getCollectionName(m), null);
            }
        } else {
            for (String rec : m.getRecipients()) {
                if (async) {
                    this.morphium.store(m, this.getDMCollectionName(rec), this.aCallback);
                    continue;
                }
                this.morphium.store(m, this.getDMCollectionName(rec), null);
            }
        }
    }

    @Override
    public void queueMessage(Msg m) {
        this.persistMessage(m, true);
    }

    @Override
    public void sendMessage(Msg m) {
        this.persistMessage(m, false);
    }

    @Override
    public long getNumberOfMessages() {
        long total = 0L;
        try {
            for (String msgName : this.monitorsByTopic.keySet()) {
                Query<Msg> q1 = this.morphium.createQueryFor(Msg.class, this.getCollectionName(msgName));
                q1.f((Enum)Msg.Fields.sender).ne(this.getSenderId()).f("processed_by.0").notExists();
                total += q1.countAll();
            }
            Query<Msg> qdm = this.morphium.createQueryFor(Msg.class, this.getDMCollectionName());
            qdm.f((Enum)Msg.Fields.sender).ne(this.getSenderId()).f("processed_by.0").notExists();
            total += qdm.countAll();
        }
        catch (Exception e) {
            this.log.warn("Error calculating number of messages", (Throwable)e);
        }
        return total;
    }

    @Override
    public void sendMessageToSelf(Msg m) {
        m.setSender("self");
        m.setRecipient(this.getSenderId());
        this.morphium.store(m, this.getDMCollectionName(), null);
    }

    @Override
    public void queueMessagetoSelf(Msg m) {
        m.setSender("self");
        m.setRecipient(this.getSenderId());
        this.morphium.store(m, this.getDMCollectionName(), this.aCallback);
    }

    @Override
    public boolean isAutoAnswer() {
        return this.effectiveSettings.isAutoAnswer();
    }

    @Override
    public MorphiumMessaging setAutoAnswer(boolean autoAnswer) {
        this.effectiveSettings.setAutoAnswer(autoAnswer);
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs, boolean throwExceptionOnTimeout) {
        if (!this.running.get()) {
            throw new SingleCollectionMessaging.SystemShutdownException("Messaging shutting down - abort sending!");
        }
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        LinkedBlockingDeque blockingQueue = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, blockingQueue);
        try {
            this.sendMessage(theMessage);
            Msg firstAnswer = (Msg)blockingQueue.poll(timeoutInMs, TimeUnit.MILLISECONDS);
            if (null == firstAnswer && throwExceptionOnTimeout) {
                throw new SingleCollectionMessaging.MessageTimeoutException("Did not receive answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + " in time (" + timeoutInMs + "ms)");
            }
            Msg msg = firstAnswer;
            return (T)msg;
        }
        catch (InterruptedException e) {
            this.log.error("Did not receive answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + " interrupted.", (Throwable)e);
        }
        finally {
            this.waitingForAnswers.remove(requestMsgId);
        }
        return null;
    }

    @Override
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout) {
        return this.sendAndAwaitAnswers(theMessage, numberOfAnswers, timeout, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T extends Msg> List<T> sendAndAwaitAnswers(T theMessage, int numberOfAnswers, long timeout, boolean throwExceptionOnTimeout) {
        MorphiumId requestMsgId = theMessage.getMsgId();
        if (requestMsgId == null) {
            theMessage.setMsgId(new MorphiumId());
            requestMsgId = theMessage.getMsgId();
        }
        LinkedBlockingDeque answerList = new LinkedBlockingDeque();
        this.waitingForAnswers.put(requestMsgId, answerList);
        this.sendMessage(theMessage);
        long start = System.currentTimeMillis();
        ArrayList returnValue = null;
        try {
            while (this.running.get()) {
                if (answerList.size() > 0 && numberOfAnswers > 0 && answerList.size() >= numberOfAnswers) {
                    break;
                }
                if (throwExceptionOnTimeout && System.currentTimeMillis() - start > timeout && answerList.isEmpty()) {
                    throw new SingleCollectionMessaging.MessageTimeoutException("Did not receive any answer for message " + theMessage.getTopic() + "/" + String.valueOf(requestMsgId) + "in time (" + timeout + ")");
                }
                if (System.currentTimeMillis() - start > timeout) {
                    break;
                }
                if (!this.running.get()) {
                    throw new SingleCollectionMessaging.SystemShutdownException("Messaging shutting down - abort waiting!");
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(this.morphium.getConfig().driverSettings().getIdleSleepTime()));
            }
        }
        finally {
            returnValue = new ArrayList(this.waitingForAnswers.remove(requestMsgId));
        }
        return returnValue;
    }

    @Override
    public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) {
        return this.sendAndAwaitFirstAnswer(theMessage, timeoutInMs, true);
    }

    @Override
    public <T extends Msg> void sendAndAwaitAsync(T theMessage, long timeoutInMs, SingleCollectionMessaging.AsyncMessageCallback cb) {
        if (!this.running.get()) {
            throw new SingleCollectionMessaging.SystemShutdownException("Messaging shutting down - abort sending!");
        }
        if (theMessage.getMsgId() == null) {
            theMessage.setMsgId(new MorphiumId());
        }
        MorphiumId requestMsgId = theMessage.getMsgId();
        CallbackRequest cbr = new CallbackRequest(this);
        cbr.timestamp = System.currentTimeMillis();
        cbr.theMessage = theMessage;
        cbr.callback = cb;
        cbr.ttl = timeoutInMs;
        this.waitingForCallbacks.put(requestMsgId, cbr);
        this.sendMessage(theMessage);
        this.decouplePool.schedule(() -> this.waitingForCallbacks.remove(requestMsgId), timeoutInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean isProcessMultiple() {
        return this.effectiveSettings.isProcessMultiple();
    }

    @Override
    public MorphiumMessaging setProcessMultiple(boolean processMultiple) {
        this.effectiveSettings.setProcessMultiple(processMultiple);
        return this;
    }

    @Override
    public String getQueueName() {
        return this.effectiveSettings.getMessageQueueName();
    }

    @Override
    public MorphiumMessaging setQueueName(String queueName) {
        this.effectiveSettings.setMessageQueueName(queueName);
        return this;
    }

    @Override
    public boolean isMultithreadded() {
        return this.effectiveSettings.isMessagingMultithreadded();
    }

    @Override
    public MorphiumMessaging setMultithreadded(boolean multithreadded) {
        this.effectiveSettings.setMessagingMultithreadded(multithreadded);
        return this;
    }

    @Override
    public int getWindowSize() {
        return this.effectiveSettings.getMessagingWindowSize();
    }

    @Override
    public MorphiumMessaging setWindowSize(int windowSize) {
        this.effectiveSettings.setMessagingWindowSize(windowSize);
        return this;
    }

    @Override
    public boolean isUseChangeStream() {
        return this.morphium.getConfig().clusterSettings().isReplicaset() && this.effectiveSettings.isUseChangeStream();
    }

    @Override
    public int getRunningTasks() {
        return this.threadPool.getActiveCount();
    }

    @Override
    public Morphium getMorphium() {
        return this.morphium;
    }

    @Override
    public MorphiumMessaging setPolling(boolean doPolling) {
        this.effectiveSettings.setUseChangeStream(!doPolling);
        return this;
    }

    @Override
    public MorphiumMessaging setUseChangeStream(boolean useChangeStream) {
        this.effectiveSettings.setUseChangeStream(useChangeStream);
        return this;
    }

    @Override
    public void init(Morphium m) {
        this.init(m, m.getConfig().messagingSettings());
    }

    @Override
    public void init(Morphium m, MessagingSettings overrides) {
        this.morphium = m;
        this.effectiveSettings = overrides == m.getConfig().messagingSettings() ? m.getConfig().createCopy().messagingSettings() : overrides;
        if (this.effectiveSettings.getSenderId() == null) {
            this.setSenderId(UUID.randomUUID().toString());
        } else {
            this.setSenderId(this.effectiveSettings.getSenderId());
        }
        this.threadPool = new ThreadPoolExecutor(this.effectiveSettings.getThreadPoolMessagingCoreSize(), this.effectiveSettings.getThreadPoolMessagingMaxSize(), this.effectiveSettings.getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Thread.ofVirtual().name("msg-thr-", 0L).factory());
        this.threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                Thread.onSpinWait();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(MultiCollectionMessaging.this.morphium.getConfig().driverSettings().getIdleSleepTime()));
                executor.execute(r);
            }
        });
    }

    private static enum MType {
        listener,
        monitor,
        lockMonitor;

    }

    private class CallbackRequest {
        Msg theMessage;
        SingleCollectionMessaging.AsyncMessageCallback callback;
        long ttl;
        long timestamp;

        private CallbackRequest(MultiCollectionMessaging multiCollectionMessaging) {
        }
    }
}

