package com.oracle.coherence.patterns.messaging;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.liveobjects.LiveObject;
import com.oracle.coherence.common.liveobjects.OnArrived;
import com.oracle.coherence.common.liveobjects.OnDeparting;
import com.oracle.coherence.common.liveobjects.OnInserted;
import com.oracle.coherence.common.liveobjects.OnRemoved;
import com.oracle.coherence.common.liveobjects.OnUpdated;
import com.oracle.coherence.common.processors.InvokeMethodProcessor;
import com.oracle.coherence.patterns.messaging.Subscription;
import com.oracle.coherence.patterns.messaging.management.MessagingMBeanManager;
import com.oracle.coherence.patterns.messaging.management.QueueProxy;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.ResourceRegistry;
import com.tangosol.util.processor.UpdaterProcessor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;

@LiveObject
/* loaded from: input_file:com/oracle/coherence/patterns/messaging/Queue.class */
public class Queue extends Destination {
    private static Logger logger = Logger.getLogger(Queue.class.getName());
    private HashMap<Integer, Long> lastMessageSequenceNumberMap;
    private LinkedList<MessageIdentifier> messagesToRedeliver;
    private LinkedList<MessageIdentifier> messagesToDeliver;
    private LinkedList<SubscriptionIdentifier> waitingSubscriptions;
    private LinkedList<SubscriptionIdentifier> waitingSubscriptionsDuplicate;
    private long numMessagesDelivered;
    private long numMessagesReceived;
    private int maxSubscribers;

    public Queue() {
        this.lastMessageSequenceNumberMap = null;
        this.maxSubscribers = -1;
    }

    public Queue(String str) {
        super(str);
        this.lastMessageSequenceNumberMap = null;
        this.maxSubscribers = -1;
        this.waitingSubscriptions = new LinkedList<>();
        this.waitingSubscriptionsDuplicate = new LinkedList<>();
        this.messagesToDeliver = new LinkedList<>();
        this.messagesToRedeliver = new LinkedList<>();
        this.lastMessageSequenceNumberMap = new HashMap<>();
        this.numMessagesDelivered = 0L;
        this.numMessagesReceived = 0L;
    }

    public Queue(String str, int i) {
        this(str);
        if (i > 0) {
            this.maxSubscribers = i;
        } else if (logger.isLoggable(Level.WARNING)) {
            logger.log(Level.WARNING, "maxSubscribers value of {0}is invalid, defaulting to unbounded (-1)", Integer.valueOf(i));
        }
    }

    public long getNumMessagesReceived() {
        return this.numMessagesReceived;
    }

    public long getNumMessagesToRedeliver() {
        return this.messagesToRedeliver.size();
    }

    public long getNumMessagesDelivered() {
        return this.numMessagesDelivered;
    }

    public long getNumMessagesToDeliver() {
        return this.messagesToDeliver.size();
    }

    public long getNumWaitingSubscriptions() {
        return this.waitingSubscriptions.size();
    }

    private boolean hasMessagesToRedeliver() {
        return this.messagesToRedeliver.size() > 0;
    }

    private boolean hasMessagesToDeliver() {
        return this.messagesToDeliver.size() > 0;
    }

    private boolean hasWaitingSubscriptions() {
        return this.waitingSubscriptions.size() > 0;
    }

    public int getMaxSubscribers() {
        return this.maxSubscribers;
    }

    public boolean isFullySubscribed() {
        return this.maxSubscribers != -1 && getSubscriptionIdentifiers().size() == this.maxSubscribers;
    }

    private MessageIdentifier nextMessageToDeliver() {
        if (this.messagesToDeliver.isEmpty()) {
            return null;
        }
        return this.messagesToDeliver.removeFirst();
    }

    private MessageIdentifier nextMessageToRedeliver() {
        if (this.messagesToRedeliver.isEmpty()) {
            return null;
        }
        return this.messagesToRedeliver.removeFirst();
    }

    public Long getLastMessageSequenceNumber(Integer num) {
        return this.lastMessageSequenceNumberMap.containsKey(num) ? this.lastMessageSequenceNumberMap.get(num) : 0L;
    }

    private SubscriptionIdentifier nextWaitingSubscription() {
        if (this.waitingSubscriptions.isEmpty()) {
            return null;
        }
        return this.waitingSubscriptions.removeFirst();
    }

