/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.log.SequencedBatch;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.scheduler.ActorCondition;
import io.camunda.zeebe.test.util.asserts.EitherAssert;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.camunda.zeebe.util.Either;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

/**
 * Tests for {@link Sequencer}: consumer notification, reading back written entries, rejection of
 * writes once the sequencer reports {@link LogStreamWriter.WriteFailure#FULL}, and position
 * continuity with one or more concurrent writers.
 *
 * <p>The concurrency tests run their assertions on plain threads. Failures raised there are
 * collected in {@link #threadFailures} and re-thrown on the test thread, otherwise JUnit would
 * never see them.
 */
@Execution(ExecutionMode.CONCURRENT)
@AutoCloseResources
final class SequencerTest {
    /**
     * Value passed as the second {@link Sequencer} constructor argument by most tests.
     * NOTE(review): presumably a size limit in bytes - confirm against the Sequencer constructor.
     */
    private static final int SIXTEEN_MIB = 16 * 1024 * 1024;

    /** Upper bound, in milliseconds, for waiting on a single reader or writer thread. */
    private static final long JOIN_TIMEOUT_MILLIS = 10_000L;

    @AutoCloseResources.AutoCloseResource
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    /**
     * Throwables that escaped reader/writer threads started by this test instance.
     * NOTE(review): relies on JUnit creating one test instance per test method (the default
     * lifecycle) so that concurrently running tests do not share this queue - confirm no global
     * lifecycle override is configured.
     */
    private final Queue<Throwable> threadFailures = new ConcurrentLinkedQueue<>();

    @Test
    void notifiesConsumerOnWrite() {
        // given
        final Sequencer sequencer = new Sequencer(0L, 16, new SequencerMetrics(meterRegistry));
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);

