package org.apache.activemq.artemis.core.server.impl;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.jboss.logging.Logger;

/* loaded from: input_file:artemis-server-2.19.1.jar:org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.class */
public class ReplicationPrimaryActivation extends LiveActivation implements DistributedLock.UnavailableLockListener {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger LOGGER;
    // Timeout, in milliseconds, passed to awaitNextCommittedActivationSequence while failing back.
    private static final long FAILBACK_TIMEOUT_MILLIS = 4000;
    // HA policy supplying the coordination id, group names, sync timeout and the backup policy.
    private final ReplicationPrimaryPolicy policy;
    // The broker this activation drives.
    private final ActiveMQServerImpl activeMQServer;

    // Current replication manager, or null when no replication is in progress.
    // Every read and write in this class happens while holding replicationLock.
    @GuardedBy("replicationLock")
    private ReplicationManager replicationManager;
    // Distributed primitive manager started/stopped around activation and failback.
    private final DistributedPrimitiveManager distributedManager;
    // Decompiled form of the compiler-generated assertion flag; true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Monitor guarding replicationManager.
    private final Object replicationLock = new Object();
    // Flipped to true (never reset in this class) once a stop or failback restart has been initiated.
    private final AtomicBoolean stoppingServer = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.19.1.jar:org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation$ReplicationFailureListener.class */
    /**
     * Listener attached to the replication connection: every failure or close
     * notification is forwarded to
     * {@link ReplicationPrimaryActivation#onReplicationConnectionClose()}.
     * The callback arguments are ignored.
     */
    public final class ReplicationFailureListener implements FailureListener, CloseListener {

        private ReplicationFailureListener() {
            // instantiated only by the enclosing activation
        }

        /** Connection failure: handled the same way as a close. */
        @Override // org.apache.activemq.artemis.core.remoting.FailureListener
        public void connectionFailed(ActiveMQException activeMQException, boolean z) {
            onReplicationConnectionClose();
        }

        /** Connection failure (three-argument variant): handled the same way as a close. */
        @Override // org.apache.activemq.artemis.core.remoting.FailureListener
        public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
            onReplicationConnectionClose();
        }

