/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.logger;

import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.CollectionUtil;
import org.agrona.collections.IntHashSet;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochNanoClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.AtomicCounter;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.FixGatewayException;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixPRetransmitHandler;
import uk.co.real_logic.artio.engine.HeaderSetup;
import uk.co.real_logic.artio.engine.ReplayHandler;
import uk.co.real_logic.artio.engine.ReplayerCommandQueue;
import uk.co.real_logic.artio.engine.SenderSequenceNumbers;
import uk.co.real_logic.artio.engine.framer.FixThrottleRejectBuilder;
import uk.co.real_logic.artio.engine.logger.EnqueuedReplay;
import uk.co.real_logic.artio.engine.logger.FixPReplayerSession;
import uk.co.real_logic.artio.engine.logger.FixReplayerCodecs;
import uk.co.real_logic.artio.engine.logger.FixReplayerSession;
import uk.co.real_logic.artio.engine.logger.FixSessionCodecsFactory;
import uk.co.real_logic.artio.engine.logger.GapFillEncoder;
import uk.co.real_logic.artio.engine.logger.ReplayChannel;
import uk.co.real_logic.artio.engine.logger.ReplayQuery;
import uk.co.real_logic.artio.engine.logger.ReplayTimestamper;
import uk.co.real_logic.artio.engine.logger.ReplayerSession;
import uk.co.real_logic.artio.fields.EpochFractionFormat;
import uk.co.real_logic.artio.fields.UtcTimestampEncoder;
import uk.co.real_logic.artio.fixp.AbstractFixPOffsets;
import uk.co.real_logic.artio.fixp.AbstractFixPParser;
import uk.co.real_logic.artio.fixp.AbstractFixPProxy;
import uk.co.real_logic.artio.fixp.FixPMessageDissector;
import uk.co.real_logic.artio.fixp.FixPProtocol;
import uk.co.real_logic.artio.fixp.FixPProtocolFactory;
import uk.co.real_logic.artio.messages.DisconnectDecoder;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.FixPMessageEncoder;
import uk.co.real_logic.artio.messages.FixPProtocolType;
import uk.co.real_logic.artio.messages.ILinkConnectDecoder;
import uk.co.real_logic.artio.messages.InboundFixPConnectDecoder;
import uk.co.real_logic.artio.messages.ManageFixPConnectionDecoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.MessageHeaderEncoder;
import uk.co.real_logic.artio.messages.ReplayCompleteEncoder;
import uk.co.real_logic.artio.messages.RequestDisconnectDecoder;
import uk.co.real_logic.artio.messages.ValidResendRequestDecoder;
import uk.co.real_logic.artio.util.AsciiBuffer;
import uk.co.real_logic.artio.util.CharFormatter;
import uk.co.real_logic.artio.util.Lazy;
import uk.co.real_logic.artio.util.MessageTypeEncoding;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

