package org.apache.qpid.server.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/consumer/AbstractConsumerTarget.class */
public abstract class AbstractConsumerTarget implements ConsumerTarget {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
    private final AtomicReference<ConsumerTarget.State> _state;
    private final Set<StateChangeListener<ConsumerTarget, ConsumerTarget.State>> _stateChangeListeners = new CopyOnWriteArraySet();
    private final Lock _stateChangeLock = new ReentrantLock();
    private final AtomicInteger _stateActivates = new AtomicInteger();
    private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue<>();

    protected AbstractConsumerTarget(ConsumerTarget.State state) {
        this._state = new AtomicReference<>(state);
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public boolean processPending() {
        if (!getSessionModel().getAMQPConnection().isIOThread()) {
            return false;
        }
        if (hasMessagesToSend()) {
            sendNextMessage();
            return true;
        }
        processStateChanged();
        processClosed();
        return false;
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public boolean hasPendingWork() {
        return hasMessagesToSend() || hasStateChanged() || hasClosed();
    }

    protected abstract boolean hasStateChanged();

    protected abstract boolean hasClosed();

    protected abstract void processStateChanged();

    protected abstract void processClosed();

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final boolean isSuspended() {
        return getSessionModel().getAMQPConnection().isMessageAssignmentSuspended() || isFlowSuspended();
    }

    protected abstract boolean isFlowSuspended();

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final ConsumerTarget.State getState() {
        return this._state.get();
    }

    protected final boolean updateState(ConsumerTarget.State state, ConsumerTarget.State state2) {
        if (!this._state.compareAndSet(state, state2)) {
            return false;
        }
        if (state2 != ConsumerTarget.State.ACTIVE || this._stateChangeListeners.size() <= 1) {
            Iterator<StateChangeListener<ConsumerTarget, ConsumerTarget.State>> it = this._stateChangeListeners.iterator();
            while (it.hasNext()) {
                it.next().stateChanged(this, state, state2);
            }
            return true;
        }
        int incrementAndGet = this._stateActivates.incrementAndGet();
        if (incrementAndGet >= this._stateChangeListeners.size()) {
            this._stateActivates.set(0);
            incrementAndGet = 0;
        }
        ArrayList arrayList = new ArrayList();
        int i = 0;
        for (StateChangeListener<ConsumerTarget, ConsumerTarget.State> stateChangeListener : this._stateChangeListeners) {
            int i2 = i;
            i++;
            if (i2 < incrementAndGet) {
                arrayList.add(stateChangeListener);
            } else {
                stateChangeListener.stateChanged(this, state, state2);
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((StateChangeListener) it2.next()).stateChanged(this, state, state2);
        }
        return true;
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final void notifyCurrentState() {
        for (StateChangeListener<ConsumerTarget, ConsumerTarget.State> stateChangeListener : this._stateChangeListeners) {
            ConsumerTarget.State state = getState();
            stateChangeListener.stateChanged(this, state, state);
        }
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final void addStateListener(StateChangeListener<ConsumerTarget, ConsumerTarget.State> stateChangeListener) {
        this._stateChangeListeners.add(stateChangeListener);
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public void removeStateChangeListener(StateChangeListener<ConsumerTarget, ConsumerTarget.State> stateChangeListener) {
        this._stateChangeListeners.remove(stateChangeListener);
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final boolean trySendLock() {
        return this._stateChangeLock.tryLock();
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final void getSendLock() {
        this._stateChangeLock.lock();
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final void releaseSendLock() {
        this._stateChangeLock.unlock();
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final long send(ConsumerImpl consumerImpl, MessageInstance messageInstance, boolean z) {
        AMQPConnection<?> aMQPConnection = getSessionModel().getAMQPConnection();
        aMQPConnection.reserveOutboundMessageSpace(messageInstance.getMessage().getSize());
        this._queue.add(new ConsumerMessageInstancePair(consumerImpl, messageInstance, z));
        aMQPConnection.notifyWork();
        return messageInstance.getMessage().getSize();
    }

    protected abstract void doSend(ConsumerImpl consumerImpl, MessageInstance messageInstance, boolean z);

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public boolean hasMessagesToSend() {
        return !this._queue.isEmpty();
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public void sendNextMessage() {
        ConsumerMessageInstancePair poll = this._queue.poll();
        if (poll != null) {
            try {
                ConsumerImpl consumer = poll.getConsumer();
                MessageInstance entry = poll.getEntry();
                doSend(consumer, entry, poll.isBatch());
                if (consumer.acquires()) {
                    entry.unlockAcquisition();
                }
            } finally {
                poll.release();
            }
        }
    }

    @Override // org.apache.qpid.server.consumer.ConsumerTarget
    public final boolean close() {
        boolean z = false;
        ConsumerTarget.State state = getState();
        getSendLock();
        while (!z) {
            try {
                if (state == ConsumerTarget.State.CLOSED) {
                    break;
                }
                z = updateState(state, ConsumerTarget.State.CLOSED);
                if (!z) {
                    state = getState();
                }
            } catch (Throwable th) {
                releaseSendLock();
                throw th;
            }
        }
        while (true) {
            ConsumerMessageInstancePair poll = this._queue.poll();
            if (poll == null) {
                doCloseInternal();
                releaseSendLock();
                afterCloseInternal();
                return z;
            }
            poll.getEntry().release(poll.getConsumer());
            poll.release();
        }
    }

    protected abstract void afterCloseInternal();

    protected abstract void doCloseInternal();
}
