package net.timewalker.ffmq3.remote.session;

import java.util.LinkedList;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import net.timewalker.ffmq3.FFMQCoreSettings;
import net.timewalker.ffmq3.client.ClientEnvironment;
import net.timewalker.ffmq3.common.message.AbstractMessage;
import net.timewalker.ffmq3.common.session.AbstractMessageConsumer;
import net.timewalker.ffmq3.transport.PacketTransportEndpoint;
import net.timewalker.ffmq3.transport.packet.query.CloseConsumerQuery;
import net.timewalker.ffmq3.transport.packet.query.CreateConsumerQuery;
import net.timewalker.ffmq3.transport.packet.query.PrefetchQuery;
import net.timewalker.ffmq3.utils.ErrorTools;
import net.timewalker.ffmq3.utils.Settings;
import net.timewalker.ffmq3.utils.async.AsyncTask;
import net.timewalker.ffmq3.utils.async.AsyncTaskManager;
import net.timewalker.ffmq3.utils.concurrent.Semaphore;
import net.timewalker.ffmq3.utils.id.IntegerID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:net/timewalker/ffmq3/remote/session/RemoteMessageConsumer.class */
public class RemoteMessageConsumer extends AbstractMessageConsumer {
    private static final Log log;
    protected PacketTransportEndpoint transportEndpoint;
    private boolean traceEnabled;
    private LinkedList prefetchQueue;
    private Semaphore prefetchSemaphore;
    protected boolean donePrefetching;
    private AsyncTaskManager asyncTaskManager;
    private boolean logListenersFailures;
    private final WakeUpTask wakeUpTask;
    static Class class$net$timewalker$ffmq3$remote$session$RemoteMessageConsumer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/timewalker/ffmq3/remote/session/RemoteMessageConsumer$WakeUpTask.class */
    public final class WakeUpTask implements AsyncTask {
        private final RemoteMessageConsumer this$0;

        public WakeUpTask(RemoteMessageConsumer remoteMessageConsumer) {
            this.this$0 = remoteMessageConsumer;
        }

        @Override // net.timewalker.ffmq3.utils.async.AsyncTask
        public final boolean isMergeable() {
            return true;
        }

        @Override // net.timewalker.ffmq3.utils.async.AsyncTask
        public final void execute() {
            this.this$0.wakeUp();
        }
    }