public class Replayer
implements Agent,
ControlledFragmentHandler {
    public static final int MOST_RECENT_MESSAGE = 0;
    static final int MESSAGE_FRAME_BLOCK_LENGTH = 65 + FixMessageDecoder.bodyHeaderLength();
    static final int SIZE_OF_LENGTH_FIELD = FixMessageDecoder.bodyHeaderLength();
    private static final int POLL_LIMIT = 10;
    private final AsciiBuffer asciiBuffer = new MutableAsciiBuffer();
    private final BufferClaim bufferClaim;
    final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
    final ReplayCompleteEncoder replayCompleteEncoder = new ReplayCompleteEncoder();
    private final LongHashSet gapFillMessageTypes;
    private final FixSessionCodecsFactory fixSessionCodecsFactory;
    private final CharFormatter receivedResendFormatter = new CharFormatter("Received Resend Request for inclusive range: [%s, %s]");
    private final CharFormatter alreadyDisconnectedFormatter = new CharFormatter("Not processing Resend Request for %s because it has already disconnected");
    final CharFormatter completeNotRecentFormatter = new CharFormatter("ReplayerSession: completeReplay-!upToMostRecent replayedMessages=%s endSeqNo=%s beginSeqNo=%s expectedCount=%s");
    final CharFormatter completeReplayGapfillFormatter = new CharFormatter("ReplayerSession: completeReplay-sendGapFill action=%s, replayedMessages=%s, beginGapFillSeqNum=%s, newSequenceNumber=%s");
    private final IntHashSet gapfillOnRetransmitILinkTemplateIds;
    private final Lazy<FixPProtocol> binaryFixPProtocol;
    private final Lazy<FixPMessageDissector> binaryFixPDissector;
    private final Lazy<AbstractFixPParser> binaryFixPParser;
    private final Lazy<AbstractFixPProxy> binaryFixPProxy;
    private final Lazy<AbstractFixPOffsets> abstractBinaryFixPOffsets;
    private final LongHashSet fixPConnectionIds = new LongHashSet();
    private final ILinkConnectDecoder iLinkConnect = new ILinkConnectDecoder();
    private final InboundFixPConnectDecoder inboundFixPConnect = new InboundFixPConnectDecoder();
    private final ManageFixPConnectionDecoder manageFixPConnection = new ManageFixPConnectionDecoder();
    private final FixPMessageEncoder fixPMessageEncoder = new FixPMessageEncoder();
    private final ReplayTimestamper timestamper;
    private final List<ReplayChannel> closingChannels = new ArrayList<ReplayChannel>();
    private final Long2ObjectHashMap<ReplayChannel> connectionIdToReplayerChannel = new Long2ObjectHashMap();
    private final MessageHeaderDecoder messageHeader = new MessageHeaderDecoder();
    private final ValidResendRequestDecoder validResendRequest = new ValidResendRequestDecoder();
    private final RequestDisconnectDecoder requestDisconnect = new RequestDisconnectDecoder();
    private final DisconnectDecoder disconnect = new DisconnectDecoder();
    private final int maxBytesInBuffer;
    private final ReplayerCommandQueue replayerCommandQueue;
    private final AtomicCounter currentReplayCount;
    private final int maxConcurrentSessionReplays;
    private final EpochNanoClock clock;
    private final EngineConfiguration configuration;
    private final ReplayQuery outboundReplayQuery;
    private final ExclusivePublication publication;
    private final IdleStrategy idleStrategy;
    private final ErrorHandler errorHandler;
    private final int maxClaimAttempts;
    private final Subscription inboundSubscription;
    private final String agentNamePrefix;
    private final ReplayHandler replayHandler;
    private final FixPRetransmitHandler fixPRetransmitHandler;
    private final SenderSequenceNumbers senderSequenceNumbers;
    private final UtcTimestampEncoder utcTimestampEncoder;

    public Replayer(ReplayQuery outboundReplayQuery, ExclusivePublication publication, BufferClaim bufferClaim, IdleStrategy idleStrategy, ErrorHandler errorHandler, int maxClaimAttempts, Subscription inboundSubscription, String agentNamePrefix, Set<String> gapfillOnReplayMessageTypes, IntHashSet gapfillOnRetransmitILinkTemplateIds, ReplayHandler replayHandler, FixPRetransmitHandler fixPRetransmitHandler, SenderSequenceNumbers senderSequenceNumbers, FixSessionCodecsFactory fixSessionCodecsFactory, int maxBytesInBuffer, ReplayerCommandQueue replayerCommandQueue, EpochFractionFormat epochFractionFormat, AtomicCounter currentReplayCount, int maxConcurrentSessionReplays, EpochNanoClock clock, FixPProtocolType fixPProtocolType, EngineConfiguration configuration) {
        this.outboundReplayQuery = outboundReplayQuery;
        this.publication = publication;
        this.bufferClaim = bufferClaim;
        this.idleStrategy = idleStrategy;
        this.errorHandler = errorHandler;
        this.maxClaimAttempts = maxClaimAttempts;
        this.inboundSubscription = inboundSubscription;
        this.agentNamePrefix = agentNamePrefix;
        this.gapfillOnRetransmitILinkTemplateIds = gapfillOnRetransmitILinkTemplateIds;
        this.replayHandler = replayHandler;
        this.fixPRetransmitHandler = fixPRetransmitHandler;
        this.senderSequenceNumbers = senderSequenceNumbers;
        this.fixSessionCodecsFactory = fixSessionCodecsFactory;
        this.maxBytesInBuffer = maxBytesInBuffer;
        this.replayerCommandQueue = replayerCommandQueue;
        this.currentReplayCount = currentReplayCount;
        this.maxConcurrentSessionReplays = maxConcurrentSessionReplays;
        this.clock = clock;
        this.configuration = configuration;
        this.gapFillMessageTypes = MessageTypeEncoding.packAllMessageTypes(gapfillOnReplayMessageTypes);
        this.utcTimestampEncoder = new UtcTimestampEncoder(epochFractionFormat);
        this.binaryFixPProtocol = new Lazy<FixPProtocol>(() -> FixPProtocolFactory.make(fixPProtocolType, errorHandler));
        this.binaryFixPDissector = new Lazy<FixPMessageDissector>(() -> new FixPMessageDissector(this.binaryFixPProtocol.get().messageDecoders()));
        this.binaryFixPParser = new Lazy<AbstractFixPParser>(() -> this.binaryFixPProtocol.get().makeParser(null));
        this.binaryFixPProxy = new Lazy<AbstractFixPProxy>(() -> this.binaryFixPProtocol.get().makeProxy(this.binaryFixPDissector.get(), publication, clock));
        this.abstractBinaryFixPOffsets = new Lazy<AbstractFixPOffsets>(() -> this.binaryFixPProtocol.get().makeOffsets());
        this.timestamper = new ReplayTimestamper(publication, clock);
    }

    @Override
    public ControlledFragmentHandler.Action onFragment(DirectBuffer buffer, int start, int length, Header header) {
        this.messageHeader.wrap(buffer, start);
        int templateId = this.messageHeader.templateId();
        int offset = start + 8;
        int blockLength = this.messageHeader.blockLength();
        int version = this.messageHeader.version();
        switch (templateId) {
            case 59: {
                this.validResendRequest.wrap(buffer, offset, blockLength, version);
                long sessionId = this.validResendRequest.session();
                long connectionId = this.validResendRequest.connection();
                long beginSeqNo = this.validResendRequest.beginSequenceNumber();
                long endSeqNo = this.validResendRequest.endSequenceNumber();
                int sequenceIndex = this.validResendRequest.sequenceIndex();
                this.validResendRequest.wrapBody(this.asciiBuffer);
                return this.onResendRequest(sessionId, connectionId, beginSeqNo, endSeqNo, sequenceIndex, this.asciiBuffer);
            }
            case 57: {
                this.iLinkConnect.wrap(buffer, offset, blockLength, version);
                this.fixPConnectionIds.add(this.iLinkConnect.connection());
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            case 67: {
                this.inboundFixPConnect.wrap(buffer, offset, blockLength, version);
                this.fixPConnectionIds.add(this.inboundFixPConnect.connection());
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            case 68: {
                this.manageFixPConnection.wrap(buffer, offset, blockLength, version);
                this.fixPConnectionIds.add(this.manageFixPConnection.connection());
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            case 12: {
                this.requestDisconnect.wrap(buffer, offset, blockLength, version);
                long connectionId = this.requestDisconnect.connection();
                this.onDisconnect(connectionId);
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            case 7: {
                this.disconnect.wrap(buffer, offset, blockLength, version);
                long connectionId = this.disconnect.connection();
                this.onDisconnect(connectionId);
                return ControlledFragmentHandler.Action.CONTINUE;
            }
        }
        return this.fixSessionCodecsFactory.onFragment(buffer, start, length, header);
    }

    private void onDisconnect(long connectionId) {
        this.fixPConnectionIds.remove(connectionId);
        ReplayChannel replayChannel = this.connectionIdToReplayerChannel.remove(connectionId);
        if (replayChannel != null) {
            this.currentReplayCount.decrement();
            if (!replayChannel.startClose()) {
                this.closingChannels.add(replayChannel);
            }
        }
    }

    ControlledFragmentHandler.Action onResendRequest(long sessionId, long connectionId, long beginSeqNo, long endSeqNo, int sequenceIndex, AsciiBuffer asciiBuffer) {
        if (this.senderSequenceNumbers.hasDisconnected(connectionId)) {
            DebugLogger.log(LogTag.REPLAY, this.alreadyDisconnectedFormatter, connectionId);
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        ReplayChannel replayChannel = this.connectionIdToReplayerChannel.get(connectionId);
        if (replayChannel != null) {
            int enqueuedReplayCount = replayChannel.enqueuedReplayCount();
            if (enqueuedReplayCount >= this.maxConcurrentSessionReplays) {
                this.errorHandler.onError(new FixGatewayException(String.format("Ignore resend request for sessionId=%d,connectionId=%d as %d requests in flight", sessionId, connectionId, enqueuedReplayCount)));
                return ControlledFragmentHandler.Action.CONTINUE;
            }
            int length = asciiBuffer.capacity();
            MutableAsciiBuffer copiedBuffer = new MutableAsciiBuffer(new byte[length]);
            copiedBuffer.putBytes(0, asciiBuffer, 0, length);
            replayChannel.enqueueReplay(new EnqueuedReplay(sessionId, connectionId, beginSeqNo, endSeqNo, sequenceIndex, copiedBuffer));
            return ControlledFragmentHandler.Action.COMMIT;
        }
        try {
            ReplayerSession session = this.processResendRequest(sessionId, connectionId, beginSeqNo, endSeqNo, sequenceIndex, asciiBuffer);
            if (session == null) {
                return ControlledFragmentHandler.Action.ABORT;
            }
            ReplayChannel channel = new ReplayChannel(session);
            this.connectionIdToReplayerChannel.put(connectionId, channel);
            this.currentReplayCount.increment();
            return ControlledFragmentHandler.Action.COMMIT;
        }
        catch (IllegalStateException e) {
            this.errorHandler.onError(e);
            return ControlledFragmentHandler.Action.CONTINUE;
        }
    }

    private ReplayerSession processResendRequest(long sessionId, long connectionId, long beginSeqNo, long endSeqNo, int sequenceIndex, AsciiBuffer asciiBuffer) {
        FixReplayerCodecs sessionCodecs = this.fixSessionCodecsFactory.get(sessionId);
        if (sessionCodecs != null) {
            return this.processFixResendRequest(sessionId, connectionId, (int)beginSeqNo, (int)endSeqNo, sequenceIndex, asciiBuffer, sessionCodecs);
        }
        if (this.fixPConnectionIds.contains(connectionId)) {
            DebugLogger.log(LogTag.REPLAY, this.receivedResendFormatter, beginSeqNo, endSeqNo);
            AtomicCounter bytesInBuffer = this.senderSequenceNumbers.bytesInBufferCounter(connectionId);
            if (bytesInBuffer == null) {
                return null;
            }
            FixPReplayerSession session = new FixPReplayerSession(connectionId, this.bufferClaim, this.idleStrategy, this.maxClaimAttempts, this.publication, this.outboundReplayQuery, (int)beginSeqNo, (int)endSeqNo, sessionId, this, this.gapfillOnRetransmitILinkTemplateIds, this.fixPMessageEncoder, this.binaryFixPParser.get(), this.binaryFixPProxy.get(), this.abstractBinaryFixPOffsets.get(), this.fixPRetransmitHandler, bytesInBuffer, this.configuration.senderMaxBytesInBuffer());
            session.query();
            return session;
        }
        throw new IllegalStateException("Unknown session: sessionId=" + sessionId + ",connectionId=" + connectionId);
    }

    private FixReplayerSession processFixResendRequest(long sessionId, long connectionId, int beginSeqNo, int endSeqNo, int sequenceIndex, AsciiBuffer asciiBuffer, FixReplayerCodecs sessionCodecs) {
        FixThrottleRejectBuilder throttleRejectBuilder;
        AtomicCounter bytesInBuffer = this.senderSequenceNumbers.bytesInBufferCounter(connectionId);
        if (bytesInBuffer == null) {
            return null;
        }
        DebugLogger.log(LogTag.REPLAY, this.receivedResendFormatter, (long)beginSeqNo, (long)endSeqNo);
        AbstractResendRequestDecoder resendRequest = sessionCodecs.resendRequest();
        resendRequest.reset();
        resendRequest.decode(asciiBuffer, 0, asciiBuffer.capacity());
        GapFillEncoder encoder = sessionCodecs.makeGapFillEncoder();
        encoder.setupMessage(resendRequest.header());
        if (this.configuration.throttleWindowInMs() == Integer.MIN_VALUE) {
            throttleRejectBuilder = null;
        } else {
            throttleRejectBuilder = new FixThrottleRejectBuilder(sessionCodecs.dictionary(), this.errorHandler, sessionId, connectionId, this.utcTimestampEncoder, this.clock, this.configuration.throttleWindowInMs(), this.configuration.throttleLimitOfMessages());
            HeaderSetup.setup(resendRequest.header(), throttleRejectBuilder.header());
        }
        String message = asciiBuffer.getAscii(0, asciiBuffer.capacity());
        FixReplayerSession fixReplayerSession = new FixReplayerSession(this.bufferClaim, this.idleStrategy, this.replayHandler, this.maxClaimAttempts, this.gapFillMessageTypes, this.publication, this.clock, beginSeqNo, endSeqNo, connectionId, sessionId, sequenceIndex, this.outboundReplayQuery, message, this.errorHandler, encoder, bytesInBuffer, this.maxBytesInBuffer, this.utcTimestampEncoder, this, throttleRejectBuilder);
        fixReplayerSession.query();
        return fixReplayerSession;
    }

    @Override
    public int doWork() {
        this.timestamper.sendTimestampMessage();
        int work = this.replayerCommandQueue.poll();
        return (work += this.pollReplayerChannels()) + this.inboundSubscription.controlledPoll(this, 10);
    }

    private int pollReplayerChannels() {
        Long2ObjectHashMap.EntryIterator replayerChannels = this.connectionIdToReplayerChannel.entrySet().iterator();
        int size = this.connectionIdToReplayerChannel.size();
        while (replayerChannels.hasNext()) {
            ReplayChannel channel = (ReplayChannel)replayerChannels.next().getValue();
            if (!channel.attemptReplay()) continue;
            EnqueuedReplay enqueuedReplay = channel.pollReplay();
            if (enqueuedReplay == null) {
                this.currentReplayCount.decrementOrdered();
                replayerChannels.remove();
                continue;
            }
            try {
                ReplayerSession session = this.processResendRequest(enqueuedReplay.sessionId(), enqueuedReplay.connectionId(), enqueuedReplay.beginSeqNo(), enqueuedReplay.endSeqNo(), enqueuedReplay.sequenceIndex(), enqueuedReplay.asciiBuffer());
                channel.startReplay(session);
            }
            catch (IllegalStateException e) {
                this.errorHandler.onError(e);
            }
        }
        return size + CollectionUtil.removeIf(this.closingChannels, ReplayChannel::attemptReplay);
    }

    @Override
    public void onClose() {
        this.connectionIdToReplayerChannel.values().forEach(ReplayChannel::closeNow);
        this.connectionIdToReplayerChannel.clear();
        this.currentReplayCount.set(0L);
        this.currentReplayCount.close();
        this.publication.close();
        this.outboundReplayQuery.close();
    }

    @Override
    public String roleName() {
        return this.agentNamePrefix + "Replayer";
    }
}

