package uk.co.real_logic.artio.engine.framer;

import io.aeron.ExclusivePublication;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import uk.co.real_logic.artio.engine.MessageTimingHandler;
import uk.co.real_logic.artio.engine.SenderSequenceNumber;
import uk.co.real_logic.artio.engine.logger.ArchiveDescriptor;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.protocol.GatewayPublication;

/* loaded from: input_file:uk/co/real_logic/artio/engine/framer/FixSenderEndPointTest.class */
public class FixSenderEndPointTest {
    private static final long CONNECTION_ID = 1;
    private static final int LIBRARY_ID = 2;
    private static final int HEADER_LENGTH = 8;
    private static final long POSITION = 8192;
    private static final int BODY_LENGTH = 84;
    private static final int LENGTH = GatewayPublication.FRAME_SIZE + BODY_LENGTH;
    private static final int FRAGMENT_LENGTH = ArchiveDescriptor.alignTerm((8 + GatewayPublication.FRAME_SIZE) + BODY_LENGTH);
    private static final long BEGIN_POSITION = 8000;
    private static final int MAX_BYTES_IN_BUFFER = 252;
    public static final int INBOUND_BUFFER_LEN = 128;
    private final TcpChannel tcpChannel = (TcpChannel) Mockito.mock(TcpChannel.class);
    private final AtomicCounter bytesInBuffer = fakeCounter();
    private final AtomicCounter invalidLibraryAttempts = (AtomicCounter) Mockito.mock(AtomicCounter.class);
    private final ErrorHandler errorHandler = (ErrorHandler) Mockito.mock(ErrorHandler.class);
    private final Framer framer = (Framer) Mockito.mock(Framer.class);
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private final UnsafeBuffer buffer = new UnsafeBuffer(this.byteBuffer);
    private final BlockablePosition libraryBlockablePosition = (BlockablePosition) Mockito.mock(BlockablePosition.class);
    private final BlockablePosition replayBlockablePosition = (BlockablePosition) Mockito.mock(BlockablePosition.class);
    private final SenderSequenceNumber senderSequenceNumber = (SenderSequenceNumber) Mockito.mock(SenderSequenceNumber.class);
    private final MessageTimingHandler messageTimingHandler = (MessageTimingHandler) Mockito.mock(MessageTimingHandler.class);
    private final ExclusivePublication inboundPublication = (ExclusivePublication) Mockito.mock(ExclusivePublication.class);
    private final UnsafeBuffer inboundBuffer = new UnsafeBuffer(new byte[INBOUND_BUFFER_LEN]);
    private final FixSenderEndPoint endPoint = new FixSenderEndPoint(CONNECTION_ID, LIBRARY_ID, this.libraryBlockablePosition, this.inboundPublication, this.replayBlockablePosition, this.tcpChannel, this.bytesInBuffer, this.invalidLibraryAttempts, this.errorHandler, this.framer, MAX_BYTES_IN_BUFFER, 10000, 0, this.senderSequenceNumber, this.messageTimingHandler);

    @Before
    public void setup() {
        Mockito.when(Long.valueOf(this.inboundPublication.tryClaim(Mockito.anyInt(), (BufferClaim) Mockito.any()))).then(invocationOnMock -> {
            ((BufferClaim) invocationOnMock.getArgument(1)).wrap(this.inboundBuffer, 0, ((Integer) invocationOnMock.getArgument(0)).intValue() + 32);
            return Long.valueOf(CONNECTION_ID);
        });
    }

    @Test
    public void shouldRetrySlowConsumerMessage() {
        becomeSlowConsumer();
        channelWillWrite(BODY_LENGTH);
        onSlowOutboundMessage();
        byteBufferWritten();
        assertBytesInBuffer(0);
        verifyDoesNotBlockLibrary();
    }

    @Test
    public void shouldBeAbleToFragmentSlowConsumerRetries() {
        becomeSlowConsumer();
        channelWillWrite(41);
        onSlowOutboundMessage();
        verifyBlocksLibraryAt(BEGIN_POSITION);
        byteBufferWritten();
        assertBytesInBuffer(43);
        channelWillWrite(43);
        onSlowOutboundMessage();
        verifyDoesNotBlockLibrary();
        byteBufferWritten();
        assertBytesInBuffer(0);
        verifyNoMoreErrors();
    }

