package com.oracle.coherence.patterns.messaging;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.patterns.messaging.LeasedSubscription;
import com.oracle.coherence.patterns.messaging.entryprocessors.UnsubscribeProcessor;
import com.oracle.coherence.patterns.messaging.exceptions.SubscriberInterruptedException;
import com.oracle.coherence.patterns.messaging.exceptions.SubscriptionLostException;
import com.tangosol.net.CacheFactory;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.processor.UpdaterProcessor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/messaging/AbstractSubscriber.class */
abstract class AbstractSubscriber<S extends LeasedSubscription> implements Subscriber {
    private static Logger logger = Logger.getLogger(AbstractSubscriber.class.getName());
    private MessagingSession messagingSession;
    private SubscriptionIdentifier subscriptionIdentifier;
    private LeaseMaintainerThread leaseMaintainerThread;
    private ConcurrentHashMap<Identifier, Long> prevMessageIdentifierMap = new ConcurrentHashMap<>();
    private LinkedBlockingQueue<S> subscriptionUpdates = new LinkedBlockingQueue<>();
    private boolean autocommit = true;
    private boolean subscriptionOffered = false;
    protected State state = State.Starting;
    private MapListener mapListener = new MapListener() { // from class: com.oracle.coherence.patterns.messaging.AbstractSubscriber.1
        public void entryInserted(MapEvent mapEvent) {
            if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                AbstractSubscriber.logger.log(Level.FINER, "{0} received insert event {1}", new Object[]{this, mapEvent});
            }
            AbstractSubscriber.this.subscriptionOffered = true;
            AbstractSubscriber.this.subscriptionUpdates.offer((LeasedSubscription) mapEvent.getNewValue());
        }

