package org.apache.activemq.artemis.core.server.federation;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.transformer.Transformer;

/* loaded from: input_file:artemis-server-2.27.1.jar:org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.class */
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
    private final ActiveMQServer server;
    private final Federation federation;
    private final FederatedConsumerKey key;
    private final Transformer transformer;
    private final FederationUpstream upstream;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ClientSessionCallback clientSessionCallback;
    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSession clientSession;
    private ClientConsumer clientConsumer;
    private final AtomicInteger count = new AtomicInteger();
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;

    /* loaded from: input_file:artemis-server-2.27.1.jar:org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl$ClientSessionCallback.class */
    public interface ClientSessionCallback {
        void callback(ClientSession clientSession) throws ActiveMQException;
    }

    public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer activeMQServer, Transformer transformer, FederatedConsumerKey federatedConsumerKey, FederationUpstream federationUpstream, ClientSessionCallback clientSessionCallback) {
        this.federation = federation;
        this.server = activeMQServer;
        this.key = federatedConsumerKey;
        this.transformer = transformer;
        this.upstream = federationUpstream;
        this.scheduledExecutorService = activeMQServer.getScheduledPool();
        this.clientSessionCallback = clientSessionCallback;
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public FederationUpstream getFederationUpstream() {
        return this.upstream;
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public Federation getFederation() {
        return this.federation;
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public FederatedConsumerKey getKey() {
        return this.key;
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public ClientSession getClientSession() {
        return this.clientSession;
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public int incrementCount() {
        return this.count.incrementAndGet();
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public int decrementCount() {
        return this.count.decrementAndGet();
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public void start() {
        scheduleConnect(0);
    }

    private void scheduleConnect(int i) {
        this.scheduledExecutorService.schedule(() -> {
            try {
                connect();
            } catch (Exception e) {
                scheduleConnect(FederatedQueueConsumer.getNextDelay(i, 2, 30));
            }
        }, i, TimeUnit.SECONDS);
    }

    private void connect() throws Exception {
        try {
            if (this.clientConsumer == null) {
                synchronized (this) {
                    this.clientSessionFactory = (ClientSessionFactoryInternal) this.upstream.getConnection().clientSessionFactory();
                    this.clientSession = this.clientSessionFactory.createSession(this.upstream.getUser(), this.upstream.getPassword(), false, true, true, this.clientSessionFactory.getServerLocator().isPreAcknowledge(), this.clientSessionFactory.getServerLocator().getAckBatchSize());
                    this.clientSession.addFailureListener(this);
                    this.clientSession.addMetaData(FederatedQueueConsumer.FEDERATION_NAME, this.federation.getName().toString());
                    this.clientSession.addMetaData(FederatedQueueConsumer.FEDERATION_UPSTREAM_NAME, this.upstream.getName().toString());
                    this.clientSession.start();
                    if (this.clientSessionCallback != null) {
                        this.clientSessionCallback.callback(this.clientSession);
                    }
                    if (!this.clientSession.queueQuery(this.key.getQueueName()).isExists()) {
                        throw new ActiveMQNonExistentQueueException("Queue " + this.key.getQueueName() + " does not exist on remote");
                    }
                    this.clientConsumer = this.clientSession.createConsumer(this.key.getQueueName(), this.key.getFilterString(), this.key.getPriority(), false);
                    this.clientConsumer.setMessageHandler(this);
                }
            }
        } catch (Exception e) {
            try {
                if (this.clientSessionFactory != null) {
                    this.clientSessionFactory.cleanup();
                }
                disconnect();
            } catch (ActiveMQException e2) {
            }
            throw e;
        }
    }

    @Override // org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer
    public void close() {
        scheduleDisconnect(0);
    }

    private void scheduleDisconnect(int i) {
        this.scheduledExecutorService.schedule(() -> {
            try {
                disconnect();
            } catch (Exception e) {
            }
        }, i, TimeUnit.SECONDS);
    }

    private void disconnect() throws ActiveMQException {
        if (this.clientConsumer != null) {
            this.clientConsumer.close();
        }
        if (this.clientSession != null) {
            this.clientSession.close();
        }
        this.clientConsumer = null;
        this.clientSession = null;
        if (this.clientSessionFactory == null || this.clientSessionFactory.numSessions() != 0 || this.upstream.getConnection().isSharedConnection()) {
            return;
        }
        this.clientSessionFactory.close();
        this.clientSessionFactory = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v48, types: [org.apache.activemq.artemis.api.core.Message] */
    @Override // org.apache.activemq.artemis.api.core.client.MessageHandler
    public void onMessage(ClientMessage clientMessage) {
        LargeMessageControllerImpl.LargeData take;
        try {
            ClientMessage clientMessage2 = clientMessage;
            if (clientMessage2 instanceof ClientLargeMessageInternal) {
                StorageManager storageManager = this.server.getStorageManager();
                LargeServerMessage createLargeMessage = storageManager.createLargeMessage(storageManager.generateID(), clientMessage2);
                do {
                    take = ((ClientLargeMessageInternal) clientMessage).getLargeMessageController().take();
                    createLargeMessage.addBytes(take.getChunk());
                } while (take.isContinues());
                clientMessage2 = createLargeMessage.toMessage();
                createLargeMessage.releaseResources(true, true);
            }
            if (this.server.hasBrokerFederationPlugins()) {
                try {
                    this.server.callBrokerFederationPlugins(activeMQServerFederationPlugin -> {
                        activeMQServerFederationPlugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage);
                    });
                } catch (ActiveMQException e) {
                    ActiveMQServerLogger.LOGGER.federationPluginExecutionError("beforeFederatedQueueConsumerMessageHandled", e);
                    throw new IllegalStateException(e.getMessage(), e.getCause());
                }
            }
            Message transform = this.transformer == null ? clientMessage2 : this.transformer.transform(clientMessage2);
            if (transform != null) {
                this.server.getPostOffice().route(transform, true);
            }
            clientMessage.acknowledge();
            if (this.server.hasBrokerFederationPlugins()) {
                try {
                    this.server.callBrokerFederationPlugins(activeMQServerFederationPlugin2 -> {
                        activeMQServerFederationPlugin2.afterFederatedQueueConsumerMessageHandled(this, clientMessage);
                    });
                } catch (ActiveMQException e2) {
                    ActiveMQServerLogger.LOGGER.federationPluginExecutionError("afterFederatedQueueConsumerMessageHandled", e2);
                    throw new IllegalStateException(e2.getMessage(), e2.getCause());
                }
            }
        } catch (Exception e3) {
            ActiveMQServerLogger.LOGGER.federationDispatchError(clientMessage.toString(), e3);
            try {
                this.clientSession.rollback();
            } catch (ActiveMQException e4) {
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z) {
        connectionFailed(activeMQException, z, null);
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
        try {
            this.clientSessionFactory.cleanup();
            this.clientSessionFactory.close();
            this.clientConsumer = null;
            this.clientSession = null;
            this.clientSessionFactory = null;
        } catch (Throwable th) {
        }
        start();
    }

    @Override // org.apache.activemq.artemis.api.core.client.SessionFailureListener
    public void beforeReconnect(ActiveMQException activeMQException) {
    }
}
