package org.apache.activemq.artemis.core.replication;

import io.netty.handler.codec.http2.Http2CodecUtil;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.jboss.logging.Logger;

/* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/replication/ReplicationManager.class */
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
    private static final Logger logger = Logger.getLogger((Class<?>) ReplicationManager.class);
    private final Channel replicatingChannel;
    private boolean started;
    private volatile boolean enabled;
    private final ExecutorFactory executorFactory;
    private SessionFailureListener failureListener;
    private CoreRemotingConnection remotingConnection;
    private final long timeout;
    private final ResponseHandler responseHandler = new ResponseHandler();
    private final AtomicBoolean writable = new AtomicBoolean(true);
    private final Object replicationLock = new Object();
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue();
    private volatile boolean inSync = true;
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);

    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/replication/ReplicationManager$ADD_OPERATION_TYPE.class */
    public enum ADD_OPERATION_TYPE {
        UPDATE { // from class: org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE.1
            @Override // org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE
            public boolean toBoolean() {
                return true;
            }
        },
        ADD { // from class: org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE.2
            @Override // org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE
            public boolean toBoolean() {
                return false;
            }
        };

        public abstract boolean toBoolean();

        public static ADD_OPERATION_TYPE toOperation(boolean z) {
            return z ? UPDATE : ADD;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/replication/ReplicationManager$NullEncoding.class */
    private static final class NullEncoding implements EncodingSupport {
        static final NullEncoding instance = new NullEncoding();

        private NullEncoding() {
        }

        @Override // org.apache.activemq.artemis.core.journal.EncodingSupport
        public void decode(ActiveMQBuffer activeMQBuffer) {
        }

        @Override // org.apache.activemq.artemis.core.journal.EncodingSupport
        public void encode(ActiveMQBuffer activeMQBuffer) {
        }

        @Override // org.apache.activemq.artemis.core.journal.EncodingSupport
        public int getEncodeSize() {
            return 0;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/replication/ReplicationManager$ReplicatedSessionFailureListener.class */
    private final class ReplicatedSessionFailureListener implements SessionFailureListener {
        private ReplicatedSessionFailureListener() {
        }

        @Override // org.apache.activemq.artemis.core.remoting.FailureListener
        public void connectionFailed(ActiveMQException activeMQException, boolean z) {
            if (activeMQException.getType() == ActiveMQExceptionType.DISCONNECTED) {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupShutdown();
            } else {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupFail(activeMQException);
            }
            try {
                ReplicationManager.this.stop();
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingReplication(e);
            }
        }

        @Override // org.apache.activemq.artemis.core.remoting.FailureListener
        public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
            connectionFailed(activeMQException, z);
        }

        @Override // org.apache.activemq.artemis.api.core.client.SessionFailureListener
        public void beforeReconnect(ActiveMQException activeMQException) {
        }
    }

    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/replication/ReplicationManager$ResponseHandler.class */
    private final class ResponseHandler implements ChannelHandler {
        private ResponseHandler() {
        }

        @Override // org.apache.activemq.artemis.core.protocol.core.ChannelHandler
        public void handlePacket(Packet packet) {
            if (packet.getType() == 90 || packet.getType() == -9) {
                ReplicationManager.this.replicated();
                if (packet.getType() == -9 && ((ReplicationResponseMessageV2) packet).isSynchronizationIsFinishedAcknowledgement()) {
                    ReplicationManager.this.synchronizationIsFinishedAcknowledgement.countDown();
                }
            }
        }
    }

    public ReplicationManager(CoreRemotingConnection coreRemotingConnection, long j, ExecutorFactory executorFactory) {
        this.executorFactory = executorFactory;
        this.replicatingChannel = coreRemotingConnection.getChannel(ChannelImpl.CHANNEL_ID.REPLICATION.id, -1);
        this.remotingConnection = coreRemotingConnection;
        this.timeout = j;
    }

    public void appendUpdateRecord(byte b, ADD_OPERATION_TYPE add_operation_type, long j, byte b2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddMessage(b, add_operation_type, j, b2, encodingSupport));
        }
    }

    public void appendDeleteRecord(byte b, long j) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteMessage(b, j));
        }
    }

    public void appendAddRecordTransactional(byte b, ADD_OPERATION_TYPE add_operation_type, long j, long j2, byte b2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddTXMessage(b, add_operation_type, j, j2, b2, encodingSupport));
        }
    }

    public void appendCommitRecord(byte b, long j, boolean z, boolean z2) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationCommitMessage(b, false, j), z2);
        }
    }

    public void appendDeleteRecordTransactional(byte b, long j, long j2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteTXMessage(b, j, j2, encodingSupport));
        }
    }

    public void appendDeleteRecordTransactional(byte b, long j, long j2) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteTXMessage(b, j, j2, NullEncoding.instance));
        }
    }

    public void appendPrepareRecord(byte b, long j, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPrepareMessage(b, j, encodingSupport));
        }
    }

    public void appendRollbackRecord(byte b, long j) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationCommitMessage(b, true, j));
        }
    }

    public void pageClosed(SimpleString simpleString, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageEventMessage(simpleString, i, false));
        }
    }

    public void pageDeleted(SimpleString simpleString, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageEventMessage(simpleString, i, true));
        }
    }

    public void pageWrite(PagedMessage pagedMessage, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageWriteMessage(pagedMessage, i));
        }
    }

    public void largeMessageBegin(long j) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargeMessageBeginMessage(j));
        }
    }

    public void largeMessageDelete(Long l) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargeMessageEndMessage(l.longValue()));
        }
    }

    public void largeMessageWrite(long j, byte[] bArr) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargeMessageWriteMessage(j, bArr));
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public synchronized boolean isStarted() {
        return this.started;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public synchronized void start() throws ActiveMQException {
        if (this.started) {
            throw new IllegalStateException("ReplicationManager is already started");
        }
        this.replicatingChannel.setHandler(this.responseHandler);
        this.failureListener = new ReplicatedSessionFailureListener();
        this.remotingConnection.addFailureListener(this.failureListener);
        this.started = true;
        this.enabled = true;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public void stop() throws Exception {
        synchronized (this) {
            if (!this.started) {
                logger.trace("Stopping being ignored as it hasn't been started");
                return;
            }
            if (this.replicatingChannel != null) {
                this.replicatingChannel.close();
                this.replicatingChannel.getConnection().getTransportConnection().fireReady(true);
            }
            synchronized (this.replicationLock) {
                this.enabled = false;
                this.writable.set(true);
                this.replicationLock.notifyAll();
                clearReplicationTokens();
            }
            CoreRemotingConnection coreRemotingConnection = this.remotingConnection;
            if (coreRemotingConnection != null) {
                coreRemotingConnection.removeFailureListener(this.failureListener);
            }
            this.remotingConnection = null;
            this.started = false;
        }
    }

    public void clearReplicationTokens() {
        logger.trace("clearReplicationTokens initiating");
        synchronized (this.replicationLock) {
            logger.trace("clearReplicationTokens entered the lock");
            while (!this.pendingTokens.isEmpty()) {
                OperationContext poll = this.pendingTokens.poll();
                logger.trace("Calling ctx.replicationDone()");
                try {
                    poll.replicationDone();
                } catch (Throwable th) {
                    ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(th);
                }
            }
        }
        logger.trace("clearReplicationTokens finished");
    }

    public Set<OperationContext> getActiveTokens() {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        Iterator<OperationContext> it = this.pendingTokens.iterator();
        while (it.hasNext()) {
            linkedHashSet.add(it.next());
        }
        return linkedHashSet;
    }

    private OperationContext sendReplicatePacket(Packet packet) {
        return sendReplicatePacket(packet, true);
    }

    private OperationContext sendReplicatePacket(Packet packet, boolean z) {
        if (!this.enabled) {
            return null;
        }
        boolean z2 = false;
        OperationContext context = OperationContextImpl.getContext(this.executorFactory);
        if (z) {
            context.replicationLineUp();
        }
        synchronized (this.replicationLock) {
            if (this.enabled) {
                this.pendingTokens.add(context);
                if (!flowControl()) {
                    return context;
                }
                this.replicatingChannel.send(packet);
            } else {
                z2 = true;
            }
            if (z2) {
                context.replicationDone();
            }
            return context;
        }
    }

    private boolean flowControl() {
        if (this.replicatingChannel.getConnection().isWritable(this)) {
            return true;
        }
        try {
            logger.trace("flowControl waiting on writable");
            this.writable.set(false);
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis + this.timeout;
            while (!this.writable.get() && currentTimeMillis < j) {
                this.replicationLock.wait(j - currentTimeMillis);
                currentTimeMillis = System.currentTimeMillis();
            }
            logger.trace("flow control done");
            if (this.writable.get()) {
                return true;
            }
            ActiveMQServerLogger.LOGGER.slowReplicationResponse();
            logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - currentTimeMillis);
            try {
                stop();
                return false;
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
                return false;
            }
        } catch (InterruptedException e2) {
            throw new ActiveMQInterruptedException(e2);
        }
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.ReadyListener
    public void readyForWriting() {
        synchronized (this.replicationLock) {
            this.writable.set(true);
            this.replicationLock.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void replicated() {
        OperationContext poll = this.pendingTokens.poll();
        if (poll == null) {
            throw new IllegalStateException("Missing replication token on the queue.");
        }
        poll.replicationDone();
    }

    public void syncJournalFile(JournalFile journalFile, AbstractJournalStorageManager.JournalContent journalContent) throws Exception {
        if (this.enabled) {
            SequentialFile cloneFile = journalFile.getFile().cloneFile();
            try {
                ActiveMQServerLogger.LOGGER.journalSynch(journalFile, Long.valueOf(cloneFile.size()), cloneFile);
                sendLargeFile(journalContent, null, journalFile.getFileID(), cloneFile, Http2CodecUtil.MAX_HEADER_LIST_SIZE);
                if (cloneFile.isOpen()) {
                    cloneFile.close();
                }
            } catch (Throwable th) {
                if (cloneFile.isOpen()) {
                    cloneFile.close();
                }
                throw th;
            }
        }
    }

    public void syncLargeMessageFile(SequentialFile sequentialFile, long j, long j2) throws Exception {
        if (this.enabled) {
            sendLargeFile(null, null, j2, sequentialFile, j);
        }
    }

    public void syncPages(SequentialFile sequentialFile, long j, SimpleString simpleString) throws Exception {
        if (this.enabled) {
            sendLargeFile(null, simpleString, j, sequentialFile, Http2CodecUtil.MAX_HEADER_LIST_SIZE);
        }
    }

    /* JADX WARN: Finally extract failed */
    private void sendLargeFile(AbstractJournalStorageManager.JournalContent journalContent, SimpleString simpleString, long j, SequentialFile sequentialFile, long j2) throws Exception {
        if (this.enabled) {
            if (!sequentialFile.isOpen()) {
                sequentialFile.open();
            }
            try {
                FileInputStream fileInputStream = new FileInputStream(sequentialFile.getJavaFile());
                Throwable th = null;
                try {
                    FileChannel channel = fileInputStream.getChannel();
                    Throwable th2 = null;
                    try {
                        try {
                            ByteBuffer allocate = ByteBuffer.allocate(131072);
                            do {
                                allocate.clear();
                                int read = channel.read(allocate);
                                int i = read;
                                if (read > 0) {
                                    if (read >= j2) {
                                        i = (int) j2;
                                        j2 = 0;
                                    } else {
                                        j2 -= read;
                                    }
                                    allocate.limit(i);
                                }
                                allocate.rewind();
                                sendReplicatePacket(new ReplicationSyncFileMessage(journalContent, simpleString, j, i, allocate));
                                if (read == -1 || read == 0) {
                                    break;
                                }
                            } while (j2 != 0);
                            if (channel != null) {
                                if (0 != 0) {
                                    try {
                                        channel.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    channel.close();
                                }
                            }
                            if (fileInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        fileInputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    fileInputStream.close();
                                }
                            }
                        } catch (Throwable th5) {
                            th2 = th5;
                            throw th5;
                        }
                    } catch (Throwable th6) {
                        if (channel != null) {
                            if (th2 != null) {
                                try {
                                    channel.close();
                                } catch (Throwable th7) {
                                    th2.addSuppressed(th7);
                                }
                            } else {
                                channel.close();
                            }
                        }
                        throw th6;
                    }
                } catch (Throwable th8) {
                    if (fileInputStream != null) {
                        if (0 != 0) {
                            try {
                                fileInputStream.close();
                            } catch (Throwable th9) {
                                th.addSuppressed(th9);
                            }
                        } else {
                            fileInputStream.close();
                        }
                    }
                    throw th8;
                }
            } finally {
                if (sequentialFile.isOpen()) {
                    sequentialFile.close();
                }
            }
        }
    }

    public void sendStartSyncMessage(JournalFile[] journalFileArr, AbstractJournalStorageManager.JournalContent journalContent, String str, boolean z) throws ActiveMQException {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationStartSyncMessage(journalFileArr, journalContent, str, z));
        }
    }

    public void sendSynchronizationDone(String str, long j) {
        if (this.enabled) {
            if (logger.isTraceEnabled()) {
                logger.trace("sendSynchronizationDone ::" + str + ", " + j);
            }
            this.synchronizationIsFinishedAcknowledgement.countUp();
            sendReplicatePacket(new ReplicationStartSyncMessage(str));
            try {
            } catch (InterruptedException e) {
                logger.debug(e);
            }
            if (!this.synchronizationIsFinishedAcknowledgement.await(j)) {
                logger.trace("sendSynchronizationDone wasn't finished in time");
                throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(j);
            }
            this.inSync = false;
            logger.trace("sendSynchronizationDone finished");
        }
    }

    public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> map) {
        ArrayList arrayList = new ArrayList(map.keySet());
        if (this.enabled) {
            sendReplicatePacket(new ReplicationStartSyncMessage(arrayList));
        }
    }

    public OperationContext sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping liveStopping) {
        logger.debug("LIVE IS STOPPING?!? message=" + liveStopping + " enabled=" + this.enabled);
        if (!this.enabled) {
            return null;
        }
        logger.debug("LIVE IS STOPPING?!? message=" + liveStopping + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + this.enabled);
        return sendReplicatePacket(new ReplicationLiveIsStoppingMessage(liveStopping));
    }

    public CoreRemotingConnection getBackupTransportConnection() {
        return this.remotingConnection;
    }

    public boolean isSynchronizing() {
        return this.inSync;
    }
}
