package org.apache.activemq.artemis.core.server.federation;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationStreamConnectMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.jboss.logging.Logger;

/* loaded from: input_file:WEB-INF/lib/artemis-server-2.20.0.jar:org/apache/activemq/artemis/core/server/federation/FederationDownstream.class */
public class FederationDownstream extends AbstractFederationStream implements SessionFailureListener {
    private static final Logger logger = Logger.getLogger((Class<?>) FederationDownstream.class);
    private FederationDownstreamConfiguration config;
    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSessionInternal clientSession;
    private Channel channel;
    private AtomicBoolean initialized;
    private final ScheduledExecutorService scheduledExecutorService;
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    public static final String FEDERATION_DOWNSTREAM_NAME = "federation-downstream-name";
    private AtomicBoolean started;
    private FederationConfiguration federationConfiguration;

    public FederationDownstream(ActiveMQServer activeMQServer, Federation federation, String str, FederationDownstreamConfiguration federationDownstreamConfiguration, FederationConnection federationConnection) {
        super(activeMQServer, federation, str, federationDownstreamConfiguration, federationConnection);
        this.initialized = new AtomicBoolean();
        this.intialConnectDelayMultiplier = 2;
        this.intialConnectDelayMax = 30;
        this.started = new AtomicBoolean();
        this.config = federationDownstreamConfiguration;
        this.scheduledExecutorService = activeMQServer.getScheduledPool();
    }

    @Override // org.apache.activemq.artemis.core.server.federation.AbstractFederationStream, org.apache.activemq.artemis.core.server.federation.FederationStream
    public synchronized void start() {
        super.start();
        callFederationStreamStartedPlugins();
        try {
            deploy(this.federationConfiguration);
        } catch (ActiveMQException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @Override // org.apache.activemq.artemis.core.server.federation.AbstractFederationStream, org.apache.activemq.artemis.core.server.federation.FederationStream
    public synchronized void stop() {
        super.stop();
        callFederationStreamStoppedPlugins();
    }

    public void deploy(FederationConfiguration federationConfiguration) throws ActiveMQException {
        this.federationConfiguration = federationConfiguration;
        if (this.connection.isStarted() && this.started.compareAndSet(false, true)) {
            FederationDownstreamConnectMessage federationDownstreamConnectMessage = new FederationDownstreamConnectMessage();
            federationDownstreamConnectMessage.setName(federationConfiguration.getName());
            federationDownstreamConnectMessage.setCredentials(federationConfiguration.getCredentials());
            federationDownstreamConnectMessage.setStreamConfiguration(this.config);
            federationDownstreamConnectMessage.setFederationPolicyMap(federationConfiguration.getFederationPolicyMap());
            federationDownstreamConnectMessage.setTransformerConfigurationMap(federationConfiguration.getTransformerConfigurationMap());
            if (this.config.getUpstreamConfigurationRef() != null && this.config.getUpstreamConfiguration() == null) {
                TransportConfiguration[] transportConfigurations = this.server.getConfiguration().getTransportConfigurations(this.config.getUpstreamConfigurationRef());
                if (transportConfigurations == null || transportConfigurations.length <= 0) {
                    ActiveMQServerLogger.LOGGER.federationCantFindUpstreamConnector(this.config.getName(), this.config.getUpstreamConfigurationRef());
                    throw new ActiveMQException("Could not locate upstream transport configuration for federation downstream: " + this.config.getName() + "; upstream ref: " + this.config.getUpstreamConfigurationRef());
                }
                this.config.setUpstreamConfiguration(transportConfigurations[0]);
            }
            try {
                scheduleConnect(0, federationDownstreamConnectMessage);
                ActiveMQServerLogger.LOGGER.federationDownstreamDeployed(this.config.getName());
            } catch (Exception e) {
                throw new ActiveMQException(e.getMessage(), e, ActiveMQExceptionType.GENERIC_EXCEPTION);
            }
        }
    }

    public void undeploy() {
        try {
            if (this.started.compareAndSet(true, false)) {
                disconnect();
                ActiveMQServerLogger.LOGGER.federationDownstreamUnDeployed(this.config.getName());
            }
        } catch (ActiveMQException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    private void scheduleConnect(int i, FederationStreamConnectMessage federationStreamConnectMessage) {
        this.scheduledExecutorService.schedule(() -> {
            try {
                connect();
                if (this.initialized.compareAndSet(false, true)) {
                    this.channel.send(federationStreamConnectMessage);
                }
            } catch (Exception e) {
                scheduleConnect(FederatedQueueConsumer.getNextDelay(i, 2, 30), federationStreamConnectMessage);
            }
        }, i, TimeUnit.SECONDS);
    }

    private void connect() throws Exception {
        try {
            if (this.clientSession == null) {
                synchronized (this) {
                    this.clientSessionFactory = (ClientSessionFactoryInternal) getConnection().clientSessionFactory();
                    this.clientSession = (ClientSessionInternal) this.clientSessionFactory.createSession(getUser(), getPassword(), false, true, true, this.clientSessionFactory.getServerLocator().isPreAcknowledge(), this.clientSessionFactory.getServerLocator().getAckBatchSize());
                    this.clientSession.addFailureListener(this);
                    this.clientSession.addMetaData(FederatedQueueConsumer.FEDERATION_NAME, this.federation.getName().toString());
                    this.clientSession.addMetaData(FEDERATION_DOWNSTREAM_NAME, this.config.getName().toString());
                    this.clientSession.start();
                    this.channel = ((CoreRemotingConnection) this.clientSessionFactory.getConnection()).getChannel(ChannelImpl.CHANNEL_ID.FEDERATION.id, -1);
                }
            }
        } catch (Exception e) {
            try {
                if (this.clientSessionFactory != null) {
                    this.clientSessionFactory.cleanup();
                }
                disconnect();
            } catch (ActiveMQException e2) {
            }
            throw e;
        }
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z) {
        connectionFailed(activeMQException, z, null);
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
        try {
            this.started.set(false);
            this.initialized.set(false);
            this.channel.close();
            this.clientSessionFactory.cleanup();
            this.clientSessionFactory.close();
            this.channel = null;
            this.clientSession = null;
            this.clientSessionFactory = null;
        } catch (Throwable th) {
        }
        start();
    }

    private void disconnect() throws ActiveMQException {
        this.initialized.set(false);
        if (this.channel != null) {
            this.channel.close();
        }
        if (this.clientSession != null) {
            this.clientSession.close();
        }
        this.channel = null;
        this.clientSession = null;
        if (this.clientSessionFactory != null) {
            if (!getConnection().isSharedConnection() || this.clientSessionFactory.numSessions() == 0) {
                this.clientSessionFactory.close();
                this.clientSessionFactory = null;
            }
        }
    }

    @Override // org.apache.activemq.artemis.api.core.client.SessionFailureListener
    public void beforeReconnect(ActiveMQException activeMQException) {
    }
}