    public RemoteMessageConsumer(IntegerID integerID, RemoteSession remoteSession, Destination destination, String str, boolean z) throws JMSException {
        super(remoteSession, destination, str, z, integerID);
        this.prefetchQueue = new LinkedList();
        this.prefetchSemaphore = new Semaphore();
        this.donePrefetching = false;
        this.wakeUpTask = new WakeUpTask(this);
        this.transportEndpoint = remoteSession.getTransportEndpoint();
        this.asyncTaskManager = ClientEnvironment.getAsyncTaskManager();
        this.traceEnabled = log.isTraceEnabled();
        this.logListenersFailures = getSettings().getBooleanProperty(FFMQCoreSettings.DELIVERY_LOG_LISTENERS_FAILURES, false);
        log.debug(new StringBuffer().append("New remote consumer ID is ").append(integerID).toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void remoteInit() throws JMSException {
        CreateConsumerQuery createConsumerQuery = new CreateConsumerQuery();
        createConsumerQuery.setConsumerId(this.id);
        createConsumerQuery.setSessionId(this.session.getId());
        createConsumerQuery.setDestination(this.destination);
        createConsumerQuery.setMessageSelector(this.messageSelector);
        createConsumerQuery.setNoLocal(this.noLocal);
        this.transportEndpoint.blockingRequest(createConsumerQuery);
    }

    @Override // net.timewalker.ffmq3.common.session.AbstractMessageConsumer
    protected final boolean shouldLogListenersFailures() {
        return this.logListenersFailures;
    }

    private final Settings getSettings() {
        return ClientEnvironment.getSettings();
    }

    @Override // net.timewalker.ffmq3.common.session.AbstractMessageConsumer
    public final void setMessageListener(MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener == null || !this.session.getConnection().isStarted()) {
            return;
        }
        wakeUpMessageListenerAsync();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // net.timewalker.ffmq3.common.session.AbstractMessageConsumer
    public final void onConsumerClose() {
        super.onConsumerClose();
        this.prefetchSemaphore.release();
        try {
            CloseConsumerQuery closeConsumerQuery = new CloseConsumerQuery();
            closeConsumerQuery.setSessionId(this.session.getId());
            closeConsumerQuery.setConsumerId(this.id);
            synchronized (this.prefetchQueue) {
                while (!this.prefetchQueue.isEmpty()) {
                    closeConsumerQuery.addUndeliveredMessageID(((Message) this.prefetchQueue.removeFirst()).getJMSMessageID());
                }
            }
            this.transportEndpoint.blockingRequest(closeConsumerQuery);
        } catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    public final boolean addToPrefetchQueue(AbstractMessage abstractMessage, boolean z) {
        this.externalAccessLock.readLock().lock();
        try {
            if (this.closed) {
                return false;
            }
            synchronized (this.prefetchQueue) {
                if (this.traceEnabled) {
                    log.trace(new StringBuffer().append("#").append(this.id).append(" [PREFETCHED] from ").append(this.destination).append(" - ").append(abstractMessage).toString());
                }
                this.prefetchQueue.add(abstractMessage);
                this.donePrefetching = z;
            }
            this.externalAccessLock.readLock().unlock();
            this.prefetchSemaphore.release();
            if (this.messageListener == null) {
                return true;
            }
            wakeUpMessageListenerAsync();
            return true;
        } finally {
            this.externalAccessLock.readLock().unlock();
        }
    }

    private void wakeUpMessageListenerAsync() {
        try {
            this.asyncTaskManager.execute(this.wakeUpTask);
        } catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    private AbstractMessage getFromPrefetchQueue(long j) {
        AbstractMessage abstractMessage;
        boolean z = false;
        synchronized (this.prefetchQueue) {
            if (this.donePrefetching && this.prefetchQueue.isEmpty()) {
                z = true;
                this.donePrefetching = false;
            }
        }
        if (z) {
            try {
                prefetchFromDestination();
            } catch (JMSException e) {
                log.error("Cannot prefetch more messages from remote server", e);
            }
        }
        if (!this.prefetchSemaphore.tryAcquire(j)) {
            return null;
        }
        this.externalAccessLock.readLock().lock();
        try {
            if (this.closed) {
                return null;
            }
            synchronized (this.prefetchQueue) {
                if (this.prefetchQueue.isEmpty()) {
                    throw new IllegalStateException("Prefetch queue is empty");
                }
                abstractMessage = (AbstractMessage) this.prefetchQueue.removeFirst();
            }
            this.externalAccessLock.readLock().unlock();
            ((RemoteSession) this.session).notifyDeliveredMessage(abstractMessage.getJMSMessageID());
            if (this.traceEnabled) {
                log.trace(new StringBuffer().append("#").append(this.id).append(" [GET PREFETCHED] in ").append(this.destination).append(" - ").append(abstractMessage).toString());
            }
            abstractMessage.ensureDeserializationLevel(3);
            abstractMessage.markAsReadOnly();
            return abstractMessage;
        } finally {
            this.externalAccessLock.readLock().unlock();
        }
    }

    private void prefetchFromDestination() throws JMSException {
        if (this.closed) {
            return;
        }
        if (this.traceEnabled) {
            log.trace(new StringBuffer().append("#").append(this.id).append(" Prefetching more from destination ").append(this.destination).toString());
        }
        PrefetchQuery prefetchQuery = new PrefetchQuery();
        prefetchQuery.setSessionId(this.session.getId());
        prefetchQuery.setConsumerId(this.id);
        this.transportEndpoint.nonBlockingRequest(prefetchQuery);
    }

    @Override // net.timewalker.ffmq3.common.session.AbstractMessageConsumer
    protected final AbstractMessage receiveFromDestination(long j, boolean z) throws JMSException {
        if (this.closed) {
            return null;
        }
        return getFromPrefetchQueue(j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // net.timewalker.ffmq3.common.session.AbstractMessageConsumer
    public final void wakeUp() {
        if (!this.closed && this.session.getConnection().isStarted()) {
            if (this.messageListener == null) {
                throw new IllegalStateException("Unexpected message availability notification");
            }
            wakeUpMessageListener();
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$net$timewalker$ffmq3$remote$session$RemoteMessageConsumer == null) {
            cls = class$("net.timewalker.ffmq3.remote.session.RemoteMessageConsumer");
            class$net$timewalker$ffmq3$remote$session$RemoteMessageConsumer = cls;
        } else {
            cls = class$net$timewalker$ffmq3$remote$session$RemoteMessageConsumer;
        }
        log = LogFactory.getLog(cls);
    }
}