    @Test
    public void shouldDisconnectSlowConsumerAfterTimeout() {
        channelWillWrite(BODY_LENGTH);
        onOutboundMessage(100L, POSITION);
        long j = 100 + 100;
        long j2 = POSITION + FRAGMENT_LENGTH;
        channelWillWrite(0);
        onOutboundMessage(j, j2);
        this.endPoint.checkTimeouts(j + 10001);
        verifySlowConsumerDisconnect(Mockito.times(1));
        errorLogged();
    }

    @Test
    public void shouldNotDisconnectSlowConsumerBeforeTimeout() {
        channelWillWrite(BODY_LENGTH);
        onOutboundMessage(100L, POSITION);
        this.endPoint.checkTimeouts(100 + 9999);
        verifySlowConsumerDisconnect(Mockito.never());
        verifyNoMoreErrors();
    }

    @Test
    public void shouldNotDisconnectSlowConsumerBeforeTimeoutOnSlowChannel() {
        channelWillWrite(0);
        onOutboundMessage(100L, POSITION);
        long j = 100 + 9999;
        channelWillWrite(41);
        onSlowOutboundMessage(j);
        verifyBlocksLibraryAt(BEGIN_POSITION);
        this.endPoint.checkTimeouts(j + 9999);
        verifySlowConsumerDisconnect(Mockito.never());
        verifyNoMoreErrors();
    }

    @Test
    public void shouldNotDisconnectAtStartDueToTimeout() {
        this.endPoint.checkTimeouts(9999L);
        verifySlowConsumerDisconnect(Mockito.never());
        verifyNoMoreErrors();
        verifyDoesNotBlockLibrary();
    }

    @Test
    public void shouldNotDisconnectRegularConsumerDueToTimeout() {
        channelWillWrite(BODY_LENGTH);
        onOutboundMessage(100L, POSITION);
        this.endPoint.checkTimeouts(100 + 10001);
        verifySlowConsumerDisconnect(Mockito.never());
        verifyNoMoreErrors();
    }

    @Test
    public void shouldDisconnectSlowConsumerAfterTimeoutAfterFragment() {
        becomeSlowConsumer();
        channelWillWrite(41);
        onSlowOutboundMessage();
        verifyBlocksLibraryAt(BEGIN_POSITION);
        channelWillWrite(0);
        onSlowOutboundMessage();
        verifyBlocksLibraryAt(BEGIN_POSITION);
        this.endPoint.checkTimeouts(10101L);
        verifySlowConsumerDisconnect(Mockito.times(1));
        errorLogged();
    }

    @Test
    public void shouldBecomeReplaySlowConsumer() {
        channelWillWrite(0);
        onReplayMessage(0L, 0L);
        assertBytesInBuffer(BODY_LENGTH);
    }

    @Test
    public void shouldNotSendFurtherMessagesBeforeReplayRetry() {
        channelWillWrite(0);
        onReplayMessage(0L, 0L);
        byteBufferWritten();
        onReplayMessage(0L, 0 + 84);
        byteBufferNotWritten();
        assertBytesInBuffer(168);
    }

    @Test
    public void shouldRetryReplaySlowConsumer() {
        channelWillWrite(0);
        onReplayMessage(0L, 0L);
        byteBufferWritten();
        onNormalStreamReplayComplete();
        assertReplayPaused();
        channelWillWrite(BODY_LENGTH);
        onSlowReplayMessage(0L, 0L);
        byteBufferWritten();
        verifyDoesNotBlockLibrary();
        onSlowStreamReplayComplete();
        assertNotReplayPaused();
        assertBytesInBuffer(0);
    }

    @Test
    public void shouldDisconnectReplaySlowConsumer() {
        channelWillWrite(0);
        onReplayMessage(0L, 0L);
        long j = 0 + FRAGMENT_LENGTH;
        onReplayMessage(0L, j);
        long j2 = j + FRAGMENT_LENGTH;
        onReplayMessage(0L, j2);
        onReplayMessage(0L, j2 + FRAGMENT_LENGTH);
        verifySlowConsumerDisconnect(Mockito.times(1));
    }

