/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.replication;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.impl.FileWrapperJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.jboss.logging.Logger;

public final class ReplicationEndpoint
implements ChannelHandler,
ActiveMQComponent {
    private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class);
    private final ActiveMQServerImpl server;
    private final boolean wantedFailBack;
    private final ReplicationEndpointEventListener eventListener;
    private final boolean noSync = false;
    private Channel channel;
    private boolean supportResponseBatching;
    private Journal[] journals;
    private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
    private final Map<AbstractJournalStorageManager.JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync = new HashMap<AbstractJournalStorageManager.JournalContent, Map<Long, JournalSyncFile>>();
    private Map<AbstractJournalStorageManager.JournalContent, Journal> journalsHolder = new HashMap<AbstractJournalStorageManager.JournalContent, Journal>();
    private StorageManager storageManager;
    private PagingManager pageManager;
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
    private final ConcurrentMap<Long, ReplicatedLargeMessage> largeMessages = new ConcurrentHashMap<Long, ReplicatedLargeMessage>();
    private boolean deletePages = true;
    private volatile boolean started;
    private Executor executor;
    private List<Interceptor> outgoingInterceptors = null;
    private final ArrayDeque<Packet> pendingPackets;

    public ReplicationEndpoint(ActiveMQServerImpl server, boolean wantedFailBack, ReplicationEndpointEventListener eventListener) {
        this.server = server;
        this.wantedFailBack = wantedFailBack;
        this.eventListener = eventListener;
        this.pendingPackets = new ArrayDeque();
        this.supportResponseBatching = false;
    }

    public synchronized void registerJournal(byte id, Journal journal) {
        if (this.journals == null || id >= this.journals.length) {
            Journal[] oldJournals = this.journals;
            this.journals = new Journal[id + 1];
            if (oldJournals != null) {
                System.arraycopy(oldJournals, 0, this.journals, 0, oldJournals.length);
            }
        }
        this.journals[id] = journal;
    }

    public void addOutgoingInterceptorForReplication(Interceptor interceptor) {
        if (this.outgoingInterceptors == null) {
            this.outgoingInterceptors = new CopyOnWriteArrayList<Interceptor>();
        }
        this.outgoingInterceptors.add(interceptor);
    }

    public void pause() {
        this.started = false;
    }

    public void resume() {
        this.started = true;
    }

    @Override
    public void handlePacket(Packet packet) {
        if (logger.isTraceEnabled()) {
            logger.trace("handlePacket::handling " + packet);
        }
        PacketImpl response = new ReplicationResponseMessage();
        byte type = packet.getType();
        try {
            if (!this.started) {
                if (logger.isTraceEnabled()) {
                    logger.trace("handlePacket::ignoring " + packet);
                }
                return;
            }
            if (type == 91) {
                this.handleAppendAddRecord((ReplicationAddMessage)packet);
            } else if (type == 92) {
                this.handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
            } else if (type == 93) {
                this.handleAppendDelete((ReplicationDeleteMessage)packet);
            } else if (type == 94) {
                this.handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
            } else if (type == 95) {
                this.handlePrepare((ReplicationPrepareMessage)packet);
            } else if (type == 96) {
                this.handleCommitRollback((ReplicationCommitMessage)packet);
            } else if (type == 97) {
                this.handlePageWrite((ReplicationPageWriteMessage)packet);
            } else if (type == 98) {
                this.handlePageEvent((ReplicationPageEventMessage)packet);
            } else if (type == 99) {
                this.handleLargeMessageBegin((ReplicationLargeMessageBeginMessage)packet);
            } else if (type == 101) {
                this.handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
            } else if (type == 100) {
                this.handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
            } else if (type == 120) {
                response = this.handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
            } else if (type == 103) {
                this.handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
            } else if (type == 121) {
                this.handleLiveStopping((ReplicationLiveIsStoppingMessage)packet);
            } else if (type == 116) {
                this.handleFatalError((BackupReplicationStartFailedMessage)packet);
            } else {
                ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
            }
        }
        catch (ActiveMQException e) {
            logger.warn((Object)e.getMessage(), e);
            ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
            response = new ActiveMQExceptionMessage(e);
        }
        catch (Exception e) {
            logger.warn((Object)e.getMessage(), e);
            ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
            response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
        }
        if (response != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Returning " + response);
            }
            if (this.supportResponseBatching) {
                this.pendingPackets.add(response);
            } else {
                this.channel.send(response);
            }
        } else {
            logger.trace("Response is null, ignoring response");
        }
    }

    @Override
    public void endOfBatch() {
        ArrayDeque<Packet> pendingPackets = this.pendingPackets;
        if (pendingPackets.isEmpty()) {
            return;
        }
        int size = pendingPackets.size();
        for (int i2 = 0; i2 < size; ++i2) {
            Packet packet = pendingPackets.poll();
            boolean isLast = i2 == size - 1;
            this.channel.send(packet, isLast);
        }
    }

    private void handleFatalError(BackupReplicationStartFailedMessage packet) {
        ActiveMQServerLogger.LOGGER.errorStartingReplication(packet.getRegistrationProblem());
        this.server.stopTheServer(false);
    }

    private void handleLiveStopping(ReplicationLiveIsStoppingMessage packet) throws ActiveMQException {
        this.eventListener.onLiveStopping(packet.isFinalMessage());
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    @Override
    public synchronized void start() throws Exception {
        block3: {
            Configuration config = this.server.getConfiguration();
            try {
                this.storageManager = this.server.getStorageManager();
                this.storageManager.start();
                this.server.getManagementService().setStorageManager(this.storageManager);
                this.journalsHolder.put(AbstractJournalStorageManager.JournalContent.BINDINGS, this.storageManager.getBindingsJournal());
                this.journalsHolder.put(AbstractJournalStorageManager.JournalContent.MESSAGES, this.storageManager.getMessageJournal());
                for (AbstractJournalStorageManager.JournalContent jc : EnumSet.allOf(AbstractJournalStorageManager.JournalContent.class)) {
                    this.filesReservedForSync.put(jc, new HashMap());
                    this.journalLoadInformation[jc.typeByte] = this.journalsHolder.get((Object)jc).loadSyncOnly(Journal.JournalState.SYNCING);
                }
                this.pageManager = this.server.createPagingManager();
                this.pageManager.start();
                this.started = true;
            }
            catch (Exception e) {
                if (!this.server.isStarted()) break block3;
                throw e;
            }
        }
    }

    @Override
    public synchronized void stop() throws Exception {
        if (!this.started) {
            return;
        }
        logger.trace("Stopping endpoint");
        this.started = false;
        OrderedExecutorFactory.flushExecutor(this.executor);
        if (this.channel != null) {
            this.channel.close();
        }
        for (ReplicatedLargeMessage replicatedLargeMessage : this.largeMessages.values()) {
            replicatedLargeMessage.releaseResources(true, false);
        }
        this.largeMessages.clear();
        for (Map.Entry entry : this.filesReservedForSync.entrySet()) {
            for (JournalSyncFile journalSyncFile : ((Map)entry.getValue()).values()) {
                journalSyncFile.close();
            }
        }
        this.filesReservedForSync.clear();
        if (this.journals != null) {
            for (Iterator<Object> iterator2 : this.journals) {
                if (!(iterator2 instanceof FileWrapperJournal)) continue;
                iterator2.stop();
            }
        }
        for (ConcurrentMap concurrentMap : this.pageIndex.values()) {
            for (Page page : concurrentMap.values()) {
                try {
                    page.close(false);
                }
                catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e);
                }
            }
        }
        this.pageManager.stop();
        this.pageIndex.clear();
        this.storageManager.stop();
        this.started = false;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
        if (channel == null) {
            this.supportResponseBatching = false;
        } else {
            try {
                CoreRemotingConnection connection = channel.getConnection();
                this.supportResponseBatching = connection != null ? connection.getTransportConnection() instanceof NettyConnection : false;
            }
            catch (Throwable t) {
                logger.warn((Object)"Error while checking the channel connection", t);
                this.supportResponseBatching = false;
            }
        }
        if (channel != null && this.outgoingInterceptors != null && channel.getConnection() instanceof RemotingConnectionImpl) {
            try {
                RemotingConnectionImpl impl = (RemotingConnectionImpl)channel.getConnection();
                for (Interceptor interceptor : this.outgoingInterceptors) {
                    impl.getOutgoingInterceptors().add(interceptor);
                }
            }
            catch (Throwable e) {
                logger.warn((Object)e.getMessage(), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void finishSynchronization(String liveID, long activationSequence) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID + " activationSequence = " + activationSequence);
        }
        for (AbstractJournalStorageManager.JournalContent jc : EnumSet.allOf(AbstractJournalStorageManager.JournalContent.class)) {
            Journal journal = this.journalsHolder.remove((Object)jc);
            if (logger.isTraceEnabled()) {
                logger.trace("getting lock on " + (Object)((Object)jc) + ", journal = " + journal);
            }
            this.registerJournal(jc.typeByte, journal);
            journal.synchronizationLock();
            try {
                if (logger.isTraceEnabled()) {
                    logger.trace("lock acquired on " + (Object)((Object)jc));
                }
                this.filesReservedForSync.remove((Object)jc);
                if (logger.isTraceEnabled()) {
                    logger.trace("stopping journal for " + (Object)((Object)jc));
                }
                journal.stop();
                if (logger.isTraceEnabled()) {
                    logger.trace("starting journal for " + (Object)((Object)jc));
                }
                journal.start();
                if (logger.isTraceEnabled()) {
                    logger.trace("loadAndSync " + (Object)((Object)jc));
                }
                journal.loadSyncOnly(Journal.JournalState.SYNCING_UP_TO_DATE);
            }
            finally {
                if (logger.isTraceEnabled()) {
                    logger.trace("unlocking " + (Object)((Object)jc));
                }
                journal.synchronizationUnlock();
            }
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Sync on large messages...");
        }
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        for (Map.Entry entry : this.largeMessages.entrySet()) {
            ReplicatedLargeMessage lm = (ReplicatedLargeMessage)entry.getValue();
            if (!(lm instanceof LargeServerMessageInSync)) continue;
            LargeServerMessageInSync lmSync = (LargeServerMessageInSync)lm;
            if (logger.isTraceEnabled()) {
                logger.trace("lmSync on " + lmSync.toString());
            }
            lmSync.joinSyncedData(buffer);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
        }
        this.journalsHolder = null;
        this.eventListener.onRemoteBackupUpToDate(liveID, activationSequence);
        if (logger.isTraceEnabled()) {
            logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
        }
        ActiveMQServerLogger.LOGGER.backupServerSynchronized(this.server, liveID);
    }

    private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
        SequentialFile channel1;
        long id = msg.getId();
        byte[] data = msg.getData();
        switch (msg.getFileType()) {
            case LARGE_MESSAGE: {
                ReplicatedLargeMessage largeMessage = this.lookupLargeMessage(id, false, false);
                if (!(largeMessage instanceof LargeServerMessageInSync)) {
                    ActiveMQServerLogger.LOGGER.largeMessageIncompatible();
                    return;
                }
                LargeServerMessageInSync largeMessageInSync = (LargeServerMessageInSync)largeMessage;
                channel1 = largeMessageInSync.getSyncFile();
                break;
            }
            case PAGE: {
                Page page = this.getPage(msg.getPageStore(), (int)msg.getId());
                channel1 = page.getFile();
                break;
            }
            case JOURNAL: {
                JournalSyncFile journalSyncFile = this.filesReservedForSync.get((Object)msg.getJournalContent()).get(id);
                FileChannel channel2 = journalSyncFile.getChannel();
                if (data == null) {
                    channel2.close();
                    return;
                }
                channel2.write(ByteBuffer.wrap(data));
                return;
            }
            default: {
                throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledFileType(msg.getFileType());
            }
        }
        if (data == null) {
            if (channel1.isOpen()) {
                channel1.close();
            }
            return;
        }
        if (!channel1.isOpen()) {
            channel1.open();
        }
        channel1.writeDirect(ByteBuffer.wrap(data), false);
    }

    private ReplicationResponseMessageV2 handleStartReplicationSynchronization(ReplicationStartSyncMessage packet) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
        }
        ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
        if (!this.started) {
            return replicationResponseMessage;
        }
        if (packet.isSynchronizationFinished()) {
            long activationSequence = 0L;
            if (packet.getFileIds() != null && packet.getFileIds().length == 1) {
                activationSequence = packet.getFileIds()[0];
            }
            this.finishSynchronization(packet.getNodeID(), activationSequence);
            replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
            return replicationResponseMessage;
        }
        switch (packet.getDataType()) {
            case LargeMessages: {
                for (long msgID : packet.getFileIds()) {
                    this.createLargeMessage(msgID, true);
                }
                break;
            }
            case JournalBindings: 
            case JournalMessages: {
                if (this.wantedFailBack && !packet.isServerToFailBack()) {
                    ActiveMQServerLogger.LOGGER.autoFailBackDenied();
                }
                AbstractJournalStorageManager.JournalContent journalContent = ReplicationStartSyncMessage.SyncDataType.getJournalContentType(packet.getDataType());
                Journal journal = this.journalsHolder.get((Object)journalContent);
                Map<Long, JournalSyncFile> mapToFill = this.filesReservedForSync.get((Object)journalContent);
                for (Map.Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
                    mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
                }
                FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
                this.registerJournal(journalContent.typeByte, syncJournal);
                if (this.supportResponseBatching) {
                    this.endOfBatch();
                }
                this.channel.send(replicationResponseMessage);
                replicationResponseMessage = null;
                if (packet.getNodeID() == null) break;
                this.eventListener.onLiveNodeId(packet.getNodeID());
                break;
            }
            default: {
                throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
            }
        }
        return replicationResponseMessage;
    }

    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
        ReplicatedLargeMessage message;
        if (logger.isTraceEnabled()) {
            logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
        }
        if ((message = this.lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false)) != null) {
            message.setPendingRecordID(packet.getPendingRecordId());
            if (!packet.isDelete()) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
                }
                message.releaseResources(true, false);
            } else {
                this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            if (logger.isTraceEnabled()) {
                                logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
                            }
                            message.deleteFile();
                        }
                        catch (Exception e) {
                            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
                        }
                    }
                });
            }
        }
    }

    private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception {
        ReplicatedLargeMessage message = this.lookupLargeMessage(packet.getMessageId(), false, true);
        if (message != null) {
            message.addBytes(packet.getBody());
        }
    }

    private ReplicatedLargeMessage lookupLargeMessage(long messageId, boolean delete, boolean createIfNotExists) {
        ReplicatedLargeMessage message;
        if (delete) {
            message = (ReplicatedLargeMessage)this.largeMessages.remove(messageId);
        } else {
            message = (ReplicatedLargeMessage)this.largeMessages.get(messageId);
            if (message == null) {
                if (createIfNotExists) {
                    this.createLargeMessage(messageId, false);
                    message = (ReplicatedLargeMessage)this.largeMessages.get(messageId);
                } else {
                    ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
                }
            }
        }
        return message;
    }

    private void handleLargeMessageBegin(ReplicationLargeMessageBeginMessage packet) {
        long id = packet.getMessageId();
        this.createLargeMessage(id, false);
        if (logger.isTraceEnabled()) {
            logger.trace("Receiving Large Message Begin " + id + " on backup");
        }
    }

    private void createLargeMessage(long id, boolean liveToBackupSync) {
        ReplicatedLargeMessage msg = liveToBackupSync ? new LargeServerMessageInSync(this.storageManager) : this.storageManager.createLargeMessage();
        msg.setDurable(true);
        msg.setMessageID(id);
        this.largeMessages.put(id, msg);
    }

    private void handleCommitRollback(ReplicationCommitMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        if (packet.isRollback()) {
            journalToUse.appendRollbackRecord(packet.getTxId(), false);
        } else {
            journalToUse.appendCommitRecord(packet.getTxId(), false);
        }
    }

    private void handlePrepare(ReplicationPrepareMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
    }

    private void handleAppendDeleteTX(ReplicationDeleteTXMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordData());
    }

    private void handleAppendDelete(ReplicationDeleteMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.tryAppendDeleteRecord(packet.getId(), null, false);
    }

    private void handleAppendAddTXRecord(ReplicationAddTXMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        if (packet.getOperation() == ReplicationManager.ADD_OPERATION_TYPE.UPDATE) {
            journalToUse.appendUpdateRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        } else {
            journalToUse.appendAddRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        }
    }

    private void handleAppendAddRecord(ReplicationAddMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        switch (packet.getRecord()) {
            case UPDATE: {
                if (logger.isTraceEnabled()) {
                    logger.trace("Endpoint appendUpdate id = " + packet.getId());
                }
                journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), false);
                break;
            }
            case ADD: {
                if (logger.isTraceEnabled()) {
                    logger.trace("Endpoint append id = " + packet.getId());
                }
                journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), false);
                break;
            }
            case EVENT: {
                if (logger.isTraceEnabled()) {
                    logger.trace("Endpoint append id = " + packet.getId());
                }
                journalToUse.appendAddEvent(packet.getId(), packet.getJournalRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(packet.getRecordData()), false, null);
            }
        }
    }

    private void handlePageEvent(ReplicationPageEventMessage packet) throws Exception {
        ConcurrentMap<Integer, Page> pages = this.getPageMap(packet.getStoreName());
        Page page = (Page)pages.remove(packet.getPageNumber());
        if (page == null) {
            this.getPage(packet.getStoreName(), packet.getPageNumber());
            this.handlePageEvent(packet);
            return;
        }
        if (page != null) {
            if (packet.isDelete()) {
                if (this.deletePages) {
                    page.delete(null);
                }
            } else {
                page.close(false);
            }
        }
    }

    private void handlePageWrite(ReplicationPageWriteMessage packet) throws Exception {
        PagedMessage pgdMessage = packet.getPagedMessage();
        pgdMessage.initMessage(this.storageManager);
        Message msg = pgdMessage.getMessage();
        Page page = this.getPage(msg.getAddressSimpleString(), packet.getPageNumber());
        page.writeDirect(pgdMessage);
    }

    private ConcurrentMap<Integer, Page> getPageMap(SimpleString storeName) {
        ConcurrentMap mapResult;
        ConcurrentMap resultIndex = (ConcurrentHashMap)this.pageIndex.get(storeName);
        if (resultIndex == null && (mapResult = (ConcurrentMap)this.pageIndex.putIfAbsent(storeName, resultIndex = new ConcurrentHashMap())) != null) {
            resultIndex = mapResult;
        }
        return resultIndex;
    }

    private Page getPage(SimpleString storeName, int pageId) throws Exception {
        ConcurrentMap<Integer, Page> map = this.getPageMap(storeName);
        Page page = (Page)map.get(pageId);
        if (page == null) {
            page = this.newPage(pageId, storeName, map);
        }
        return page;
    }

    private synchronized Page newPage(int pageId, SimpleString storeName, ConcurrentMap<Integer, Page> map) throws Exception {
        Page page = (Page)map.get(pageId);
        if (page == null) {
            page = this.pageManager.getPageStore(storeName).createPage(pageId);
            page.open();
            map.put(pageId, page);
        }
        return page;
    }

    private Journal getJournal(byte journalID) {
        return this.journals[journalID];
    }

    public void setExecutor(Executor executor2) {
        this.executor = executor2;
    }

    public ConcurrentMap<Long, ReplicatedLargeMessage> getLargeMessages() {
        return this.largeMessages;
    }

    public static final class JournalSyncFile {
        private FileChannel channel;
        private final File file;
        private FileOutputStream fos;

        public JournalSyncFile(JournalFile jFile) throws Exception {
            SequentialFile seqFile = jFile.getFile();
            this.file = seqFile.getJavaFile();
            seqFile.close();
        }

        synchronized FileChannel getChannel() throws Exception {
            if (this.channel == null) {
                this.fos = new FileOutputStream(this.file);
                this.channel = this.fos.getChannel();
            }
            return this.channel;
        }

        synchronized void close() throws IOException {
            if (this.fos != null) {
                this.fos.close();
            }
            if (this.channel != null) {
                this.channel.close();
            }
        }

        public String toString() {
            return "JournalSyncFile(file=" + this.file.getAbsolutePath() + ")";
        }
    }

    public static interface ReplicationEndpointEventListener {
        public void onRemoteBackupUpToDate(String var1, long var2);

        public void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping var1) throws ActiveMQException;

        public void onLiveNodeId(String var1);
    }
}