    public void onAcceptMessage(MessageTracker messageTracker) {
        Iterator<MessageIdentifier> it = messageTracker.iterator();
        while (it.hasNext()) {
            acceptOneMessage(it.next());
        }
    }

    public void acceptOneMessage(MessageIdentifier messageIdentifier) {
        Integer valueOf = Integer.valueOf(messageIdentifier.getPartitionId());
        if (this.lastMessageSequenceNumberMap.containsKey(valueOf)) {
            if (messageIdentifier.getMessageSequenceNumber() <= this.lastMessageSequenceNumberMap.get(valueOf).longValue()) {
                return;
            }
        }
        this.lastMessageSequenceNumberMap.put(valueOf, new Long(messageIdentifier.getMessageSequenceNumber()));
        this.messagesToDeliver.add(messageIdentifier);
        this.numMessagesReceived++;
    }

    public void requestMessage(SubscriptionIdentifier subscriptionIdentifier) {
        if (!this.waitingSubscriptions.contains(subscriptionIdentifier)) {
            this.waitingSubscriptions.add(subscriptionIdentifier);
        } else {
            if (this.waitingSubscriptionsDuplicate.contains(subscriptionIdentifier)) {
                return;
            }
            this.waitingSubscriptionsDuplicate.add(subscriptionIdentifier);
        }
    }

