package org.apache.activemq.artemis.core.server.cluster.impl;

import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.class */
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final ServerLocatorInternal serverLocator;
    protected final Executor executor;
    protected final ScheduledExecutorService scheduledExecutor;
    private final UUID nodeUUID;
    private final long sequentialID;
    protected final Queue queue;
    private final Filter filter;
    private final Transformer transformer;
    private boolean blockedOnFlowControl;
    protected ScheduledFuture<?> futureScheduledReconnection;
    protected volatile ClientSessionInternal session;
    protected volatile ClientSessionInternal sessionConsumer;
    protected String targetNodeID;
    protected TopologyMember targetNode;
    private volatile ClientSessionFactoryInternal csf;
    private volatile ClientProducer producer;
    private volatile boolean started;
    private volatile boolean active;
    private boolean deliveringLargeMessage;
    private int reconnectAttemptsInUse;
    private NotificationService notificationService;
    private ActiveMQServer server;
    private BridgeConfiguration configuration;
    private final ReusableLatch pendingAcks = new ReusableLatch(0);
    final Map<Long, MessageReference> refs = new LinkedHashMap();
    private final Object connectionGuard = new Object();
    protected volatile boolean disconnectedAndDown = false;
    private volatile boolean stopping = false;
    private int retryCount = 0;
    private boolean keepConnecting = true;
    private final BridgeMetrics metrics = new BridgeMetrics();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$ConnectRunnable.class */
    public static final class ConnectRunnable implements Runnable {
        private final BridgeImpl bridge;

        private ConnectRunnable(BridgeImpl bridgeImpl) {
            this.bridge = bridgeImpl;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.bridge.connect();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$FutureConnectRunnable.class */
    public static class FutureConnectRunnable implements Runnable {
        private final BridgeImpl bridge;
        private final Executor executor;

        private FutureConnectRunnable(Executor executor, BridgeImpl bridgeImpl) {
            this.executor = executor;
            this.bridge = bridgeImpl;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.bridge.isStarted()) {
                this.executor.execute(new ConnectRunnable(this.bridge));
            }
        }
    }

    /* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$PauseRunnable.class */
    private class PauseRunnable implements Runnable {
        private PauseRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
                if (!BridgeImpl.this.pendingAcks.await(60L, TimeUnit.SECONDS)) {
                    ActiveMQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(), BridgeImpl.this.pendingAcks.getCount());
                }
                synchronized (BridgeImpl.this) {
                    BridgeImpl.this.started = false;
                    BridgeImpl.this.active = false;
                }
                BridgeImpl.this.internalCancelReferences();
                ActiveMQServerLogger.LOGGER.bridgePaused(BridgeImpl.this.configuration.getName());
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorPausingBridge(e);
            }
        }
    }

    /* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$StopRunnable.class */
    private class StopRunnable implements Runnable {
        private StopRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            BridgeImpl.logger.debug("stopping bridge {}", BridgeImpl.this);
            BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
            synchronized (BridgeImpl.this) {
                BridgeImpl.logger.debug("Closing Session for bridge {}", BridgeImpl.this.configuration.getName());
                BridgeImpl.this.started = false;
                BridgeImpl.this.active = false;
            }
            if (BridgeImpl.this.session != null) {
                BridgeImpl.logger.debug("Cleaning up session {}", BridgeImpl.this.session);
                BridgeImpl.this.session.removeFailureListener(BridgeImpl.this);
                try {
                    BridgeImpl.this.session.close();
                    BridgeImpl.this.session = null;
                } catch (ActiveMQException e) {
                }
            }
            if (BridgeImpl.this.sessionConsumer != null) {
                BridgeImpl.logger.debug("Cleaning up session {}", BridgeImpl.this.session);
                try {
                    BridgeImpl.this.sessionConsumer.close();
                    BridgeImpl.this.sessionConsumer = null;
                } catch (ActiveMQException e2) {
                }
            }
            BridgeImpl.this.internalCancelReferences();
            if (BridgeImpl.this.csf != null) {
                BridgeImpl.this.csf.cleanup();
            }
            synchronized (BridgeImpl.this.connectionGuard) {
                BridgeImpl.this.keepConnecting = true;
            }
            BridgeImpl.logger.trace("Removing consumer on stopRunnable {} from queue {}", this, BridgeImpl.this.queue);
            ActiveMQServerLogger.LOGGER.bridgeStopped(BridgeImpl.this.configuration.getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.27.0.jar:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$TopologyListener.class */
    public class TopologyListener implements ClusterTopologyListener {
        private TopologyListener() {
        }

        @Override // org.apache.activemq.artemis.api.core.client.ClusterTopologyListener
        public void nodeUP(TopologyMember topologyMember, boolean z) {
            BridgeImpl.this.nodeUP(topologyMember, z);
        }

        @Override // org.apache.activemq.artemis.api.core.client.ClusterTopologyListener
        public void nodeDown(long j, String str) {
        }
    }

    public BridgeImpl(ServerLocatorInternal serverLocatorInternal, BridgeConfiguration bridgeConfiguration, UUID uuid, Queue queue, Executor executor, ScheduledExecutorService scheduledExecutorService, ActiveMQServer activeMQServer) throws ActiveMQException {
        this.sequentialID = activeMQServer.getStorageManager().generateID();
        this.configuration = bridgeConfiguration;
        this.reconnectAttemptsInUse = bridgeConfiguration.getInitialConnectAttempts();
        this.serverLocator = serverLocatorInternal;
        this.nodeUUID = uuid;
        this.queue = queue;
        this.executor = executor;
        this.scheduledExecutor = scheduledExecutorService;
        this.transformer = activeMQServer.getServiceRegistry().getBridgeTransformer(bridgeConfiguration.getName(), bridgeConfiguration.getTransformerConfiguration());
        this.filter = FilterImpl.createFilter(bridgeConfiguration.getFilterString());
        this.server = activeMQServer;
    }

    public boolean isBlockedOnFlowControl() {
        return this.blockedOnFlowControl;
    }

    public static final byte[] getDuplicateBytes(UUID uuid, long j) {
        byte[] bArr = new byte[24];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        wrap.put(uuid.asBytes());
        wrap.putLong(j);
        return bArr;
    }

    public ClientSessionFactory getSessionFactory() {
        return this.csf;
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public List<MessageReference> getDeliveringMessages() {
        ArrayList arrayList;
        synchronized (this.refs) {
            arrayList = new ArrayList(this.refs.values());
        }
        return arrayList;
    }

    private static void cleanUpSessionFactory(ClientSessionFactoryInternal clientSessionFactoryInternal) {
        if (clientSessionFactoryInternal != null) {
            clientSessionFactoryInternal.cleanup();
        }
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void setNotificationService(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override // org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback
    public void onCreditsFlow(boolean z, ClientProducerCredits clientProducerCredits) {
        if (logger.isTraceEnabled()) {
            logger.trace("Bridge {} received credits, with blocked = {}", getName(), Boolean.valueOf(z));
        }
        this.blockedOnFlowControl = z;
        if (z) {
            return;
        }
        this.queue.deliverAsync();
    }

    @Override // org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback
    public void onCreditsFail(ClientProducerCredits clientProducerCredits) {
        ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(clientProducerCredits.getAddress()), String.valueOf(getName()));
        disconnect();
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public long sequentialID() {
        return this.sequentialID;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        this.stopping = false;
        activate();
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString(QueueConfiguration.NAME), SimpleString.toSimpleString(this.configuration.getName()));
            this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), CoreNotificationType.BRIDGE_STARTED, typedProperties));
        }
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String debug() {
        return toString();
    }

    private void cancelRefs() {
        LinkedList linkedList = new LinkedList();
        synchronized (this.refs) {
            linkedList.addAll(this.refs.values());
            this.refs.clear();
        }
        if (logger.isTraceEnabled()) {
            logger.trace("BridgeImpl::cancelRefs cancelling {} references", Integer.valueOf(linkedList.size()));
        }
        if (logger.isTraceEnabled() && linkedList.isEmpty()) {
            logger.trace("didn't have any references to cancel on bridge {}", this);
            return;
        }
        ListIterator listIterator = linkedList.listIterator(linkedList.size());
        long currentTimeMillis = System.currentTimeMillis();
        while (listIterator.hasPrevious()) {
            MessageReference messageReference = (MessageReference) listIterator.previous();
            logger.trace("BridgeImpl::cancelRefs Cancelling reference {} on bridge {}", messageReference, this);
            try {
                messageReference.getQueue().cancel(messageReference, currentTimeMillis);
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(messageReference, e);
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void flushExecutor() {
        FutureLatch futureLatch = new FutureLatch();
        this.executor.execute(futureLatch);
        if (futureLatch.await(10000L)) {
            return;
        }
        ActiveMQServerLogger.LOGGER.timedOutWaitingToStopBridge();
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge, org.apache.activemq.artemis.core.server.Consumer
    public void disconnect() {
        this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl.1
            @Override // java.lang.Runnable
            public void run() {
                if (BridgeImpl.this.session != null) {
                    try {
                        BridgeImpl.this.session.cleanUp(false);
                    } catch (Exception e) {
                        BridgeImpl.logger.debug(e.getMessage(), e);
                    }
                    BridgeImpl.this.session = null;
                }
                if (BridgeImpl.this.sessionConsumer != null) {
                    try {
                        BridgeImpl.this.sessionConsumer.cleanUp(false);
                    } catch (Exception e2) {
                        BridgeImpl.logger.debug(e2.getMessage(), e2);
                    }
                    BridgeImpl.this.sessionConsumer = null;
                }
            }
        });
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public boolean isConnected() {
        return this.session != null;
    }

    public Executor getExecutor() {
        return this.executor;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public void stop() throws Exception {
        if (this.stopping) {
            return;
        }
        this.stopping = true;
        logger.debug("Bridge {} being stopped", this.configuration.getName());
        if (this.futureScheduledReconnection != null) {
            this.futureScheduledReconnection.cancel(true);
        }
        this.executor.execute(new StopRunnable());
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString(QueueConfiguration.NAME), SimpleString.toSimpleString(this.configuration.getName()));
            try {
                this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, typedProperties));
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.broadcastBridgeStoppedError(e);
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void pause() throws Exception {
        logger.debug("Bridge {} being paused", this.configuration.getName());
        this.executor.execute(new PauseRunnable());
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString(QueueConfiguration.NAME), SimpleString.toSimpleString(this.configuration.getName()));
            try {
                this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, typedProperties));
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.notificationBridgeStoppedError(e);
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void resume() throws Exception {
        this.queue.addConsumer(this);
        this.queue.deliverAsync();
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public boolean isStarted() {
        return this.started;
    }

    public synchronized void activate() {
        this.executor.execute(new ConnectRunnable(this));
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public SimpleString getName() {
        return SimpleString.toSimpleString(this.configuration.getName());
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public Queue getQueue() {
        return this.queue;
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public SimpleString getForwardingAddress() {
        return SimpleString.toSimpleString(this.configuration.getForwardingAddress());
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public RemotingConnection getForwardingConnection() {
        if (this.session == null) {
            return null;
        }
        return this.session.getConnection();
    }

    @Override // org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler
    public void sendAcknowledged(Message message) {
        MessageReference remove;
        logger.trace("BridgeImpl::sendAcknowledged received confirmation for message {}", message);
        if (this.active) {
            try {
                synchronized (this.refs) {
                    remove = this.refs.remove(Long.valueOf(message.getMessageID()));
                }
                if (remove != null) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("BridgeImpl::sendAcknowledged bridge {} Acking {} on queue {}", new Object[]{this, remove, remove.getQueue()});
                    }
                    remove.getQueue().acknowledge(remove);
                    this.pendingAcks.countDown();
                    this.metrics.incrementMessagesAcknowledged();
                    if (this.server.hasBrokerBridgePlugins()) {
                        this.server.callBrokerBridgePlugins(activeMQServerBridgePlugin -> {
                            activeMQServerBridgePlugin.afterAcknowledgeBridge(this, remove);
                        });
                    }
                } else {
                    logger.trace("BridgeImpl::sendAcknowledged bridge {} could not find reference for message {}", this, message);
                }
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected Message beforeForward(Message message, SimpleString simpleString) {
        Message copy = message.copy();
        ((RefCountMessage) copy).setParentRef((RefCountMessage) copy);
        return beforeForwardingNoCopy(copy, simpleString);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message beforeForwardingNoCopy(Message message, SimpleString simpleString) {
        if (this.configuration.isUseDuplicateDetection()) {
            message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, getDuplicateBytes(this.nodeUUID, message.getMessageID()));
        }
        if (simpleString != null) {
            message.setAddress(simpleString);
        }
        switch (this.configuration.getRoutingType()) {
            case ANYCAST:
                message.setRoutingType(RoutingType.ANYCAST);
                break;
            case MULTICAST:
                message.setRoutingType(RoutingType.MULTICAST);
                break;
            case STRIP:
                message.setRoutingType(null);
                break;
        }
        message.messageChanged();
        if (this.transformer == null) {
            return EmbedMessageUtil.embedAsCoreMessage(message);
        }
        Message transform = this.transformer.transform(message);
        if (transform != message) {
            logger.debug("The transformer {} made a copy of the message {} as transformedMessage", this.transformer, message);
        }
        return EmbedMessageUtil.embedAsCoreMessage(transform);
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.ReadyListener
    public void readyForWriting() {
        this.queue.deliverAsync();
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        HandleStatus deliverStandardMessage;
        if (this.filter != null && !this.filter.match(messageReference.getMessage())) {
            logger.trace("message reference {} is no match for bridge {}", messageReference, this.configuration.getName());
            return HandleStatus.NO_MATCH;
        }
        synchronized (this) {
            if (!this.active || !this.session.isWritable(this)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}::Ignoring reference on bridge as it is set to inactive ref {}, active = {}", new Object[]{this, messageReference, Boolean.valueOf(this.active)});
                }
                return HandleStatus.BUSY;
            }
            if (this.blockedOnFlowControl) {
                logger.debug("Bridge {} is blocked on flow control, cannot receive {}", this.configuration.getName(), messageReference);
                return HandleStatus.BUSY;
            }
            if (this.deliveringLargeMessage) {
                logger.trace("Bridge {} is busy delivering a large message", this.configuration.getName());
                return HandleStatus.BUSY;
            }
            logger.trace("Bridge {} is handling reference {} ", messageReference);
            messageReference.handled();
            synchronized (this.refs) {
                this.refs.put(Long.valueOf(messageReference.getMessage().getMessageID()), messageReference);
            }
            SimpleString simpleString = this.configuration.getForwardingAddress() != null ? SimpleString.toSimpleString(this.configuration.getForwardingAddress()) : messageReference.getMessage().getAddressSimpleString();
            Message beforeForward = beforeForward(messageReference.getMessage(), simpleString);
            this.pendingAcks.countUp();
            try {
                if (this.server.hasBrokerBridgePlugins()) {
                    this.server.callBrokerBridgePlugins(activeMQServerBridgePlugin -> {
                        activeMQServerBridgePlugin.beforeDeliverBridge(this, messageReference);
                    });
                }
                if (beforeForward.isLargeMessage()) {
                    this.deliveringLargeMessage = true;
                    deliverLargeMessage(simpleString, messageReference, (LargeServerMessage) beforeForward, messageReference.getMessage());
                    deliverStandardMessage = HandleStatus.HANDLED;
                } else {
                    deliverStandardMessage = deliverStandardMessage(simpleString, messageReference, beforeForward, messageReference.getMessage());
                }
                if (deliverStandardMessage == HandleStatus.HANDLED) {
                    this.metrics.incrementMessagesPendingAcknowledgement();
                }
                if (this.server.hasBrokerBridgePlugins()) {
                    HandleStatus handleStatus = deliverStandardMessage;
                    this.server.callBrokerBridgePlugins(activeMQServerBridgePlugin2 -> {
                        activeMQServerBridgePlugin2.afterDeliverBridge(this, messageReference, handleStatus);
                    });
                }
                return deliverStandardMessage;
            } catch (Exception e) {
                this.pendingAcks.countDown();
                throw e;
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void proceedDeliver(MessageReference messageReference) {
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z) {
        connectionFailed(activeMQException, z, null);
    }

    @Override // org.apache.activemq.artemis.core.remoting.FailureListener
    public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
        if (this.server.isStarted()) {
            ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(Boolean.valueOf(z));
        }
        synchronized (this.connectionGuard) {
            this.keepConnecting = true;
        }
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            cleanUpSessionFactory(this.csf);
        } catch (Throwable th) {
        }
        try {
            this.session.cleanUp(false);
        } catch (Throwable th2) {
        }
        if (str != null && !str.equals(this.nodeUUID.toString())) {
            scaleDown(str);
        } else if (str != null) {
            logger.debug("Received scaleDownTargetNodeID: {}; cancelling reconnect.", str);
            fail(true, true);
        } else {
            logger.debug("Received invalid scaleDownTargetNodeID: {}", str);
            fail(activeMQException.getType() == ActiveMQExceptionType.DISCONNECTED, false);
        }
        tryScheduleRetryReconnect(activeMQException.getType());
    }

    protected void scaleDown(String str) {
        synchronized (this) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Moving {} messages from {} to {}", new Object[]{Long.valueOf(this.queue.getMessageCount()), this.queue.getName(), str});
                }
                ((QueueImpl) this.queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(str));
                fail(true, true);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }

    protected void tryScheduleRetryReconnect(ActiveMQExceptionType activeMQExceptionType) {
        scheduleRetryConnect();
    }

    @Override // org.apache.activemq.artemis.api.core.client.SessionFailureListener
    public void beforeReconnect(ActiveMQException activeMQException) {
    }

    private void deliverLargeMessage(final SimpleString simpleString, final MessageReference messageReference, final LargeServerMessage largeServerMessage, Message message) {
        this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    BridgeImpl.this.producer.send(simpleString, largeServerMessage.toMessage());
                    BridgeImpl.this.unsetLargeMessageDelivery();
                    if (BridgeImpl.this.queue != null) {
                        BridgeImpl.this.queue.deliverAsync();
                    }
                } catch (ActiveMQException e) {
                    BridgeImpl.this.unsetLargeMessageDelivery();
                    ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(messageReference, e);
                    BridgeImpl.this.connectionFailed(e, false);
                }
            }
        });
    }

    private HandleStatus deliverStandardMessage(SimpleString simpleString, MessageReference messageReference, Message message, Message message2) {
        logger.trace("going to send message: {} from {}", message, this.queue);
        try {
            try {
                this.producer.send(simpleString, message);
                message2.usageDown();
                return HandleStatus.HANDLED;
            } catch (ActiveMQException e) {
                ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(messageReference, e);
                synchronized (this.refs) {
                    this.refs.remove(Long.valueOf(message.getMessageID()));
                    ((QueueImpl) messageReference.getQueue()).decDelivering(messageReference);
                    connectionFailed(e, false);
                    HandleStatus handleStatus = HandleStatus.BUSY;
                    message2.usageDown();
                    return handleStatus;
                }
            }
        } catch (Throwable th) {
            message2.usageDown();
            throw th;
        }
    }

    public TopologyMember getTargetNodeFromTopology() {
        return this.targetNode;
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public BridgeMetrics getMetrics() {
        return this.metrics;
    }

    public String toString() {
        return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)) + " [name=" + this.configuration.getName() + ", queue=" + this.queue + " targetConnector=" + this.serverLocator + "]";
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String toManagementString() {
        return getClass().getSimpleName() + " [name=" + this.configuration.getName() + ", queue=" + this.queue.getName() + "/" + this.queue.getID() + "]";
    }

    public ClientSessionFactoryImpl getCSF() {
        return (ClientSessionFactoryImpl) this.csf;
    }

    public Transformer getTransformer() {
        return this.transformer;
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public BridgeConfiguration getConfiguration() {
        return this.configuration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fail(boolean z, boolean z2) {
        logger.debug("{}\n\t::fail being called, permanently={}", this, Boolean.valueOf(z));
        if (this.targetNodeID != null) {
            this.disconnectedAndDown = true;
            this.serverLocator.notifyNodeDown(System.currentTimeMillis(), this.targetNodeID);
        }
        if (this.queue != null) {
            try {
                logger.trace("Removing consumer on fail {} from queue {}", this, this.queue);
                this.queue.removeConsumer(this);
            } catch (Exception e) {
                logger.debug(e.getMessage(), e);
            }
        }
        cancelRefs();
        if (this.queue != null) {
            this.queue.deliverAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void afterConnect() throws Exception {
        if (this.disconnectedAndDown && this.targetNodeID != null && this.targetNode != null) {
            this.serverLocator.notifyNodeUp(System.currentTimeMillis(), this.targetNodeID, this.targetNode.getBackupGroupName(), this.targetNode.getScaleDownGroupName(), new Pair<>(this.targetNode.getLive(), this.targetNode.getBackup()), false);
            this.disconnectedAndDown = false;
        }
        this.retryCount = 0;
        this.reconnectAttemptsInUse = this.configuration.getReconnectAttempts();
        if (this.futureScheduledReconnection != null) {
            this.futureScheduledReconnection.cancel(true);
            this.futureScheduledReconnection = null;
        }
    }

    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        if (this.targetNodeID == null || (this.configuration.getReconnectAttemptsOnSameNode() >= 0 && this.retryCount > this.configuration.getReconnectAttemptsOnSameNode())) {
            this.serverLocator.resetToInitialConnectors();
            this.csf = (ClientSessionFactoryInternal) this.serverLocator.createSessionFactory();
        } else {
            this.csf = reconnectOnOriginalNode();
        }
        if (this.csf != null) {
            this.csf.setReconnectAttempts(0);
        }
        return this.csf;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
        String str = this.targetNodeID;
        TopologyMember topologyMember = this.targetNode;
        if (str == null || topologyMember == null) {
            return null;
        }
        TransportConfiguration[] transportConfigurationArr = new TransportConfiguration[2];
        int i = 0;
        if (topologyMember.getLive() != null) {
            i = 0 + 1;
            transportConfigurationArr[0] = topologyMember.getLive();
        }
        if (topologyMember.getBackup() != null) {
            int i2 = i;
            i++;
            transportConfigurationArr[i2] = topologyMember.getBackup();
        }
        if (i <= 0) {
            return null;
        }
        return (ClientSessionFactoryInternal) this.serverLocator.createSessionFactory(transportConfigurationArr[(this.retryCount - 1) % i]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setSessionFactory(ClientSessionFactoryInternal clientSessionFactoryInternal) {
        this.csf = clientSessionFactoryInternal;
    }

    protected void connect() {
        if (this.stopping) {
            return;
        }
        synchronized (this.connectionGuard) {
            if (this.keepConnecting) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting  {} to its destination [{}], csf={}", new Object[]{this, this.nodeUUID, this.csf});
                }
                this.retryCount++;
                try {
                    try {
                        if (this.csf == null || this.csf.isClosed()) {
                            if (this.stopping) {
                                return;
                            }
                            this.csf = createSessionFactory();
                            if (this.csf == null) {
                                scheduleRetryConnect();
                                return;
                            } else {
                                this.session = (ClientSessionInternal) this.csf.createSession(this.configuration.getUser(), this.configuration.getPassword(), false, true, true, true, 1);
                                this.session.getProducerCreditManager().setCallback(this);
                                this.sessionConsumer = (ClientSessionInternal) this.csf.createSession(this.configuration.getUser(), this.configuration.getPassword(), false, true, true, true, 1);
                            }
                        }
                        if (this.configuration.getForwardingAddress() != null) {
                            try {
                                if (!this.session.addressQuery(SimpleString.toSimpleString(this.configuration.getForwardingAddress())).isExists()) {
                                    ActiveMQServerLogger.LOGGER.errorQueryingBridge(this.configuration.getForwardingAddress(), Integer.valueOf(this.retryCount));
                                    scheduleRetryConnect();
                                    return;
                                }
                            } catch (Throwable th) {
                                ActiveMQServerLogger.LOGGER.errorQueryingBridge(this.configuration.getName(), th);
                                this.retryCount--;
                                scheduleRetryConnectFixedTimeout(100L);
                                return;
                            }
                        }
                        this.blockedOnFlowControl = false;
                        this.producer = this.session.createProducer();
                        this.session.addFailureListener(this);
                        this.session.setSendAcknowledgementHandler(this);
                        afterConnect();
                        this.active = true;
                        this.queue.addConsumer(this);
                        this.queue.deliverAsync();
                        ActiveMQServerLogger.LOGGER.bridgeConnected(this);
                        this.serverLocator.addClusterTopologyListener(new TopologyListener());
                        this.keepConnecting = false;
                    } catch (InterruptedException | ActiveMQInterruptedException e) {
                        ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e);
                    }
                } catch (ActiveMQException e2) {
                    if (e2.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
                        ActiveMQServerLogger.LOGGER.errorStartingBridge(this.configuration.getName());
                        this.retryCount--;
                        scheduleRetryConnectFixedTimeout(this.configuration.getRetryInterval());
                    } else {
                        ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(this);
                        logger.debug("Underlying bridge connection failure", e2);
                        scheduleRetryConnect();
                    }
                } catch (Exception e3) {
                    ActiveMQServerLogger.LOGGER.errorConnectingBridge(this, e3);
                    if (this.csf != null) {
                        try {
                            this.csf.close();
                            this.csf = null;
                        } catch (Throwable th2) {
                        }
                    }
                    fail(false, false);
                    scheduleRetryConnect();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void scheduleRetryConnect() {
        if (this.serverLocator.isClosed()) {
            ActiveMQServerLogger.LOGGER.bridgeLocatorShutdown();
            return;
        }
        if (this.stopping) {
            ActiveMQServerLogger.LOGGER.bridgeStopping();
            return;
        }
        if (this.reconnectAttemptsInUse >= 0 && this.retryCount > this.reconnectAttemptsInUse) {
            ActiveMQServerLogger.LOGGER.bridgeAbortStart(this.configuration.getName(), Integer.valueOf(this.retryCount), Integer.valueOf(this.configuration.getReconnectAttempts()));
            fail(true, false);
            return;
        }
        long retryInterval = (long) (this.configuration.getRetryInterval() * Math.pow(this.configuration.getRetryIntervalMultiplier(), this.retryCount));
        if (retryInterval == 0) {
            retryInterval = this.configuration.getRetryInterval();
        }
        if (retryInterval > this.configuration.getMaxRetryInterval()) {
            retryInterval = this.configuration.getMaxRetryInterval();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Bridge {} retrying connection #{}, maxRetry={}, timeout={}", new Object[]{this, Integer.valueOf(this.retryCount), Integer.valueOf(this.reconnectAttemptsInUse), Long.valueOf(retryInterval)});
        }
        scheduleRetryConnectFixedTimeout(retryInterval);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void nodeUP(TopologyMember topologyMember, boolean z) {
        if (topologyMember != null) {
            ClientSessionInternal clientSessionInternal = this.session;
            RemotingConnection connection = clientSessionInternal != null ? clientSessionInternal.getConnection() : null;
            if (this.targetNodeID != null && this.targetNodeID.equals(topologyMember.getNodeId())) {
                this.targetNode = topologyMember;
            } else {
                if (connection == null || !topologyMember.isMember(connection)) {
                    return;
                }
                this.targetNode = topologyMember;
                this.targetNodeID = topologyMember.getNodeId();
            }
        }
    }

    protected void scheduleRetryConnectFixedTimeout(long j) {
        try {
            cleanUpSessionFactory(this.csf);
        } catch (Throwable th) {
        }
        if (this.stopping) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Scheduling retry for bridge {} in {} milliseconds", this.configuration.getName(), Long.valueOf(j));
        }
        this.futureScheduledReconnection = this.scheduledExecutor.schedule(new FutureConnectRunnable(this.executor, this), j, TimeUnit.MILLISECONDS);
    }

    private void internalCancelReferences() {
        cancelRefs();
        if (this.queue != null) {
            this.queue.deliverAsync();
        }
    }

    private synchronized void unsetLargeMessageDelivery() {
        this.deliveringLargeMessage = false;
    }
}
