package uk.co.real_logic.artio.engine.logger;

import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.driver.MediaDriver;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochNanoClock;
import org.agrona.concurrent.OffsetEpochNanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import uk.co.real_logic.artio.CommonConfiguration;
import uk.co.real_logic.artio.TestFixtures;
import uk.co.real_logic.artio.Timing;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.logger.FixMessageLogger;
import uk.co.real_logic.artio.ilink.ILinkMessageConsumer;
import uk.co.real_logic.artio.messages.MessageHeaderEncoder;
import uk.co.real_logic.artio.messages.ReplayerTimestampEncoder;
import uk.co.real_logic.artio.protocol.GatewayPublication;

/* loaded from: input_file:uk/co/real_logic/artio/engine/logger/AbstractFixMessageLoggerTest.class */
/**
 * Shared scenarios that check the order in which a {@link FixMessageLogger} hands messages to its
 * {@link FixMessageConsumer}.
 *
 * <p>Each scenario publishes timestamped messages on an inbound and an outbound publication, optionally
 * publishes a replayer timestamp on a third publication, drives the logger by calling {@code doWork()} and
 * then asserts on the timestamps that the consumer has recorded so far.
 *
 * <p>Subclasses provide {@link #onMessage(GatewayPublication, long)} to publish a message in their protocol
 * of choice and are expected to call {@link #setup(ILinkMessageConsumer)} before each test (presumably from a
 * {@code @Before} method; no such method is declared in this class).
 */
public abstract class AbstractFixMessageLoggerTest {
    // Not referenced in this class; presumably used by subclasses when encoding messages.
    static final int LIBRARY_ID = 1;
    static final long SESSION_ID = 2;
    static final int SEQUENCE_INDEX = 3;
    static final long CONNECTION_ID = 4;

    // Stream ids of the three publications the tests write to. setup() does not configure stream ids on the
    // logger, so these presumably match the FixMessageLogger.Configuration defaults - confirm before changing.
    private static final int INBOUND_STREAM_ID = 1;
    private static final int OUTBOUND_STREAM_ID = 2;
    private static final int REPLAY_STREAM_ID = 3;
    private static final String REPLAY_CHANNEL = "aeron:ipc";

    // Timeout handed to Timing.assertEventuallyTrue (presumably milliseconds - confirm against Timing).
    private static final long EVENTUALLY_TIMEOUT = 1000L;

    /** Passed to the logger configuration in {@link #setup(ILinkMessageConsumer)}; must be assigned before it. */
    int compactionSize;
    final EpochNanoClock clock = new OffsetEpochNanoClock();
    /** Timestamps of the messages delivered to {@link #fixConsumer}, in delivery order. */
    final LongArrayList timestamps = new LongArrayList();

    /**
     * Records the timestamp of every delivered message and checks that the message body, once trimmed, parses
     * to that same timestamp.
     */
    private final FixMessageConsumer fixConsumer = (message, buffer, offset, length, header) -> {
        final long timestamp = message.timestamp();
        timestamps.add(timestamp);
        Assert.assertEquals(timestamp, Long.parseLong(message.body().trim()));
    };

    private String libraryChannel;
    private MediaDriver mediaDriver;
    private Aeron aeron;
    private FixMessageLogger logger;
    private GatewayPublication inboundPublication;
    private GatewayPublication outboundPublication;
    private ExclusivePublication replayPublication;

    /**
     * Launches a media driver, connects an Aeron client, creates the logger under test and the three
     * publications that the scenarios write to.
     *
     * @param iLinkMessageConsumer consumer registered on the logger configuration alongside the FIX consumer.
     */
    public void setup(ILinkMessageConsumer iLinkMessageConsumer) {
        libraryChannel = "aeron:udp?endpoint=localhost:" + TestFixtures.unusedPort();
        mediaDriver = TestFixtures.launchJustMediaDriver();
        aeron = Aeron.connect();

        final FixMessageLogger.Configuration configuration = new FixMessageLogger.Configuration()
            .fixMessageConsumer(fixConsumer)
            .iLinkMessageConsumer(iLinkMessageConsumer)
            .compactionSize(compactionSize)
            .libraryAeronChannel(libraryChannel);
        logger = new FixMessageLogger(configuration);

        inboundPublication = newPublication(INBOUND_STREAM_ID);
        outboundPublication = newPublication(OUTBOUND_STREAM_ID);
        replayPublication = aeron.addExclusivePublication(REPLAY_CHANNEL, REPLAY_STREAM_ID);
    }