    @Test
    public void shouldBeAbleToFragmentReplaySlowConsumerRetries() {
        channelWillWrite(0);
        onReplayMessage(0L, POSITION);
        byteBufferWritten();
        verifyDoesNotBlockLibrary();
        onNormalStreamReplayComplete();
        assertReplayPaused();
        channelWillWrite(41);
        onSlowReplayMessage(0L, POSITION);
        byteBufferWritten();
        assertBytesInBuffer(43);
        verifyBlocksReplayAt(BEGIN_POSITION);
        onSlowStreamReplayComplete();
        assertReplayPaused();
        channelWillWrite(43);
        onSlowReplayMessage(0L, POSITION);
        byteBufferWritten();
        assertBytesInBuffer(0);
        verifyNoMoreErrors();
        verifyDoesNotBlockLibrary();
        onSlowStreamReplayComplete();
        assertNotReplayPaused();
    }

    @Test
    public void shouldNotSendSlowConsumerMessageUntilReplayComplete() {
        channelWillWrite(0);
        onReplayMessage(0L, POSITION);
        byteBufferWritten();
        onNormalStreamReplayComplete();
        assertReplayPaused();
        channelWillWrite(41);
        onSlowReplayMessage(0L, POSITION);
        byteBufferWritten();
        assertBytesInBuffer(43);
        onSlowStreamReplayComplete();
        assertReplayPaused();
        onOutboundMessage(0L, POSITION);
        byteBufferNotWritten();
        assertBytesInBuffer(127);
        onSlowOutboundMessage();
        byteBufferNotWritten();
        assertBytesInBuffer(127);
        verifyBlocksLibraryAt(BEGIN_POSITION);
        channelWillWrite(43);
        onSlowReplayMessage(0L, POSITION);
        byteBufferWritten();
        assertBytesInBuffer(BODY_LENGTH);
        onSlowStreamReplayComplete();
        assertNotReplayPaused();
        channelWillWrite(BODY_LENGTH);
        onSlowOutboundMessage();
        byteBufferWritten();
        assertBytesInBuffer(0);
        verifyNoMoreErrors();
    }

    @Test
    public void shouldNotSendReplayMessageUntilSlowConsumerComplete() {
        long j = FRAGMENT_LENGTH;
        becomeSlowConsumer();
        channelWillWrite(41);
        onSlowOutboundMessage();
        byteBufferWritten();
        assertBytesInBuffer(43);
        verifyBlocksLibraryAt(BEGIN_POSITION);
        channelWillWrite(0);
        onReplayMessage(0L, j);
        byteBufferNotWritten();
        assertBytesInBuffer(127);
        onNormalStreamReplayComplete();
        assertNotReplayPaused();
        onSlowReplayMessage(0L, j);
        byteBufferNotWritten();
        assertBytesInBuffer(127);
        onSlowStreamReplayComplete();
        assertNotReplayPaused();
        channelWillWrite(43);
        onSlowOutboundMessage();
        byteBufferWritten();
        assertBytesInBuffer(BODY_LENGTH);
        channelWillWrite(BODY_LENGTH);
        onSlowReplayMessage(0L, 84L);
        byteBufferWritten();
        assertBytesInBuffer(0);
        onSlowStreamReplayComplete();
        assertNotReplayPaused();
        verifyNoMoreErrors();
    }

    private void byteBufferNotWritten() {
        byteBufferWritten(Mockito.never());
    }

    private void errorLogged() {
        ((ErrorHandler) Mockito.verify(this.errorHandler)).onError((Throwable) Mockito.any(IllegalStateException.class));
    }

    private void verifyNoMoreErrors() {
        Mockito.verifyNoMoreInteractions(new Object[]{this.errorHandler});
    }

    private void onOutboundMessage(long j, long j2) {
        this.endPoint.onOutboundMessage(LIBRARY_ID, this.buffer, 0, BODY_LENGTH, 0, j2, j, 0);
    }

    private void onReplayMessage(long j, long j2) {
        this.endPoint.onReplayMessage(this.buffer, 0, BODY_LENGTH, j, j2);
    }

    private void onSlowReplayMessage(long j, long j2) {
        this.endPoint.onSlowReplayMessage(this.buffer, 0, BODY_LENGTH, j, j2, 0);
    }

    private void verifySlowConsumerDisconnect(VerificationMode verificationMode) {
        ((Framer) Mockito.verify(this.framer, verificationMode)).onDisconnect(LIBRARY_ID, CONNECTION_ID, DisconnectReason.SLOW_CONSUMER);
    }