        /** Connection closed. */
        @Override // org.apache.activemq.artemis.core.remoting.CloseListener
        public void connectionClosed() {
            onReplicationConnectionClose();
        }
    }

    /**
     * Creates the activation; the arguments are only stored, nothing is started here.
     *
     * @param activeMQServerImpl          the broker to activate
     * @param distributedPrimitiveManager the distributed primitive manager used for activation and failback
     * @param replicationPrimaryPolicy    the primary replication policy
     */
    public ReplicationPrimaryActivation(ActiveMQServerImpl activeMQServerImpl, DistributedPrimitiveManager distributedPrimitiveManager, ReplicationPrimaryPolicy replicationPrimaryPolicy) {
        this.distributedManager = distributedPrimitiveManager;
        this.policy = replicationPrimaryPolicy;
        this.activeMQServer = activeMQServerImpl;
    }

    /**
     * @return the distributed primitive manager given to the constructor
     */
    public DistributedPrimitiveManager getDistributedManager() {
        return distributedManager;
    }

    /**
     * Freezes the remoting service, passing it the backup transport connection
     * of the current replication manager when there is one, or {@code null}
     * otherwise. Does nothing when {@code remotingService} is {@code null}.
     *
     * @param remotingService the remoting service to freeze; may be {@code null}
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public void freezeConnections(RemotingService remotingService) {
        final ReplicationManager current = getReplicationManager();
        if (remotingService == null) {
            return;
        }
        if (current == null) {
            remotingService.freeze(null, null);
        } else {
            remotingService.freeze(null, current.getBackupTransportConnection());
        }
    }

    /**
     * Attempts to activate this broker as live.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>apply the policy's coordination id, when one is configured;</li>
     *   <li>write an initial activation sequence of 0 if the node manager reports -1;</li>
     *   <li>start the distributed manager and call
     *       {@code ActivationSequenceStateMachine.tryActivate}, retrying (after stopping the
     *       manager) for as long as it throws {@link UnavailableStateException};</li>
     *   <li>if no lock is returned, stop the distributed manager, switch the server to the
     *       backup policy and return;</li>
     *   <li>otherwise initialise the server, register this instance as lock listener, verify
     *       the lock is still held and complete the activation.</li>
     * </ol>
     * Any exception stops the distributed manager, is logged as an initialization error and
     * is passed to the server's activation failure listeners.
     */
    @Override // java.lang.Runnable
    public void run() {
        DistributedLock liveLock;
        try {
            if (this.policy.getCoordinationId() != null) {
                applyCoordinationId(this.policy.getCoordinationId(), this.activeMQServer);
            }
            NodeManager nodeManager = this.activeMQServer.getNodeManager();
            if (nodeManager.getNodeActivationSequence() == -1) {
                // no activation sequence recorded yet: persist the initial one
                nodeManager.writeNodeActivationSequence(0L);
            }
            while (true) {
                this.distributedManager.start();
                try {
                    liveLock = ActivationSequenceStateMachine.tryActivate(nodeManager, this.distributedManager, LOGGER);
                    break;
                } catch (UnavailableStateException e) {
                    // Recoverable: restart the manager and try again. Leave a trace so that an
                    // endless retry loop is diagnosable instead of being completely silent.
                    LOGGER.debug("Unavailable distributed manager while trying to activate: retrying", e);
                    this.distributedManager.stop();
                }
            }
            if (liveLock == null) {
                this.distributedManager.stop();
                LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeManager.getNodeId());
                this.activeMQServer.setHAPolicy(this.policy.getBackupPolicy());
                return;
            }
            ActivationSequenceStateMachine.ensureSequentialAccessToNodeData(this.activeMQServer.toString(), nodeManager, this.distributedManager, LOGGER);
            this.activeMQServer.initialisePart1(false);
            this.activeMQServer.initialisePart2(false);
            // from now on onUnavailableLockEvent() is notified by the lock
            liveLock.addListener(this);
            // re-check ownership right before going live: initialisation may have taken long
            if (!liveLock.isHeldByCaller()) {
                throw new IllegalStateException("This broker isn't live anymore, probably due to application pauses eg GC, OS etc: failing now");
            }
            this.activeMQServer.completeActivation(true);
            if (this.activeMQServer.getIdentity() != null) {
                ActiveMQServerLogger.LOGGER.serverIsLive(this.activeMQServer.getIdentity());
            } else {
                ActiveMQServerLogger.LOGGER.serverIsLive();
            }
        } catch (Exception e) {
            this.distributedManager.stop();
            ActiveMQServerLogger.LOGGER.initializationError(e);
            this.activeMQServer.callActivationFailureListeners(e);
        }
    }

    /**
     * Replaces the server's node manager with a fresh one whose NodeID is the given
     * coordination id.
     * <p>
     * The activation sequence is read before the reset and again from the new node manager;
     * when assertions are enabled the two values must match.
     *
     * @param str                the coordination id to use as NodeID; must not be {@code null}
     * @param activeMQServerImpl the server whose node manager is reset
     * @throws NullPointerException  if {@code str} is {@code null}
     * @throws IllegalStateException if the current node manager is not started
     * @throws Exception             if resetting or starting the node manager fails
     */
    public static void applyCoordinationId(String str, ActiveMQServerImpl activeMQServerImpl) throws Exception {
        Objects.requireNonNull(str);
        if (!activeMQServerImpl.getNodeManager().isStarted()) {
            throw new IllegalStateException("NodeManager should be started");
        }
        final long sequenceBeforeReset = activeMQServerImpl.getNodeManager().getNodeActivationSequence();
        LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", str);

        // swap in a new node manager and give it the shared id
        activeMQServerImpl.resetNodeManager();
        final NodeManager freshNodeManager = activeMQServerImpl.getNodeManager();
        freshNodeManager.start();
        freshNodeManager.setNodeID(str);
        freshNodeManager.stopBackup();

        // decompiled assertion: the sequence must be unchanged by the reset
        final long sequenceAfterReset = freshNodeManager.readNodeActivationSequence();
        if (!$assertionsDisabled && sequenceAfterReset != sequenceBeforeReset) {
            throw new AssertionError();
        }
    }

    /**
     * Builds the handler that reacts to backup registration packets on the given channel.
     *
     * @param channel  the channel the handler is created for
     * @param acceptor the acceptor passed along to the registration handling
     * @return a handler, or {@code null} when the server is already stopping
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public ChannelHandler getActivationChannelHandler(Channel channel, Acceptor acceptor) {
        final boolean stopping = this.stoppingServer.get();
        if (stopping) {
            return null;
        }
        return packet -> {
            // NOTE(review): 115 is presumably the backup-registration packet type, given the
            // cast below - confirm against the packet type constants.
            if (packet.getType() != 115) {
                return;
            }
            onBackupRegistration(channel, acceptor, (BackupRegistrationMessage) packet);
        };
    }

    /**
     * Starts asynchronous replication towards the registering backup and, on failure,
     * answers on the channel with the matching {@link BackupReplicationStartFailedMessage}.
     */
    private void onBackupRegistration(Channel channel, Acceptor acceptor, BackupRegistrationMessage backupRegistrationMessage) {
        try {
            startAsyncReplication(
                channel.getConnection(),
                acceptor.getClusterConnection(),
                backupRegistrationMessage.getConnector(),
                backupRegistrationMessage.isFailBackRequest());
        } catch (ActiveMQAlreadyReplicatingException alreadyReplicating) {
            // a replication is already in place: just tell the backup
            final BackupReplicationStartFailedMessage reply =
                new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING);
            channel.send(reply);
        } catch (ActiveMQException failure) {
            LOGGER.debug("Failed to process backup registration packet", failure);
            final BackupReplicationStartFailedMessage reply =
                new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION);
            channel.send(reply);
        }
    }

    /**
     * Creates and starts a {@link ReplicationManager} for the given connection and kicks off
     * the replication itself on a dedicated thread named {@code async-replication-thread}.
     * Runs entirely under {@code replicationLock}.
     *
     * @throws ActiveMQAlreadyReplicatingException if a replication manager is already set
     * @throws ActiveMQIllegalStateException       if the server is not started
     * @throws ActiveMQException                   declared for failures raised while setting up the manager
     */
    private void startAsyncReplication(CoreRemotingConnection coreRemotingConnection, ClusterConnection clusterConnection, TransportConfiguration transportConfiguration, boolean z) throws ActiveMQException {
        synchronized (this.replicationLock) {
            if (this.replicationManager != null) {
                throw new ActiveMQAlreadyReplicatingException();
            }
            if (!this.activeMQServer.isStarted()) {
                throw new ActiveMQIllegalStateException();
            }

            // one listener instance serves both close and failure notifications
            final ReplicationFailureListener connectionListener = new ReplicationFailureListener();
            coreRemotingConnection.addCloseListener(connectionListener);
            coreRemotingConnection.addFailureListener(connectionListener);

            final ReplicationManager manager = new ReplicationManager(
                this.activeMQServer,
                coreRemotingConnection,
                clusterConnection.getCallTimeout(),
                this.policy.getInitialReplicationSyncTimeout(),
                this.activeMQServer.getIOExecutorFactory());
            this.replicationManager = manager;
            manager.start();

            final Thread replicationThread = new Thread(() -> replicate(manager, clusterConnection, z, transportConfiguration));
            replicationThread.setName("async-replication-thread");
            replicationThread.start();
        }
    }

    /**
     * Body of the replication thread: starts replication on the storage manager, announces
     * the node on the cluster connection and, for a fail-back request with auto fail-back
     * allowed by the policy, waits for the backup announcement.
     * <p>
     * On any exception the replication manager is stopped and, if it is still the current
     * one, the {@code replicationManager} field is cleared under {@code replicationLock}.
     *
     * @param replicationManager     the manager created for this replication
     * @param clusterConnection      the cluster connection used for the announcement
     * @param z                      whether the backup asked for a fail-back
     * @param transportConfiguration the backup connector announced to the cluster
     */
    private void replicate(ReplicationManager replicationManager, ClusterConnection clusterConnection, boolean z, TransportConfiguration transportConfiguration) {
        try {
            final String nodeId = this.activeMQServer.getNodeID().toString();
            this.activeMQServer.getStorageManager().startReplication(replicationManager, this.activeMQServer.getPagingManager(), nodeId, z && this.policy.isAllowAutoFailBack(), this.policy.getInitialReplicationSyncTimeout());
            clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeId, this.policy.getGroupName(), this.policy.getScaleDownGroupName(), new Pair<>(null, transportConfiguration), true);
            if (z && this.policy.isAllowAutoFailBack()) {
                awaitBackupAnnouncementOnFailbackRequest(clusterConnection);
            }
        } catch (Exception startFailure) {
            // only report the failure while the server is in STARTED state
            if (this.activeMQServer.getState() == ActiveMQServer.SERVER_STATE.STARTED) {
                ActiveMQServerLogger.LOGGER.errorStartingReplication(startFailure);
            }
            try {
                ActiveMQServerImpl.stopComponent(replicationManager);
            } catch (Exception stopFailure) {
                ActiveMQServerLogger.LOGGER.errorStoppingReplication(stopFailure);
            } finally {
                // always drop the reference, but only if it is still ours
                synchronized (this.replicationLock) {
                    if (this.replicationManager == replicationManager) {
                        this.replicationManager = null;
                    }
                }
            }
        }
    }

    /**
     * Registers a {@link BackupTopologyListener} on the cluster connection and waits on it:
     * when the wait succeeds the broker restarts as backup, otherwise the missed
     * announcement is logged. The listener is always removed afterwards.
     *
     * @throws Exception if waiting or restarting fails
     */
    private void awaitBackupAnnouncementOnFailbackRequest(ClusterConnection clusterConnection) throws Exception {
        final String nodeId = this.activeMQServer.getNodeID().toString();
        final BackupTopologyListener announcementListener = new BackupTopologyListener(nodeId, clusterConnection.getConnector());
        clusterConnection.addClusterTopologyListener(announcementListener);
        try {
            final boolean backupAnnounced = announcementListener.waitForBackup();
            if (!backupAnnounced) {
                ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
            } else {
                restartAsBackupAfterFailback();
            }
        } finally {
            clusterConnection.removeClusterTopologyListener(announcementListener);
        }
    }

    /**
     * Fails this broker over and restarts it with the backup policy.
     * <p>
     * Does nothing when the server is already stopping or when there is no replication
     * manager. Otherwise, under {@code replicationLock}: marks the server as stopping, fails
     * the server, waits up to {@code FAILBACK_TIMEOUT_MILLIS} for the next committed
     * activation sequence, then switches to the backup policy and starts the server again.
     * The distributed manager is stopped on the way out in every case.
     *
     * @throws Exception if failing, waiting or restarting the server fails
     */
    private void restartAsBackupAfterFailback() throws Exception {
        // cheap check before contending on the lock
        if (this.stoppingServer.get()) {
            return;
        }
        synchronized (this.replicationLock) {
            if (this.stoppingServer.get()) {
                return;
            }
            if (this.replicationManager == null) {
                LOGGER.warnf("Failback interrupted");
                return;
            }
            // claim the stop: whoever loses the race leaves
            final boolean claimed = this.stoppingServer.compareAndSet(false, true);
            if (!claimed) {
                LOGGER.infof("Failback interrupted: server is already stopping");
                return;
            }
            // capture identity and sequence before failing the server
            final String nodeId = this.activeMQServer.getNodeManager().getNodeId().toString();
            final long inSyncSequence = this.activeMQServer.getNodeManager().getNodeActivationSequence();
            this.activeMQServer.fail(true);
            try {
                try {
                    this.distributedManager.start();
                    final boolean committed = ActivationSequenceStateMachine.awaitNextCommittedActivationSequence(this.distributedManager, nodeId, inSyncSequence, FAILBACK_TIMEOUT_MILLIS, LOGGER);
                    if (!committed) {
                        LOGGER.warnf("Timed out waiting for failback server activation with NodeID = %s: and sequence > %d: after %dms", nodeId, Long.valueOf(inSyncSequence), Long.valueOf(FAILBACK_TIMEOUT_MILLIS));
                    }
                } catch (UnavailableStateException unavailable) {
                    LOGGER.debug("Unavailable distributed manager while awaiting failback activation sequence: ignored", unavailable);
                    this.distributedManager.stop();
                }
                ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
                this.activeMQServer.setHAPolicy(this.policy.getBackupPolicy());
                this.activeMQServer.start();
            } finally {
                this.distributedManager.stop();
            }
        }
    }

    /**
     * Stops the server on a new thread, at most once: the stop is only triggered by the
     * caller that manages to flip {@code stoppingServer} from false to true.
     */
    private void asyncStopServer() {
        if (this.stoppingServer.get()) {
            return;
        }
        if (!this.stoppingServer.compareAndSet(false, true)) {
            return;
        }
        final Runnable stopTask = () -> {
            try {
                this.activeMQServer.stop();
            } catch (Exception stopFailure) {
                ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(stopFailure, this.activeMQServer);
            }
        };
        new Thread(stopTask).start();
    }

    /**
     * Lock-unavailable callback: logs the event and asynchronously stops the broker.
     */
    public void onUnavailableLockEvent() {
        LOGGER.error("Quorum UNAVAILABLE: async stopping broker.");
        this.asyncStopServer();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to the replication connection being closed or failing by scheduling the
     * replication shutdown on the server thread pool. Nothing is scheduled when the pool is
     * {@code null} or the server is already stopping.
     */
    public void onReplicationConnectionClose() {
        final ExecutorService executor = this.activeMQServer.getThreadPool();
        if (executor != null && !this.stoppingServer.get()) {
            executor.execute(this::stopReplicationAfterConnectionClose);
        }
    }

    /**
     * Stops replication under {@code replicationLock} and clears the replication manager.
     * While the server is STARTED and not stopping it first calls
     * {@code ensureSequentialAccessToNodeData}; if that throws, the server is stopped
     * asynchronously and the replication teardown still proceeds.
     */
    private void stopReplicationAfterConnectionClose() {
        synchronized (this.replicationLock) {
            if (this.replicationManager == null) {
                // already cleaned up by someone else
                return;
            }
            final boolean serverRunning = !this.stoppingServer.get() && ActiveMQServer.SERVER_STATE.STARTED.equals(this.activeMQServer.getState());
            if (serverRunning) {
                try {
                    ActivationSequenceStateMachine.ensureSequentialAccessToNodeData(this.activeMQServer.toString(), this.activeMQServer.getNodeManager(), this.distributedManager, LOGGER);
                } catch (Throwable failure) {
                    LOGGER.errorf(failure, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", failure.getLocalizedMessage());
                    asyncStopServer();
                }
            }
            final StorageManager storage = this.activeMQServer.getStorageManager();
            if (storage != null) {
                storage.stopReplication();
            }
            this.replicationManager = null;
        }
    }

    /**
     * Closes this activation: drops the replication manager reference, stops the distributed
     * manager and, when the server has a node manager, either crashes or pauses the live
     * server on it.
     *
     * @param z  {@code true} to call {@code crashLiveServer()}, {@code false} to call
     *           {@code pauseLiveServer()}
     * @param z2 not used by this implementation
     * @throws Exception if stopping or updating the node manager fails
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public void close(boolean z, boolean z2) throws Exception {
        synchronized (this.replicationLock) {
            this.replicationManager = null;
        }
        this.distributedManager.stop();

        final NodeManager nodeManager = this.activeMQServer.getNodeManager();
        if (nodeManager == null) {
            return;
        }
        if (!z) {
            nodeManager.pauseLiveServer();
        } else {
            nodeManager.crashLiveServer();
        }
    }

    /**
     * Tells the backup, through the current replication manager, that the live server is
     * stopping ({@code STOP_CALLED}) and schedules {@code clearReplicationTokens} on the
     * server's scheduled pool 30 seconds later. No-op when there is no replication manager.
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public void sendLiveIsStopping() {
        final ReplicationManager current = getReplicationManager();
        if (current != null) {
            current.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED);
            final ScheduledExecutorService scheduler = this.activeMQServer.getScheduledPool();
            scheduler.schedule(current::clearReplicationTokens, 30L, TimeUnit.SECONDS);
        }
    }

    /**
     * @return the current replication manager, read under {@code replicationLock};
     *         {@code null} when none is set
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public ReplicationManager getReplicationManager() {
        synchronized (this.replicationLock) {
            return this.replicationManager;
        }
    }

    /**
     * @return {@code true} only when a replication manager exists and it is not
     *         synchronizing
     */
    @Override // org.apache.activemq.artemis.core.server.impl.Activation
    public boolean isReplicaSync() {
        final ReplicationManager current = getReplicationManager();
        return current != null && !current.isSynchronizing();
    }

    // Static initializer as emitted by the decompiler: initializes the two static finals
    // that are declared without an initializer at the top of the class.
    static {
        // true when assertions are NOT enabled for this class; read by applyCoordinationId
        $assertionsDisabled = !ReplicationPrimaryActivation.class.desiredAssertionStatus();
        LOGGER = Logger.getLogger(ReplicationPrimaryActivation.class);
    }
}