    /** Closes the Aeron client and the media driver, then the logger. */
    @After
    public void teardown() {
        Exceptions.closeAll(new AutoCloseable[]{aeron, mediaDriver});
        Exceptions.closeAll(new Agent[]{logger});
    }

    /**
     * Publishes interleaved inbound and outbound messages with out-of-order timestamps and expects the
     * consumer to receive them in ascending timestamp order, with the remainder delivered on close.
     */
    @Test
    public void shouldReOrderMessagesByTimestamp() {
        onMessage(inboundPublication, 2L);
        onMessage(inboundPublication, 3L);
        onMessage(inboundPublication, 4L);
        onMessage(outboundPublication, 1L);
        onMessage(outboundPublication, 5L);
        onMessage(outboundPublication, 7L);
        onMessage(inboundPublication, 6L);
        onReplayerTimestamp(replayPublication, 10L);

        // Seven messages were published but only the six earliest are expected at this point.
        assertEventuallyReceives(6);
        MatcherAssert.assertThat(timestamps, Matchers.contains(1L, 2L, 3L, 4L, 5L, 6L));
        timestamps.clear();

        onMessage(inboundPublication, 8L);
        assertEventuallyReceives(1);
        MatcherAssert.assertThat(timestamps, Matchers.contains(7L));
        timestamps.clear();

        MatcherAssert.assertThat(
            "failed to reshuffle", logger.bufferPosition(), Matchers.lessThanOrEqualTo(compactionSize));

        onMessage(inboundPublication, 9L);
        onMessage(outboundPublication, 10L);
        assertEventuallyReceives(2);
        MatcherAssert.assertThat(timestamps, Matchers.contains(8L, 9L));
        timestamps.clear();

        onMessage(outboundPublication, 11L);
        assertEventuallyReads(1);
        assertNoTimestamps();

        // Closing the logger is expected to hand over everything that is still buffered.
        logger.onClose();
        MatcherAssert.assertThat(timestamps, Matchers.contains(10L, 11L));
        Assert.assertEquals("failed to reshuffle", 0L, logger.bufferPosition());
    }

    /**
     * Same ordering expectation as above, but the logger is polled between publications so that messages
     * arrive across several work cycles.
     */
    @Test
    public void shouldReOrderMessagesByTimestampIntermediatePolling() {
        onMessage(inboundPublication, 1L);
        onMessage(inboundPublication, 3L);
        assertEventuallyReads(2);
        assertNoTimestamps();

        logger.doWork();

        onMessage(inboundPublication, 5L);
        assertEventuallyReads(1);
        assertNoTimestamps();

        onMessage(outboundPublication, 2L);
        onMessage(outboundPublication, 4L);
        onMessage(outboundPublication, 6L);
        onReplayerTimestamp(replayPublication, 10L);
        assertEventuallyReads(4);
        MatcherAssert.assertThat(timestamps, Matchers.contains(1L, 2L, 3L, 4L, 5L));
        timestamps.clear();

        logger.onClose();
        MatcherAssert.assertThat(timestamps, Matchers.contains(6L));
        timestamps.clear();
        Assert.assertEquals("failed to reshuffle", 0L, logger.bufferPosition());
    }