        public void entryUpdated(MapEvent mapEvent) {
            if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                AbstractSubscriber.logger.log(Level.FINER, "{0} received update event {1}", new Object[]{this, mapEvent});
            }
            LeasedSubscription leasedSubscription = (LeasedSubscription) mapEvent.getOldValue();
            LeasedSubscription leasedSubscription2 = (LeasedSubscription) mapEvent.getNewValue();
            if (leasedSubscription.getNumMessages() < leasedSubscription2.getNumMessages() || (leasedSubscription.getLease().isSuspended() && !leasedSubscription2.getLease().isSuspended())) {
                AbstractSubscriber.this.subscriptionOffered = true;
                AbstractSubscriber.this.subscriptionUpdates.offer((LeasedSubscription) mapEvent.getNewValue());
            } else {
                if (AbstractSubscriber.this.getState() != State.Starting || AbstractSubscriber.this.subscriptionOffered) {
                    return;
                }
                AbstractSubscriber.this.subscriptionOffered = true;
                AbstractSubscriber.this.subscriptionUpdates.offer((LeasedSubscription) mapEvent.getNewValue());
            }
        }

        public void entryDeleted(MapEvent mapEvent) {
            if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                AbstractSubscriber.logger.log(Level.FINER, "{0} received delete event {1}", new Object[]{this, mapEvent});
            }
            AbstractSubscriber.this.shutdown(State.Removed);
            AbstractSubscriber.this.subscriptionUpdates.offer((LeasedSubscription) mapEvent.getOldValue());
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oracle/coherence/patterns/messaging/AbstractSubscriber$LeaseMaintainerThread.class */
    public static class LeaseMaintainerThread extends Thread {
        private static final long LEASE_REFRESH_TIME_MS = 2000;
        private Subscriber subscriber;
        private boolean isTerminated = false;

        public LeaseMaintainerThread(Subscriber subscriber) {
            this.subscriber = subscriber;
        }

        public synchronized void terminate() {
            this.isTerminated = true;
        }

        public synchronized boolean isTerminated() {
            return this.isTerminated;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} commenced", this.subscriber.getSubscriptionIdentifier());
            }
            while (!isTerminated()) {
                try {
                    if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                        AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} sleeping for {1} ms", new Object[]{this.subscriber.getSubscriptionIdentifier(), Long.valueOf(LEASE_REFRESH_TIME_MS)});
                    }
                    Thread.sleep(LEASE_REFRESH_TIME_MS);
                } catch (InterruptedException e) {
                    if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                        AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} interrupted", this.subscriber.getSubscriptionIdentifier());
                    }
                }
                if (isTerminated()) {
                    break;
                }
                CacheFactory.getCache(Subscription.CACHENAME).invoke(this.subscriber.getSubscriptionIdentifier(), new UpdaterProcessor("extendLease", 0L));
                if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                    AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} extending current lease", this.subscriber.getSubscriptionIdentifier());
                    AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} has extended the lease", this.subscriber.getSubscriptionIdentifier());
                }
            }
            if (AbstractSubscriber.logger.isLoggable(Level.FINER)) {
                AbstractSubscriber.logger.log(Level.FINER, "LeaseMaintainerThread for {0} completed", this.subscriber.getSubscriptionIdentifier());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/oracle/coherence/patterns/messaging/AbstractSubscriber$State.class */
    public enum State {
        Starting,
        Active,
        Interrupted,
        Removed,
        Shutdown
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSubscriber(MessagingSession messagingSession, SubscriptionIdentifier subscriptionIdentifier) {
        this.messagingSession = messagingSession;
        this.subscriptionIdentifier = subscriptionIdentifier;
        CacheFactory.getCache(Subscription.CACHENAME).addMapListener(this.mapListener, subscriptionIdentifier, false);
        this.leaseMaintainerThread = new LeaseMaintainerThread(this);
        this.leaseMaintainerThread.setDaemon(true);
        this.leaseMaintainerThread.setName(String.format("LeaseMaintainerThread[%s]", subscriptionIdentifier.getSubscriberIdentifier()));
        this.leaseMaintainerThread.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean verifyMessageSequence(Message message) {
        long messageSequenceNumber = message.getRequestIdentifier().getMessageSequenceNumber();
        Identifier publisherIdentifier = message.getRequestIdentifier().getPublisherIdentifier();
        Long l = this.prevMessageIdentifierMap.get(publisherIdentifier);
        if (l == null) {
            this.prevMessageIdentifierMap.put(publisherIdentifier, Long.valueOf(messageSequenceNumber));
            return true;
        }
        this.prevMessageIdentifierMap.put(publisherIdentifier, Long.valueOf(messageSequenceNumber));
        if (l.longValue() <= messageSequenceNumber || !logger.isLoggable(Level.WARNING)) {
            return true;
        }
        logger.log(Level.WARNING, "Message sequence number out of order probably due to a rollback.  Prev = {0} New = {1}", new Object[]{l, Long.valueOf(messageSequenceNumber)});
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resetMessageSequenceVerification() {
        this.prevMessageIdentifierMap = new ConcurrentHashMap<>();
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public MessagingSession getMessagingSession() {
        return this.messagingSession;
    }

    protected synchronized State getState() {
        return this.state;
    }

    protected synchronized void setState(State state) {
        if (this.state == State.Removed || this.state == State.Interrupted || this.state == State.Shutdown) {
            return;
        }
        this.state = state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public S getNextSubscriptionUpdate() {
        try {
            S take = this.subscriptionUpdates.take();
            setState(State.Active);
            return take;
        } catch (InterruptedException e) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Subscriber was interrupted while waiting for its subscription state to arrive {0}.\n", (Throwable) e);
            }
            shutdown(State.Interrupted);
            throw new SubscriberInterruptedException(getSubscriptionIdentifier(), "Subscriber was interrupted while waiting for subscription state to arrive", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureSubscription() {
        State state = getState();
        if (state == State.Starting) {
            getNextSubscriptionUpdate();
        } else {
            if (state == State.Removed) {
                throw new SubscriptionLostException(getSubscriptionIdentifier(), "Subscriber subscription was lost (with expired or the underlying destination has been removed)");
            }
            if (state == State.Shutdown) {
                throw new SubscriptionLostException(getSubscriptionIdentifier(), "Attempting to use a Subscription that has been previously released or shutdown");
            }
            if (state == State.Interrupted) {
                throw new SubscriptionLostException(getSubscriptionIdentifier(), "Attempting to use a Subscription that was previously interrupted and shutdown");
            }
        }
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public SubscriptionIdentifier getSubscriptionIdentifier() {
        return this.subscriptionIdentifier;
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public Identifier getDestinationIdentifier() {
        return this.subscriptionIdentifier.getDestinationIdentifier();
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public boolean isActive() {
        return getState() == State.Starting || getState() == State.Active;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureActive() {
        if (!isActive()) {
            throw new SubscriptionLostException(getSubscriptionIdentifier(), "Attempted to use an inactive Subscriber");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void shutdown(State state) {
        if (isActive()) {
            CacheFactory.getCache(Subscription.CACHENAME).removeMapListener(this.mapListener, this.subscriptionIdentifier);
            this.mapListener = null;
            setState(state);
            if (this.leaseMaintainerThread == null || !this.leaseMaintainerThread.isAlive()) {
                return;
            }
            this.leaseMaintainerThread.terminate();
        }
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public void unsubscribe() {
        if (isActive()) {
            shutdown(State.Shutdown);
            CacheFactory.getCache(Destination.CACHENAME).invoke(getDestinationIdentifier(), new UnsubscribeProcessor(getSubscriptionIdentifier()));
        }
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public boolean isAutoCommitting() {
        return this.autocommit;
    }

    @Override // com.oracle.coherence.patterns.messaging.Subscriber
    public void setAutoCommit(boolean z) {
        if (z && !this.autocommit) {
            rollback();
        }
        this.autocommit = z;
    }
}
