package org.apache.activemq.broker.region.cursors;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-4.1.1.jar:org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.class */
/**
 * Pending-message cursor for a single durable subscriber, identified by a
 * client id and a subscriber name.
 *
 * <p>The cursor is a composite. It owns one {@link FilePendingMessageCursor}
 * that receives the non-persistent messages handed to
 * {@link #addMessageLast(MessageReference)}, plus one {@link TopicStorePrefetch}
 * per destination registered through {@link #add(ConnectionContext, Destination)}.
 * Iteration ({@link #hasNext()} / {@link #next()} / {@link #remove()}) is
 * delegated to whichever child cursor is currently selected by
 * {@link #getNextCursor()}.
 *
 * <p>All state, including the aggregate {@code pendingCount}, is guarded by
 * this object's monitor.
 */
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
    private static final Log log = LogFactory.getLog(StoreDurableSubscriberCursor.class);

    private final String clientId;
    private final String subscriberName;
    private boolean started;
    /** Child cursor that receives every non-persistent message. */
    private final PendingMessageCursor nonPersistent;
    /** Child cursor selected by the most recent {@link #getNextCursor()} call; may be null. */
    private PendingMessageCursor currentCursor;
    /** Aggregate count of pending messages across all child cursors. */
    private int pendingCount = 0;
    /** Per-destination prefetch cursors, keyed by the destination passed to {@code add}. */
    private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
    /** Every child cursor (non-persistent cursor first at construction), rotated by {@link #getNextCursor()}. */
    private final LinkedList<PendingMessageCursor> storePrefetches = new LinkedList<PendingMessageCursor>();

    /**
     * Creates the cursor and its non-persistent child cursor.
     *
     * @param clientId       client id of the durable subscriber
     * @param subscriberName name of the durable subscription
     * @param store          store handed to the {@link FilePendingMessageCursor};
     *                       the cursor name is {@code clientId + subscriberName}
     * @param ignored        not read by this constructor
     *                       (NOTE(review): possibly intended as a batch size - confirm with callers)
     */
    public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int ignored) {
        this.clientId = clientId;
        this.subscriberName = subscriberName;
        this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
        this.storePrefetches.add(this.nonPersistent);
    }

    /**
     * Marks the cursor as started, starts every child cursor and adds each
     * child's size to the pending count.
     *
     * @throws Exception if a child cursor fails to start
     */
    @Override
    public synchronized void start() throws Exception {
        started = true;
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.start();
            pendingCount += cursor.size();
        }
    }

    /**
     * Marks the cursor as stopped, stops every child cursor and resets the
     * pending count to zero.
     *
     * @throws Exception if a child cursor fails to stop
     */
    @Override
    public synchronized void stop() throws Exception {
        started = false;
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.stop();
        }
        pendingCount = 0;
    }

    /**
     * Registers a destination by creating a {@link TopicStorePrefetch} for it.
     * If this cursor is already started the new prefetch is started
     * immediately and its size is added to the pending count.
     *
     * @param connectionContext not used by this implementation
     * @param destination       destination to register; must be a {@link Topic}
     * @throws ClassCastException if {@code destination} is not a {@link Topic}
     * @throws Exception          if the new prefetch fails to start
     */
    @Override
    public synchronized void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        TopicStorePrefetch prefetch = new TopicStorePrefetch((Topic) destination, clientId, subscriberName);
        prefetch.setMaxBatchSize(getMaxBatchSize());
        topics.put(destination, prefetch);
        storePrefetches.add(prefetch);
        if (started) {
            prefetch.start();
            pendingCount += prefetch.size();
        }
    }

    /**
     * Unregisters a destination, dropping its prefetch from the set of child
     * cursors. Unknown destinations are ignored.
     *
     * <p>The removed prefetch is not stopped here and the pending count is
     * not adjusted.
     *
     * @param connectionContext not used by this implementation
     * @param destination       destination to unregister
     */
    @Override
    public synchronized void remove(ConnectionContext connectionContext, Destination destination) throws Exception {
        TopicStorePrefetch removed = topics.remove(destination);
        if (removed != null) {
            storePrefetches.remove(removed);
        }
    }

    /**
     * @return {@code true} when the pending count is zero or negative
     */
    @Override
    public synchronized boolean isEmpty() {
        return pendingCount <= 0;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isRecoveryRequired() {
        return false;
    }

    /**
     * Appends a message to the appropriate child cursor.
     *
     * <p>While started, every message bumps the pending count and
     * non-persistent messages go to the non-persistent cursor. Persistent
     * messages are forwarded to the prefetch registered for the message's
     * region destination, whether or not this cursor is started; if no
     * prefetch is registered for that destination the message is not
     * forwarded anywhere.
     *
     * @param messageReference message to append; {@code null} is ignored
     * @throws Exception if a child cursor rejects the message
     */
    @Override
    public synchronized void addMessageLast(MessageReference messageReference) throws Exception {
        if (messageReference == null) {
            return;
        }
        Message message = messageReference.getMessage();
        if (started) {
            pendingCount++;
            if (!message.isPersistent()) {
                nonPersistent.addMessageLast(messageReference);
            }
        }
        if (!message.isPersistent()) {
            return;
        }
        TopicStorePrefetch prefetch = topics.get(message.getRegionDestination());
        if (prefetch == null) {
            return;
        }
        prefetch.addMessageLast(messageReference);
        // When the count of pending messages outside the non-persistent cursor is
        // not positive, tell the prefetch that this message is the next to dispatch.
        // NOTE(review): pendingCount was already incremented above - confirm the
        // comparison is intended to run after the increment.
        if (started && pendingCount - nonPersistent.size() <= 0) {
            prefetch.nextToDispatch(messageReference.getMessageId());
        }
    }

    /**
     * Resets the pending count to zero. Child cursors are not touched.
     */
    @Override
    public synchronized void clear() {
        // Synchronized so the write is ordered with every other pendingCount access.
        pendingCount = 0;
    }

    /**
     * Selects the next child cursor (see {@link #getNextCursor()}) and reports
     * whether it has a message available.
     *
     * @return {@code false} when the pending count is not positive or no child
     *         cursor has a message; otherwise the selected cursor's
     *         {@code hasNext()}
     * @throws RuntimeException wrapping any exception raised while selecting
     *                          the cursor
     */
    @Override
    public synchronized boolean hasNext() {
        if (pendingCount <= 0) {
            return false;
        }
        try {
            currentCursor = getNextCursor();
            return currentCursor != null && currentCursor.hasNext();
        } catch (Exception e) {
            log.error("Failed to get current cursor ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the next message of the currently selected child cursor, or
     *         {@code null} when no cursor is selected
     */
    @Override
    public synchronized MessageReference next() {
        return currentCursor == null ? null : currentCursor.next();
    }

    /**
     * Removes the current message from the selected child cursor, if any, and
     * decrements the pending count. The count is decremented even when no
     * cursor is selected.
     */
    @Override
    public synchronized void remove() {
        if (currentCursor != null) {
            currentCursor.remove();
        }
        pendingCount--;
    }

    /**
     * Resets every child cursor.
     */
    @Override
    public synchronized void reset() {
        for (PendingMessageCursor cursor : storePrefetches) {
            ((AbstractPendingMessageCursor) cursor).reset();
        }
    }

    /**
     * @return the aggregate pending count
     */
    @Override
    public synchronized int size() {
        // Synchronized so the read observes updates made under the monitor.
        return pendingCount;
    }

    /**
     * Returns the child cursor to read from next.
     *
     * <p>The current cursor is kept while it is non-empty. Otherwise the
     * child cursors are scanned in list order, each is given this cursor's
     * max batch size, and the first one reporting {@code hasNext()} becomes
     * current. After a scan the head of the list is moved to the tail so the
     * next scan starts from a different child.
     *
     * @return the selected child cursor, or {@code null} if none has a message
     * @throws Exception declared for subclasses and callers; propagated from
     *                   child cursors
     */
    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty()) {
            currentCursor = null;
            for (PendingMessageCursor candidate : storePrefetches) {
                AbstractPendingMessageCursor cursor = (AbstractPendingMessageCursor) candidate;
                cursor.setMaxBatchSize(getMaxBatchSize());
                if (cursor.hasNext()) {
                    currentCursor = cursor;
                    break;
                }
            }
            // Rotate: the list always holds at least the non-persistent cursor,
            // so removeFirst() cannot fail here.
            storePrefetches.addLast(storePrefetches.removeFirst());
        }
        return currentCursor;
    }
}