    /**
     * Uses realistic epoch-nanosecond timestamps and two replayer timestamps, checking which messages have
     * been delivered after each step.
     */
    @Test
    public void shouldHandleSubsequentReplayTimestampsCorrectly() {
        onMessage(outboundPublication, 1603800570460284857L);
        onMessage(inboundPublication, 1603800570513023097L);
        onReplayerTimestamp(replayPublication, 1603800571498664415L);
        assertEventuallyReads(3);
        MatcherAssert.assertThat(timestamps, Matchers.contains(1603800570460284857L));
        timestamps.clear();

        onMessage(outboundPublication, 1603800578520566892L);
        assertEventuallyReads(1);
        MatcherAssert.assertThat(timestamps, Matchers.contains(1603800570513023097L));
        timestamps.clear();

        onMessage(inboundPublication, 1603800581079423921L);
        assertEventuallyReads(1);
        assertNoTimestamps();

        onMessage(outboundPublication, 1603800586520278849L);
        assertEventuallyReads(1);
        assertNoTimestamps();

        onMessage(inboundPublication, 1603800591079715485L);
        assertEventuallyReads(1);
        assertNoTimestamps();

        onReplayerTimestamp(replayPublication, 1603800591079715486L);
        assertEventuallyReads(1);
        MatcherAssert.assertThat(
            timestamps,
            Matchers.contains(1603800578520566892L, 1603800581079423921L, 1603800586520278849L));
    }

    /**
     * Repeatedly runs the logger until exactly {@code i} timestamps have been recorded by the consumer, failing
     * if that does not happen within the timeout.
     */
    private void assertEventuallyReceives(int i) {
        Timing.assertEventuallyTrue(
            () -> "Failed to receive a message: " + timestamps,
            () -> {
                logger.doWork();
                return timestamps.size() == i;
            },
            EVENTUALLY_TIMEOUT,
            () -> {
            });
    }

    /**
     * Repeatedly runs the logger until the sum of the values returned by {@code doWork()} reaches at least
     * {@code i}, failing if that does not happen within the timeout. Unlike
     * {@link #assertEventuallyReceives(int)} this counts work done by the logger, not messages delivered to
     * the consumer.
     */
    private void assertEventuallyReads(final int i) {
        Timing.assertEventuallyTrue(
            () -> "Failed to receive a message: " + timestamps,
            new BooleanSupplier() {
                // Running total across polls; a lambda cannot hold this mutable state directly.
                private int read = 0;

                @Override
                public boolean getAsBoolean() {
                    read += logger.doWork();
                    return read >= i;
                }
            },
            EVENTUALLY_TIMEOUT,
            () -> {
            });
    }

    /**
     * Publishes one message carrying timestamp {@code j} on the given publication. The consumer in this class
     * additionally requires the message body to parse to the same value.
     */
    abstract void onMessage(GatewayPublication gatewayPublication, long j);

    /** Encodes a replayer timestamp message carrying {@code j} and offers it until the offer succeeds. */
    private void onReplayerTimestamp(ExclusivePublication exclusivePublication, long j) {
        // 16 bytes: presumably the message header plus the single timestamp field - confirm against the schema.
        final UnsafeBuffer buffer = new UnsafeBuffer(new byte[16]);
        final ReplayerTimestampEncoder encoder = new ReplayerTimestampEncoder();
        encoder.wrapAndApplyHeader(buffer, 0, new MessageHeaderEncoder()).timestamp(j);

        untilComplete(() -> exclusivePublication.offer(buffer));
    }

    /**
     * Invokes the operation until it returns a positive value, yielding the thread between attempts.
     *
     * <p>NOTE: there is no timeout; an operation that never returns a positive value spins forever.
     */
    public void untilComplete(LongSupplier longSupplier) {
        while (longSupplier.getAsLong() <= 0) {
            Thread.yield();
        }
    }

    /** Creates a {@link GatewayPublication} over a new exclusive publication on the library channel. */
    private GatewayPublication newPublication(int i) {
        final ExclusivePublication publication = aeron.addExclusivePublication(libraryChannel, i);
        final AtomicCounter failCounter = Mockito.mock(AtomicCounter.class);
        return new GatewayPublication(
            publication, failCounter, CommonConfiguration.backoffIdleStrategy(), clock, 1);
    }

    /** Asserts that the consumer has not recorded any timestamp since the list was last cleared. */
    private void assertNoTimestamps() {
        MatcherAssert.assertThat(timestamps, Matchers.hasSize(0));
    }
}
