package org.apache.activemq.artemis.protocol.amqp.broker;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.class */
public class AMQPSessionCallback implements SessionCallback {
    private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
    private final AMQPConnectionCallback protonSPI;
    private final ProtonProtocolManager manager;
    private final AMQPConnectionContext connection;
    private final Connection transportConnection;
    private ServerSession serverSession;
    private final OperationContext operationContext;
    private AMQPSessionContext protonSession;
    private final Executor closeExecutor;
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
    private final AtomicBoolean draining = new AtomicBoolean(false);

    public AMQPSessionCallback(AMQPConnectionCallback aMQPConnectionCallback, ProtonProtocolManager protonProtocolManager, AMQPConnectionContext aMQPConnectionContext, Connection connection, Executor executor, OperationContext operationContext) {
        this.protonSPI = aMQPConnectionCallback;
        this.manager = protonProtocolManager;
        this.connection = aMQPConnectionContext;
        this.transportConnection = connection;
        this.closeExecutor = executor;
        this.operationContext = operationContext;
    }

    public boolean isWritable(ReadyListener readyListener, Object obj) {
        return this.transportConnection.isWritable(readyListener) && ((ProtonServerSenderContext) obj).getSender().getLocalState() != EndpointState.CLOSED;
    }

    public void onFlowConsumer(Object obj, int i, boolean z) {
        ServerConsumerImpl serverConsumerImpl = (ServerConsumerImpl) obj;
        if (!z) {
            serverConsumerImpl.receiveCredits(-1);
        } else if (this.draining.compareAndSet(false, true)) {
            final ProtonServerSenderContext protonServerSenderContext = (ProtonServerSenderContext) serverConsumerImpl.getProtocolContext();
            serverConsumerImpl.forceDelivery(1L, new Runnable() { // from class: org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        protonServerSenderContext.getSender().drained();
                    } finally {
                        AMQPSessionCallback.this.draining.set(false);
                    }
                }
            });
        }
    }

    public void browserFinished(ServerConsumer serverConsumer) {
    }

    public void init(AMQPSessionContext aMQPSessionContext, SASLResult sASLResult) throws Exception {
        this.protonSession = aMQPSessionContext;
        String generateStringUUID = UUIDGenerator.getInstance().generateStringUUID();
        String str = null;
        String str2 = null;
        if (sASLResult != null) {
            str = sASLResult.getUser();
            if (sASLResult instanceof PlainSASLResult) {
                str2 = ((PlainSASLResult) sASLResult).getPassword();
            }
        }
        this.serverSession = this.manager.getServer().createSession(generateStringUUID, str, str2, 102400, this.protonSPI.getProtonConnectionDelegate(), false, false, false, true, (String) null, this, true, this.operationContext, this.manager.getPrefixes());
    }

    public void afterDelivery() throws Exception {
    }

    public void start() {
    }

    public Object createSender(ProtonServerSenderContext protonServerSenderContext, String str, String str2, boolean z) throws Exception {
        ServerConsumer createConsumer = this.serverSession.createConsumer(this.consumerIDGenerator.generateID(), SimpleString.toSimpleString(str), SimpleString.toSimpleString(SelectorTranslator.convertToActiveMQFilterString(str2)), z);
        createConsumer.setStarted(true);
        createConsumer.setProtocolContext(protonServerSenderContext);
        return createConsumer;
    }

    public void startSender(Object obj) throws Exception {
        ((ServerConsumer) obj).receiveCredits(-1);
    }

    public void createTemporaryQueue(String str, RoutingType routingType) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str), routingType, (SimpleString) null, true, false);
    }

    public void createTemporaryQueue(String str, String str2, RoutingType routingType, String str3) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str2), routingType, SimpleString.toSimpleString(str3), true, false);
    }

    public void createUnsharedDurableQueue(String str, RoutingType routingType, String str2, String str3) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str2), routingType, SimpleString.toSimpleString(str3), false, true, 1, false, false);
    }

    public void createSharedDurableQueue(String str, RoutingType routingType, String str2, String str3) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str2), routingType, SimpleString.toSimpleString(str3), false, true, -1, false, false);
    }

    public void createSharedVolatileQueue(String str, RoutingType routingType, String str2, String str3) throws Exception {
        this.serverSession.createQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str2), routingType, SimpleString.toSimpleString(str3), false, false, -1, true, true);
    }

    public QueueQueryResult queueQuery(String str, RoutingType routingType, boolean z) throws Exception {
        QueueQueryResult executeQueueQuery = this.serverSession.executeQueueQuery(SimpleString.toSimpleString(str));
        if (!executeQueueQuery.isExists() && executeQueueQuery.isAutoCreateQueues() && z) {
            try {
                this.serverSession.createQueue(new SimpleString(str), new SimpleString(str), routingType, (SimpleString) null, false, true);
            } catch (ActiveMQQueueExistsException e) {
            }
            executeQueueQuery = this.serverSession.executeQueueQuery(SimpleString.toSimpleString(str));
        }
        if (executeQueueQuery.getRoutingType() != routingType) {
            throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
        }
        return executeQueueQuery;
    }

    public boolean bindingQuery(String str) throws Exception {
        BindingQueryResult executeBindingQuery = this.serverSession.executeBindingQuery(SimpleString.toSimpleString(str));
        if (!executeBindingQuery.isExists() && executeBindingQuery.isAutoCreateQueues()) {
            try {
                this.serverSession.createQueue(new SimpleString(str), new SimpleString(str), RoutingType.ANYCAST, (SimpleString) null, false, true);
            } catch (ActiveMQQueueExistsException e) {
            }
            executeBindingQuery = this.serverSession.executeBindingQuery(SimpleString.toSimpleString(str));
        }
        return executeBindingQuery.isExists();
    }

    public AddressQueryResult addressQuery(String str, RoutingType routingType, boolean z) throws Exception {
        AddressQueryResult executeAddressQuery = this.serverSession.executeAddressQuery(SimpleString.toSimpleString(str));
        if (!executeAddressQuery.isExists() && executeAddressQuery.isAutoCreateAddresses() && z) {
            try {
                this.serverSession.createAddress(SimpleString.toSimpleString(str), routingType, true);
            } catch (ActiveMQQueueExistsException e) {
            }
            executeAddressQuery = this.serverSession.executeAddressQuery(SimpleString.toSimpleString(str));
        }
        return executeAddressQuery;
    }

    public void closeSender(Object obj) throws Exception {
        final ServerConsumer serverConsumer = (ServerConsumer) obj;
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Runnable runnable = new Runnable() { // from class: org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    serverConsumer.close(false);
                    countDownLatch.countDown();
                } catch (Exception e) {
                }
            }
        };
        Executor exeuctor = this.protonSPI.getExeuctor();
        if (exeuctor != null) {
            exeuctor.execute(runnable);
        } else {
            runnable.run();
        }
        try {
            countDownLatch.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + serverConsumer.getQueue());
        }
    }

    public String tempQueueName() {
        return UUIDGenerator.getInstance().generateStringUUID();
    }

    public void close() throws Exception {
        if (this.serverSession != null) {
            recoverContext();
            try {
                this.serverSession.close(false);
            } finally {
                resetContext();
            }
        }
    }

    public void ack(Transaction transaction, Object obj, Message message) throws Exception {
        if (transaction == null) {
            transaction = this.serverSession.getCurrentTransaction();
        }
        recoverContext();
        try {
            ((ServerConsumer) obj).individualAcknowledge(transaction, message.getMessageID());
            resetContext();
        } catch (Throwable th) {
            resetContext();
            throw th;
        }
    }

    public void cancel(Object obj, Message message, boolean z) throws Exception {
        recoverContext();
        try {
            ((ServerConsumer) obj).individualCancel(message.getMessageID(), z);
            ((ServerConsumer) obj).getQueue().forceDelivery();
            resetContext();
        } catch (Throwable th) {
            resetContext();
            throw th;
        }
    }

    public void resumeDelivery(Object obj) {
        ((ServerConsumer) obj).receiveCredits(-1);
    }

    public void serverSend(Transaction transaction, Receiver receiver, Delivery delivery, String str, int i, byte[] bArr) throws Exception {
        AMQPMessage aMQPMessage = new AMQPMessage(i, bArr);
        if (str != null) {
            aMQPMessage.m1setAddress(new SimpleString(str));
        } else if (aMQPMessage.getAddress() == null) {
            rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
            return;
        } else if (!bindingQuery(aMQPMessage.getAddress().toString())) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
        }
        recoverContext();
        if (!this.manager.getServer().getPagingManager().getPageStore(aMQPMessage.getAddressSimpleString()).isRejectingMessages()) {
            serverSend(transaction, aMQPMessage, delivery, receiver);
            return;
        }
        if (!delivery.remotelySettled()) {
            rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + str);
        } else if (transaction != null) {
            transaction.markAsRollbackOnly(new ActiveMQAMQPResourceLimitExceededException("Address is full: " + delivery.getLink().getTarget().getAddress()));
        }
    }

    private void rejectMessage(Delivery delivery, Symbol symbol, String str) {
        ErrorCondition errorCondition = new ErrorCondition();
        errorCondition.setCondition(symbol);
        errorCondition.setDescription(str);
        Rejected rejected = new Rejected();
        rejected.setError(errorCondition);
        delivery.disposition(rejected);
        delivery.settle();
        this.connection.flush();
    }

    private void serverSend(Transaction transaction, Message message, final Delivery delivery, final Receiver receiver) throws Exception {
        try {
            message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
            this.serverSession.send(transaction, message, false, false);
            this.manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { // from class: org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.3
                public void done() {
                    synchronized (AMQPSessionCallback.this.connection.getLock()) {
                        delivery.disposition(Accepted.getInstance());
                        delivery.settle();
                    }
                    AMQPSessionCallback.this.connection.flush(true);
                }

                public void onError(int i, String str) {
                    synchronized (AMQPSessionCallback.this.connection.getLock()) {
                        receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, i + ":" + str));
                        AMQPSessionCallback.this.connection.flush();
                    }
                }
            });
            resetContext();
        } catch (Throwable th) {
            resetContext();
            throw th;
        }
    }

    public void offerProducerCredit(String str, final int i, final int i2, final Receiver receiver) {
        try {
            if (str != null) {
                this.manager.getServer().getPagingManager().getPageStore(new SimpleString(str)).checkMemory(new Runnable() { // from class: org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.4
                    @Override // java.lang.Runnable
                    public void run() {
                        synchronized (AMQPSessionCallback.this.connection.getLock()) {
                            if (receiver.getRemoteCredit() < i2) {
                                receiver.flow(i);
                                AMQPSessionCallback.this.connection.flush();
                            }
                        }
                    }
                });
                return;
            }
            synchronized (this.connection.getLock()) {
                receiver.flow(i);
                this.connection.flush();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteQueue(String str) throws Exception {
        this.manager.getServer().destroyQueue(new SimpleString(str));
    }

    private void resetContext() {
        this.manager.getServer().getStorageManager().setContext((OperationContext) null);
    }

    private void recoverContext() {
        this.manager.getServer().getStorageManager().setContext(this.serverSession.getSessionContext());
    }

    public void sendProducerCreditsMessage(int i, SimpleString simpleString) {
    }

    public boolean updateDeliveryCountAfterCancel(ServerConsumer serverConsumer, MessageReference messageReference, boolean z) {
        return false;
    }

    public void sendProducerCreditsFailMessage(int i, SimpleString simpleString) {
    }

    public int sendMessage(MessageReference messageReference, Message message, ServerConsumer serverConsumer, int i) {
        ProtonServerSenderContext protonServerSenderContext = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
        try {
            return protonServerSenderContext.deliverMessage(messageReference, i);
        } catch (Exception e) {
            synchronized (this.connection.getLock()) {
                protonServerSenderContext.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
                this.connection.flush();
                throw new IllegalStateException("Can't deliver message " + e, e);
            }
        }
    }

    public int sendLargeMessage(MessageReference messageReference, Message message, ServerConsumer serverConsumer, long j, int i) {
        return 0;
    }

    public int sendLargeMessageContinuation(ServerConsumer serverConsumer, byte[] bArr, boolean z, boolean z2) {
        return 0;
    }

    public void closed() {
    }

    public void disconnect(ServerConsumer serverConsumer, String str) {
        ErrorCondition errorCondition = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + str);
        try {
            synchronized (this.connection.getLock()) {
                ((ProtonServerSenderContext) serverConsumer.getProtocolContext()).close(errorCondition);
                this.connection.flush();
            }
        } catch (ActiveMQAMQPException e) {
            logger.error("Error closing link for " + serverConsumer.getQueue().getAddress());
        }
    }

    public boolean hasCredits(ServerConsumer serverConsumer) {
        ProtonServerSenderContext protonServerSenderContext = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
        return protonServerSenderContext != null && protonServerSenderContext.getSender().getCredit() > 0;
    }

    public Transaction getTransaction(Binary binary) throws ActiveMQAMQPException {
        return this.protonSPI.getTransaction(binary);
    }

    public Binary newTransaction() {
        return this.protonSPI.newTransaction();
    }

    public void commitTX(Binary binary) throws Exception {
        this.protonSPI.getTransaction(binary).commit(true);
        this.protonSPI.removeTransaction(binary);
    }

    public void rollbackTX(Binary binary, boolean z) throws Exception {
        this.protonSPI.getTransaction(binary).rollback();
        this.protonSPI.removeTransaction(binary);
    }

    public void dischargeTx(Binary binary) throws ActiveMQAMQPException {
        this.protonSPI.getTransaction(binary).discharge();
    }

    public SimpleString getMatchingQueue(SimpleString simpleString, RoutingType routingType) throws Exception {
        return this.serverSession.getMatchingQueue(simpleString, routingType);
    }

    public SimpleString getMatchingQueue(SimpleString simpleString, SimpleString simpleString2, RoutingType routingType) throws Exception {
        return this.serverSession.getMatchingQueue(simpleString, simpleString2, routingType);
    }

    public AddressInfo getAddress(SimpleString simpleString) {
        return this.serverSession.getAddress(simpleString);
    }

    public void removeTemporaryQueue(String str) throws Exception {
        this.serverSession.deleteQueue(SimpleString.toSimpleString(str));
    }
}