    public void rollbackMessages(SubscriptionIdentifier subscriptionIdentifier, MessageTracker messageTracker) {
        addMessagesToRedeliver(messageTracker);
        CacheFactory.getCache(Message.CACHENAME).invokeAll(messageTracker.getMessageKeys(getIdentifier()), new UpdaterProcessor("makeInvisibleTo", subscriptionIdentifier));
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void subscribe(SubscriptionIdentifier subscriptionIdentifier, SubscriptionConfiguration subscriptionConfiguration, Subscription subscription) {
        super.subscribe(subscriptionIdentifier, subscriptionConfiguration, subscription == null ? new QueueSubscription(subscriptionIdentifier, Subscription.Status.ENABLED, (LeasedSubscriptionConfiguration) subscriptionConfiguration, CacheFactory.getSafeTimeMillis()) : subscription);
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void unsubscribe(SubscriptionIdentifier subscriptionIdentifier, MessageTracker messageTracker) {
        this.waitingSubscriptions.remove(subscriptionIdentifier);
        rollbackMessages(subscriptionIdentifier, messageTracker);
        super.unsubscribe(subscriptionIdentifier, messageTracker);
    }

    private void addMessagesToRedeliver(MessageTracker messageTracker) {
        Iterator<MessageIdentifier> it = messageTracker.iterator();
        while (it.hasNext()) {
            this.messagesToRedeliver.add(it.next());
        }
    }

    public QueueDeliveryResults doDelivery() {
        QueueDeliveryResults queueDeliveryResults = new QueueDeliveryResults();
        while (hasMessagesToRedeliver() && hasWaitingSubscriptions()) {
            deliverMessage(queueDeliveryResults, getIdentifier(), nextWaitingSubscription(), nextMessageToRedeliver(), true);
        }
        while (hasMessagesToDeliver() && hasWaitingSubscriptions()) {
            deliverMessage(queueDeliveryResults, getIdentifier(), nextWaitingSubscription(), nextMessageToDeliver(), false);
        }
        return queueDeliveryResults;
    }

    private void deliverMessage(QueueDeliveryResults queueDeliveryResults, Identifier identifier, SubscriptionIdentifier subscriptionIdentifier, MessageIdentifier messageIdentifier, boolean z) {
        MessageKey key = Message.getKey(identifier, messageIdentifier);
        try {
            try {
                CacheFactory.getCache(Message.CACHENAME).invoke(key, new UpdaterProcessor("makeVisibleTo", subscriptionIdentifier));
            } catch (Throwable th) {
                logger.log(Level.WARNING, "deliver message");
            }
            NamedCache cache = CacheFactory.getCache(Subscription.CACHENAME);
            InvokeMethodProcessor invokeMethodProcessor = null;
            try {
                invokeMethodProcessor = new InvokeMethodProcessor("acceptOneMessage", new Object[]{messageIdentifier, Boolean.valueOf(z)});
            } catch (Throwable th2) {
                logger.log(Level.WARNING, "deliver message");
            }
            Object invoke = cache.invoke(subscriptionIdentifier, invokeMethodProcessor);
            if (invoke instanceof Throwable) {
                throw ((Throwable) invoke);
            }
            if (invoke != null) {
                if (((Boolean) invoke).booleanValue()) {
                    queueDeliveryResults.addMessage(messageIdentifier);
                    queueDeliveryResults.addSubscription(subscriptionIdentifier);
                } else {
                    queueDeliveryResults.addMessage(messageIdentifier);
                }
            }
        } catch (Throwable th3) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "An exception occurred when the Queue tried to invoke Message.makeVisibleTo for message {0} and subscription {1} The message will be delivered to another subscription\n", new Object[]{messageIdentifier, subscriptionIdentifier});
            }
            queueDeliveryResults.addSubscription(subscriptionIdentifier);
            queueDeliveryResults.addMessage(messageIdentifier);
            queueDeliveryResults.addRedeliverMessage(messageIdentifier);
            CacheFactory.getCache(Message.CACHENAME).invoke(key, new UpdaterProcessor("makeInvisibleTo", subscriptionIdentifier));
        }
    }

    public void processDeliveryResults(QueueDeliveryResults queueDeliveryResults) {
        while (queueDeliveryResults.hasSubscriptions()) {
            SubscriptionIdentifier nextSubscription = queueDeliveryResults.nextSubscription();
            if (this.waitingSubscriptions.contains(nextSubscription)) {
                this.waitingSubscriptions.remove(nextSubscription);
            }
            if (this.waitingSubscriptionsDuplicate.contains(nextSubscription)) {
                this.waitingSubscriptions.add(nextSubscription);
                this.waitingSubscriptionsDuplicate.remove(nextSubscription);
            }
        }
        while (queueDeliveryResults.hasMessage()) {
            this.numMessagesDelivered++;
            MessageIdentifier nextMessage = queueDeliveryResults.nextMessage();
            if (this.messagesToDeliver.contains(nextMessage)) {
                this.messagesToDeliver.remove(nextMessage);
            } else if (this.messagesToRedeliver.contains(nextMessage)) {
                this.messagesToRedeliver.remove(nextMessage);
            }
        }
        while (queueDeliveryResults.hasMessagesToRedeliver()) {
            this.messagesToRedeliver.add(queueDeliveryResults.nextMessageToRedeliver());
        }
    }

    @OnInserted
    @OnUpdated
    @OnArrived
    public void onChanged(BinaryEntry binaryEntry) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Queue:onChanged identifier: {0}", getIdentifier());
        }
        MessagingMBeanManager messagingMBeanManager = MessagingMBeanManager.getInstance();
        messagingMBeanManager.registerMBean(this, QueueProxy.class, messagingMBeanManager.buildQueueMBeanName(getIdentifier()));
        QueueEngine queueEngine = (QueueEngine) CacheFactory.getConfigurableCacheFactory().getResourceRegistry().getResource(QueueEngine.class, getIdentifier().toString());
        if (queueEngine == null) {
            queueEngine = new QueueEngine(getIdentifier());
            CacheFactory.getConfigurableCacheFactory().getResourceRegistry().registerResource(QueueEngine.class, getIdentifier().toString(), queueEngine);
        }
        queueEngine.processRunEvent(binaryEntry);
    }

    @OnDeparting
    @OnRemoved
    public void onRemoved(BinaryEntry binaryEntry) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Queue:onRemoved identifier: {0}", getIdentifier());
        }
        ResourceRegistry resourceRegistry = CacheFactory.getConfigurableCacheFactory().getResourceRegistry();
        QueueEngine queueEngine = (QueueEngine) resourceRegistry.getResource(QueueEngine.class, getIdentifier().toString());
        if (queueEngine != null) {
            resourceRegistry.unregisterResource(QueueEngine.class, getIdentifier().toString());
            queueEngine.dispose();
        }
        MessagingMBeanManager messagingMBeanManager = MessagingMBeanManager.getInstance();
        messagingMBeanManager.unregisterMBean(this, messagingMBeanManager.buildQueueMBeanName(getIdentifier()));
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void readExternal(DataInput dataInput) throws IOException {
        super.readExternal(dataInput);
        this.messagesToDeliver = new LinkedList<>();
        ExternalizableHelper.readCollection(dataInput, this.messagesToDeliver, Thread.currentThread().getContextClassLoader());
        this.messagesToRedeliver = new LinkedList<>();
        ExternalizableHelper.readCollection(dataInput, this.messagesToRedeliver, Thread.currentThread().getContextClassLoader());
        this.waitingSubscriptions = new LinkedList<>();
        ExternalizableHelper.readCollection(dataInput, this.waitingSubscriptions, Thread.currentThread().getContextClassLoader());
        this.waitingSubscriptionsDuplicate = new LinkedList<>();
        ExternalizableHelper.readCollection(dataInput, this.waitingSubscriptionsDuplicate, Thread.currentThread().getContextClassLoader());
        this.lastMessageSequenceNumberMap = new HashMap<>();
        ExternalizableHelper.readMap(dataInput, this.lastMessageSequenceNumberMap, getClass().getClassLoader());
        this.numMessagesReceived = ExternalizableHelper.readLong(dataInput);
        this.numMessagesDelivered = ExternalizableHelper.readLong(dataInput);
        this.maxSubscribers = ExternalizableHelper.readInt(dataInput);
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void writeExternal(DataOutput dataOutput) throws IOException {
        super.writeExternal(dataOutput);
        ExternalizableHelper.writeCollection(dataOutput, this.messagesToDeliver);
        ExternalizableHelper.writeCollection(dataOutput, this.messagesToRedeliver);
        ExternalizableHelper.writeCollection(dataOutput, this.waitingSubscriptions);
        ExternalizableHelper.writeCollection(dataOutput, this.waitingSubscriptionsDuplicate);
        ExternalizableHelper.writeMap(dataOutput, this.lastMessageSequenceNumberMap);
        ExternalizableHelper.writeLong(dataOutput, this.numMessagesReceived);
        ExternalizableHelper.writeLong(dataOutput, this.numMessagesDelivered);
        ExternalizableHelper.writeInt(dataOutput, this.maxSubscribers);
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void readExternal(PofReader pofReader) throws IOException {
        super.readExternal(pofReader);
        this.messagesToDeliver = new LinkedList<>();
        pofReader.readCollection(100, this.messagesToDeliver);
        this.messagesToRedeliver = new LinkedList<>();
        pofReader.readCollection(101, this.messagesToRedeliver);
        this.waitingSubscriptions = new LinkedList<>();
        pofReader.readCollection(102, this.waitingSubscriptions);
        this.waitingSubscriptionsDuplicate = new LinkedList<>();
        pofReader.readCollection(103, this.waitingSubscriptionsDuplicate);
        this.lastMessageSequenceNumberMap = new HashMap<>();
        pofReader.readMap(104, this.lastMessageSequenceNumberMap);
        this.numMessagesReceived = pofReader.readLong(105);
        this.numMessagesDelivered = pofReader.readLong(106);
        this.maxSubscribers = pofReader.readInt(107);
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public void writeExternal(PofWriter pofWriter) throws IOException {
        super.writeExternal(pofWriter);
        pofWriter.writeCollection(100, this.messagesToDeliver);
        pofWriter.writeCollection(101, this.messagesToRedeliver);
        pofWriter.writeCollection(102, this.waitingSubscriptions);
        pofWriter.writeCollection(103, this.waitingSubscriptionsDuplicate);
        pofWriter.writeMap(104, this.lastMessageSequenceNumberMap);
        pofWriter.writeLong(105, this.numMessagesReceived);
        pofWriter.writeLong(106, this.numMessagesDelivered);
        pofWriter.writeInt(107, this.maxSubscribers);
    }

    @Override // com.oracle.coherence.patterns.messaging.Destination
    public String toString() {
        return String.format("Queue{%s, messagesToDeliver=%s, maxSubscribers=%d, messagesToRedeliver=%s,  waitingSubscriptions=%s}", super.toString(), this.messagesToDeliver, Integer.valueOf(this.maxSubscribers), this.messagesToRedeliver, this.waitingSubscriptions);
    }
}