    private void byteBufferWritten() {
        byteBufferWritten(Mockito.times(1));
    }

    private void byteBufferWritten(VerificationMode verificationMode) {
        try {
            ((TcpChannel) Mockito.verify(this.tcpChannel, verificationMode)).write(this.byteBuffer);
            Mockito.reset(new TcpChannel[]{this.tcpChannel});
        } catch (IOException e) {
            LangUtil.rethrowUnchecked(e);
        }
    }

    private void assertBytesInBuffer(int i) {
        Assert.assertEquals(i, this.bytesInBuffer.get());
    }

    private void onSlowOutboundMessage() {
        onSlowOutboundMessage(100L);
    }

    private void onSlowOutboundMessage(long j) {
        Assert.assertEquals(ControlledFragmentHandler.Action.CONTINUE, this.endPoint.onSlowOutboundMessage(this.buffer, 8, LENGTH, POSITION, BODY_LENGTH, LIBRARY_ID, j, 0, 1));
    }

    private void becomeSlowConsumer() {
        channelWillWrite(0);
        onOutboundMessage(0L, POSITION);
        byteBufferWritten();
    }

    private void channelWillWrite(int i) {
        try {
            Mockito.when(Integer.valueOf(this.tcpChannel.write(this.byteBuffer))).thenReturn(Integer.valueOf(i));
        } catch (IOException e) {
            LangUtil.rethrowUnchecked(e);
        }
    }

    private AtomicCounter fakeCounter() {
        AtomicLong atomicLong = new AtomicLong();
        AtomicCounter atomicCounter = (AtomicCounter) Mockito.mock(AtomicCounter.class);
        Answer answer = invocationOnMock -> {
            return Long.valueOf(atomicLong.get());
        };
        Answer answer2 = invocationOnMock2 -> {
            atomicLong.set(((Long) invocationOnMock2.getArgument(0)).longValue());
            return null;
        };
        Answer answer3 = invocationOnMock3 -> {
            return Long.valueOf(atomicLong.getAndAdd(((Long) invocationOnMock3.getArgument(0)).longValue()));
        };
        Mockito.when(Long.valueOf(atomicCounter.get())).then(answer);
        Mockito.when(Long.valueOf(atomicCounter.getWeak())).then(answer);
        ((AtomicCounter) Mockito.doAnswer(answer2).when(atomicCounter)).set(Mockito.anyLong());
        ((AtomicCounter) Mockito.doAnswer(answer2).when(atomicCounter)).setOrdered(Mockito.anyLong());
        ((AtomicCounter) Mockito.doAnswer(answer2).when(atomicCounter)).setWeak(Mockito.anyLong());
        Mockito.when(Long.valueOf(atomicCounter.getAndAdd(Mockito.anyLong()))).then(answer3);
        Mockito.when(Long.valueOf(atomicCounter.getAndAddOrdered(Mockito.anyLong()))).then(answer3);
        return atomicCounter;
    }

    private void verifyDoesNotBlockLibrary() {
        ((BlockablePosition) Mockito.verify(this.libraryBlockablePosition, Mockito.never())).blockPosition(Mockito.anyLong());
        ((BlockablePosition) Mockito.verify(this.replayBlockablePosition, Mockito.never())).blockPosition(Mockito.anyLong());
    }

    private void verifyBlocksLibraryAt(long j) {
        verifyBlocksAt(j, this.libraryBlockablePosition);
    }

    private void verifyBlocksReplayAt(long j) {
        verifyBlocksAt(j, this.replayBlockablePosition);
    }

    private void verifyBlocksAt(long j, BlockablePosition blockablePosition) {
        ((BlockablePosition) Mockito.verify(blockablePosition)).blockPosition(j);
        Mockito.reset(new BlockablePosition[]{blockablePosition});
    }

    private void assertNotReplayPaused() {
        Assert.assertFalse("should not be replay paused", this.endPoint.replayPaused());
    }

    private void assertReplayPaused() {
        Assert.assertTrue("should be replay paused", this.endPoint.replayPaused());
    }

    private void onSlowStreamReplayComplete() {
        this.endPoint.onReplayComplete();
    }

    private void onNormalStreamReplayComplete() {
        this.endPoint.onReplayComplete();
    }
}