        // when
        sequencer.registerConsumer(consumer);
        sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        Mockito.verify(consumer).signal();
    }

    @Test
    void notifiesConsumerOnBatchWrite() {
        // given
        final Sequencer sequencer = new Sequencer(0L, 16, new SequencerMetrics(meterRegistry));
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);

        // when
        sequencer.registerConsumer(consumer);
        sequencer.tryWrite(List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults()));

        // then
        Mockito.verify(consumer).signal();
    }

    @Test
    void canReadAfterSingleWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1L, 16, new SequencerMetrics(meterRegistry));
        final LogAppendEntry entry = TestEntry.ofDefaults();

        // when
        sequencer.tryWrite(entry);
        final SequencedBatch read = sequencer.tryRead();

        // then
        Assertions.assertThat(read.entries()).containsExactly(entry);
    }

    @Test
    void canReadAfterBatchWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1L, 16, new SequencerMetrics(meterRegistry));
        final List<LogAppendEntry> entries =
            List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        sequencer.tryWrite(entries);
        final SequencedBatch read = sequencer.tryRead();

        // then - the whole batch must come back, in order; "contains any" would accept a
        // batch that lost or reordered entries
        Assertions.assertThat(read.entries()).containsExactlyElementsOf(entries);
    }

    @Test
    void cannotReadEmpty() {
        // given
        final Sequencer sequencer = new Sequencer(1L, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));

        // when
        final SequencedBatch read = sequencer.tryRead();

        // then
        Assertions.assertThat(read).isNull();
    }

    @Test
    void eventuallyRejectsWritesWithoutReader() {
        // given
        final Sequencer sequencer = new Sequencer(1L, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));

        // then
        writeUntilFull(sequencer);
    }

    @Test
    void eventuallyRejectsBatchWritesWithoutReader() {
        // given
        final Sequencer sequencer = new Sequencer(1L, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));

        // then
        Awaitility.await("sequencer rejects writes")
            .pollInSameThread()
            .pollInterval(Duration.ZERO)
            .until(
                () -> sequencer.tryWrite(List.of(TestEntry.ofKey(1L), TestEntry.ofKey(2L))),
                result -> result.isLeft() && result.getLeft() == LogStreamWriter.WriteFailure.FULL);
    }

    @Test
    void writingSingleEntryIncreasesPositions() {
        // given
        final long initialPosition = 1L;
        final Sequencer sequencer =
            new Sequencer(initialPosition, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));

        // when
        final var result = sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        EitherAssert.assertThat(result).isRight().right().isEqualTo(initialPosition);
    }

    @Test
    void writingMultipleEntriesIncreasesPositions() {
        // given
        final long initialPosition = 1L;
        final Sequencer sequencer =
            new Sequencer(initialPosition, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        final List<LogAppendEntry> entries =
            List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        final var result = sequencer.tryWrite(entries);

        // then - the returned position is the one of the last entry in the batch
        EitherAssert.assertThat(result).isRight().right().isEqualTo(initialPosition + entries.size() - 1L);
    }

    @Test
    void notifiesReaderWhenRejectingWriteDueToFullQueue() {
        // given
        final Sequencer sequencer = new Sequencer(1L, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        writeUntilFull(sequencer);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);
        sequencer.registerConsumer(consumer);

        // when
        final var result = sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        EitherAssert.assertThat(result).isLeft();
        Mockito.verify(consumer).signal();
    }

    @Test
    void notifiesReaderWhenRejectingBatchWriteDueToFullQueue() {
        // given
        final Sequencer sequencer = new Sequencer(1L, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        writeUntilFull(sequencer);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);
        sequencer.registerConsumer(consumer);

        // when
        final var result = sequencer.tryWrite(List.of(TestEntry.ofKey(1L), TestEntry.ofKey(2L)));

        // then
        EitherAssert.assertThat(result).isLeft();
        Mockito.verify(consumer).signal();
    }

    @Test
    void keepsPositionsWithSingleWriter() throws InterruptedException {
        // given
        final long initialPosition = 1L;
        final long entriesToWrite = 10_000L;
        final Sequencer sequencer =
            new Sequencer(initialPosition, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        final List<LogAppendEntry> batch = List.of(TestEntry.ofKey(1L));
        final Thread reader = newReaderThread(sequencer, initialPosition, entriesToWrite);
        final Thread writer = newWriterThread(sequencer, initialPosition, entriesToWrite, batch, true);

        // when
        reader.start();
        writer.start();

        // then
        awaitCompletion(reader, List.of(writer));
    }

    @Test
    void keepsPositionsWithMultipleWriters() throws InterruptedException {
        // given
        final int writers = 3;
        final long initialPosition = 1L;
        final long entriesToWrite = 10_000L;
        final long entriesToRead = writers * entriesToWrite;
        final Sequencer sequencer =
            new Sequencer(initialPosition, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        final Thread reader = newReaderThread(sequencer, initialPosition, entriesToRead);
        final List<LogAppendEntry> batch = List.of(TestEntry.ofKey(1L));
        final List<Thread> writerThreads =
            IntStream.range(0, writers)
                .mapToObj(i -> newWriterThread(sequencer, initialPosition, entriesToWrite, batch, false))
                .toList();

        // when
        reader.start();
        writerThreads.forEach(Thread::start);

        // then
        awaitCompletion(reader, writerThreads);
    }

    @Test
    void keepsPositionsWithMultipleWritersWritingMultipleEntries() throws InterruptedException {
        // given
        final int writers = 3;
        final long initialPosition = 1L;
        final long batchesToWrite = 10_000L;
        final long batchesToRead = writers * batchesToWrite;
        final Sequencer sequencer =
            new Sequencer(initialPosition, SIXTEEN_MIB, new SequencerMetrics(meterRegistry));
        final Thread reader = newReaderThread(sequencer, initialPosition, batchesToRead);
        final List<LogAppendEntry> batch =
            List.of(TestEntry.ofKey(1L), TestEntry.ofKey(1L), TestEntry.ofKey(1L), TestEntry.ofKey(1L));
        final List<Thread> writerThreads =
            IntStream.range(0, writers)
                .mapToObj(i -> newWriterThread(sequencer, initialPosition, batchesToWrite, batch, false))
                .toList();

        // when
        reader.start();
        writerThreads.forEach(Thread::start);

        // then
        awaitCompletion(reader, writerThreads);
    }

    /**
     * Writes single default entries until the sequencer rejects a write with {@link
     * LogStreamWriter.WriteFailure#FULL}. Nothing reads from the sequencer while this runs.
     */
    private void writeUntilFull(final Sequencer sequencer) {
        Awaitility.await("sequencer rejects writes")
            .pollInSameThread()
            .pollInterval(Duration.ZERO)
            .until(
                () -> sequencer.tryWrite(TestEntry.ofDefaults()),
                result -> result.isLeft() && result.getLeft() == LogStreamWriter.WriteFailure.FULL);
    }

    /**
     * Waits for the reader and then every writer to terminate and fails the calling test if any of
     * them is still running after {@link #JOIN_TIMEOUT_MILLIS} or threw while running.
     *
     * @param reader the started reader thread
     * @param writers the started writer threads
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws AssertionError if a thread timed out or recorded a failure
     */
    private void awaitCompletion(final Thread reader, final List<Thread> writers)
            throws InterruptedException {
        try {
            joinOrFail(reader);
            for (final Thread writer : writers) {
                joinOrFail(writer);
            }
        } finally {
            // Stop anything still running so a failed test does not leave spinning threads behind;
            // interrupting an already terminated thread has no effect.
            reader.interrupt();
            writers.forEach(Thread::interrupt);
        }

        final Throwable firstFailure = threadFailures.peek();
        if (firstFailure != null) {
            throw new AssertionError(
                "reader or writer thread failed (" + threadFailures.size() + " failure(s) recorded)",
                firstFailure);
        }
    }

    /** Joins the thread with a timeout and asserts that it actually terminated. */
    private static void joinOrFail(final Thread thread) throws InterruptedException {
        thread.join(JOIN_TIMEOUT_MILLIS);
        Assertions.assertThat(thread.isAlive())
            .as("thread '%s' is still running after %d ms", thread.getName(), JOIN_TIMEOUT_MILLIS)
            .isFalse();
    }

    /**
     * Creates (but does not start) a thread that reads {@code batchesToRead} batches and asserts
     * that every batch starts directly after the last position of the previous one.
     *
     * @param sequencer the sequencer to read from
     * @param initialPosition the position expected for the first entry read
     * @param batchesToRead number of non-null batches to read before the thread ends
     * @return the unstarted reader thread; uncaught failures are recorded in {@link #threadFailures}
     */
    private Thread newReaderThread(final Sequencer sequencer, final long initialPosition, final long batchesToRead) {
        final Thread reader = new Thread(() -> {
            long batchesRead = 0L;
            long lastReadPosition = initialPosition - 1L;
            // The interruption check only matters after a timeout, when the test asks us to stop.
            while (batchesRead < batchesToRead && !Thread.currentThread().isInterrupted()) {
                final SequencedBatch batch = sequencer.tryRead();
                if (batch == null) {
                    // Nothing available yet, try again.
                    continue;
                }
                Assertions.assertThat(batch.firstPosition()).isEqualTo(lastReadPosition + 1L);
                lastReadPosition = batch.firstPosition() + batch.entries().size() - 1L;
                ++batchesRead;
            }
        }, "sequencer-test-reader");
        reader.setUncaughtExceptionHandler((thread, failure) -> threadFailures.add(failure));
        return reader;
    }

    /**
     * Creates (but does not start) a thread that writes {@code batchToWrite} to the sequencer
     * {@code batchesToWrite} times, retrying after a 1 ms pause whenever a write is rejected.
     *
     * @param sequencer the sequencer to write to
     * @param initialPosition the position the sequencer was created with
     * @param batchesToWrite number of successful writes before the thread ends
     * @param batchToWrite the entries written on every attempt
     * @param isOnlyWriter if {@code true}, every returned position must advance by exactly the
     *     batch size; otherwise it only has to be greater than the previously returned one, since
     *     other writers may have written in between
     * @return the unstarted writer thread; uncaught failures are recorded in {@link #threadFailures}
     */
    private Thread newWriterThread(final Sequencer sequencer, final long initialPosition, final long batchesToWrite, final List<LogAppendEntry> batchToWrite, final boolean isOnlyWriter) {
        final Thread writer = new Thread(() -> {
            long batchesWritten = 0L;
            long lastWrittenPosition = initialPosition - 1L;
            // The interruption check only matters after a timeout, when the test asks us to stop.
            while (batchesWritten < batchesToWrite && !Thread.currentThread().isInterrupted()) {
                final var result = sequencer.tryWrite(batchToWrite);
                if (result.isLeft()) {
                    // Write rejected: back off briefly, then retry.
                    LockSupport.parkNanos(1_000_000L);
                    continue;
                }
                final long writtenPosition = result.get();
                if (isOnlyWriter) {
                    Assertions.assertThat(writtenPosition).isEqualTo(lastWrittenPosition + batchToWrite.size());
                } else {
                    Assertions.assertThat(writtenPosition).isGreaterThan(lastWrittenPosition);
                }
                lastWrittenPosition = writtenPosition;
                ++batchesWritten;
            }
        }, "sequencer-test-writer");
        writer.setUncaughtExceptionHandler((thread, failure) -> threadFailures.add(failure));
        return writer;
    }
}

