package org.apache.activemq.store.journal;

import java.io.IOException;
import java.util.HashMap;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:lib/activemq-core-4.1.1.jar:org/apache/activemq/store/journal/QuickJournalTopicMessageStore.class */
public class QuickJournalTopicMessageStore extends QuickJournalMessageStore implements TopicMessageStore {
    private static final Log log;
    private TopicMessageStore longTermStore;
    private HashMap ackedLastAckLocations;
    static Class class$org$apache$activemq$store$journal$QuickJournalTopicMessageStore;

    public QuickJournalTopicMessageStore(QuickJournalPersistenceAdapter quickJournalPersistenceAdapter, TopicMessageStore topicMessageStore, ActiveMQTopic activeMQTopic) {
        super(quickJournalPersistenceAdapter, topicMessageStore, activeMQTopic);
        this.ackedLastAckLocations = new HashMap();
        this.longTermStore = topicMessageStore;
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recoverSubscription(str, str2, new MessageRecoveryListener(this, messageRecoveryListener) { // from class: org.apache.activemq.store.journal.QuickJournalTopicMessageStore.1
            private final MessageRecoveryListener val$listener;
            private final QuickJournalTopicMessageStore this$0;

            {
                this.this$0 = this;
                this.val$listener = messageRecoveryListener;
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessage(Message message) throws Exception {
                throw new IOException("Should not get called.");
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessageReference(String str3) throws Exception {
                this.val$listener.recoverMessage((Message) this.this$0.peristenceAdapter.readCommand(QuickJournalMessageStore.toRecordLocation(str3)));
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void finished() {
                this.val$listener.finished();
            }
        });
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverNextMessages(String str, String str2, MessageId messageId, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recoverNextMessages(str, str2, messageId, i, new MessageRecoveryListener(this, messageRecoveryListener) { // from class: org.apache.activemq.store.journal.QuickJournalTopicMessageStore.2
            private final MessageRecoveryListener val$listener;
            private final QuickJournalTopicMessageStore this$0;

            {
                this.this$0 = this;
                this.val$listener = messageRecoveryListener;
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessage(Message message) throws Exception {
                throw new IOException("Should not get called.");
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessageReference(String str3) throws Exception {
                this.val$listener.recoverMessage((Message) this.this$0.peristenceAdapter.readCommand(QuickJournalMessageStore.toRecordLocation(str3)));
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void finished() {
                this.val$listener.finished();
            }
        });
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return this.longTermStore.lookupSubscription(str, str2);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void addSubsciption(String str, String str2, String str3, boolean z) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.addSubsciption(str, str2, str3, z);
    }

    @Override // org.apache.activemq.store.journal.QuickJournalMessageStore, org.apache.activemq.store.MessageStore
    public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        super.addMessage(connectionContext, message);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId) throws IOException {
        boolean isDebugEnabled = log.isDebugEnabled();
        JournalTopicAck journalTopicAck = new JournalTopicAck();
        journalTopicAck.setDestination(this.destination);
        journalTopicAck.setMessageId(messageId);
        journalTopicAck.setMessageSequenceId(messageId.getBrokerSequenceId());
        journalTopicAck.setSubscritionName(str2);
        journalTopicAck.setClientId(str);
        journalTopicAck.setTransactionId(connectionContext.getTransaction() != null ? connectionContext.getTransaction().getTransactionId() : null);
        RecordLocation writeCommand = this.peristenceAdapter.writeCommand(journalTopicAck, false);
        SubscriptionKey subscriptionKey = new SubscriptionKey(str, str2);
        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled acknowledge for: ").append(messageId).append(", at: ").append(writeCommand).toString());
            }
            acknowledge(messageId, writeCommand, subscriptionKey);
        } else {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled transacted acknowledge for: ").append(messageId).append(", at: ").append(writeCommand).toString());
            }
            synchronized (this) {
                this.inFlightTxLocations.add(writeCommand);
            }
            this.transactionStore.acknowledge(this, journalTopicAck, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization(this, isDebugEnabled, messageId, writeCommand, subscriptionKey) { // from class: org.apache.activemq.store.journal.QuickJournalTopicMessageStore.3
                private final boolean val$debug;
                private final MessageId val$messageId;
                private final RecordLocation val$location;
                private final SubscriptionKey val$key;
                private final QuickJournalTopicMessageStore this$0;

                {
                    this.this$0 = this;
                    this.val$debug = isDebugEnabled;
                    this.val$messageId = messageId;
                    this.val$location = writeCommand;
                    this.val$key = subscriptionKey;
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (this.val$debug) {
                        QuickJournalTopicMessageStore.log.debug(new StringBuffer().append("Transacted acknowledge commit for: ").append(this.val$messageId).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                        this.this$0.acknowledge(this.val$messageId, this.val$location, this.val$key);
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (this.val$debug) {
                        QuickJournalTopicMessageStore.log.debug(new StringBuffer().append("Transacted acknowledge rollback for: ").append(this.val$messageId).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                    }
                }
            });
        }
    }

    public void replayAcknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId) {
        try {
            if (this.longTermStore.lookupSubscription(str, str2) != null) {
                this.longTermStore.acknowledge(connectionContext, str, str2, messageId);
            }
        } catch (Throwable th) {
            log.debug(new StringBuffer().append("Could not replay acknowledge for message '").append(messageId).append("'.  Message may have already been acknowledged. reason: ").append(th).toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void acknowledge(MessageId messageId, RecordLocation recordLocation, SubscriptionKey subscriptionKey) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            this.ackedLastAckLocations.put(subscriptionKey, messageId);
        }
    }

    @Override // org.apache.activemq.store.journal.QuickJournalMessageStore
    public RecordLocation checkpoint() throws IOException {
        HashMap hashMap;
        synchronized (this) {
            hashMap = this.ackedLastAckLocations;
            this.ackedLastAckLocations = new HashMap();
        }
        return super.checkpoint(new Callback(this, hashMap) { // from class: org.apache.activemq.store.journal.QuickJournalTopicMessageStore.4
            private final HashMap val$cpAckedLastAckLocations;
            private final QuickJournalTopicMessageStore this$0;

            {
                this.this$0 = this;
                this.val$cpAckedLastAckLocations = hashMap;
            }

            @Override // org.apache.activemq.util.Callback
            public void execute() throws Exception {
                for (SubscriptionKey subscriptionKey : this.val$cpAckedLastAckLocations.keySet()) {
                    this.this$0.longTermStore.acknowledge(this.this$0.transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, (MessageId) this.val$cpAckedLastAckLocations.get(subscriptionKey));
                }
            }
        });
    }

    public TopicMessageStore getLongTermTopicMessageStore() {
        return this.longTermStore;
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void deleteSubscription(String str, String str2) throws IOException {
        this.longTermStore.deleteSubscription(str, str2);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.longTermStore.getAllSubscriptions();
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public MessageId getNextMessageIdToDeliver(String str, String str2, MessageId messageId) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        return this.longTermStore.getNextMessageIdToDeliver(str, str2, messageId);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public MessageId getPreviousMessageIdToDeliver(String str, String str2, MessageId messageId) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        return this.longTermStore.getPreviousMessageIdToDeliver(str, str2, messageId);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public int getMessageCount(String str, String str2) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        return this.longTermStore.getMessageCount(str, str2);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void resetBatching(String str, String str2, MessageId messageId) {
        this.longTermStore.resetBatching(str, str2, messageId);
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$store$journal$QuickJournalTopicMessageStore == null) {
            cls = class$("org.apache.activemq.store.journal.QuickJournalTopicMessageStore");
            class$org$apache$activemq$store$journal$QuickJournalTopicMessageStore = cls;
        } else {
            cls = class$org$apache$activemq$store$journal$QuickJournalTopicMessageStore;
        }
        log = LogFactory.getLog(cls);
    }
}
