package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.scheduler.ActorCondition;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

/**
 * Tests for {@link Sequencer}: consumer notification on writes, reading back written entries,
 * rejection of writes when nothing is read, and position bookkeeping with one or several
 * concurrent writer threads.
 */
@Execution(ExecutionMode.CONCURRENT)
final class SequencerTest {
    /** Upper bound, in milliseconds, for waiting on each reader/writer thread to finish. */
    private static final long JOIN_TIMEOUT_MILLIS = 10_000L;

    @Test
    void notifiesConsumerOnWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1, 0L, 16);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);

        // when
        sequencer.registerConsumer(consumer);
        sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        Mockito.verify(consumer).signal();
    }

    @Test
    void notifiesConsumerOnBatchWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1, 0L, 16);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);
        final List<LogAppendEntry> entries = List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        sequencer.registerConsumer(consumer);
        sequencer.tryWrite(entries);

        // then
        Mockito.verify(consumer).signal();
    }

    @Test
    void canReadAfterSingleWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16);
        final LogAppendEntry entry = TestEntry.ofDefaults();

        // when
        sequencer.tryWrite(entry);
        final SequencedBatch read = sequencer.tryRead();

        // then
        Assertions.assertThat(read.entries()).containsExactly(entry);
    }

    @Test
    void canReadAfterBatchWrite() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16);
        final List<LogAppendEntry> entries =
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        sequencer.tryWrite(entries);
        final SequencedBatch read = sequencer.tryRead();

        // then
        // NOTE(review): this only requires that at least one written entry is read back;
        // consider containsExactlyElementsOf if the whole batch is expected - confirm.
        Assertions.assertThat(read.entries()).containsAnyElementsOf(entries);
    }

    @Test
    void cannotReadEmpty() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16 * 1024 * 1024);

        // when
        final SequencedBatch read = sequencer.tryRead();

        // then
        Assertions.assertThat(read).isNull();
    }

    @Test
    void eventuallyRejectsWritesWithoutReader() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16 * 1024 * 1024);

        // then - nothing is ever read, so at some point a write must be rejected
        writeUntilRejected(sequencer);
    }

    @Test
    void eventuallyRejectsBatchWritesWithoutReader() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16 * 1024 * 1024);

        // then
        Awaitility.await("sequencer rejects writes")
                .pollInSameThread()
                .pollInterval(Duration.ZERO)
                .until(
                        () -> sequencer.tryWrite(List.of(TestEntry.ofKey(1L), TestEntry.ofKey(2L))),
                        result -> result <= 0);
    }

    @Test
    void writingSingleEntryIncreasesPositions() {
        // given
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(1, initialPosition, 16 * 1024 * 1024);

        // when
        final long result = sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        Assertions.assertThat(result).isPositive().isEqualTo(initialPosition);
    }

    @Test
    void writingMultipleEntriesIncreasesPositions() {
        // given
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(1, initialPosition, 16 * 1024 * 1024);
        final List<LogAppendEntry> entries =
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        final long result = sequencer.tryWrite(entries);

        // then - the returned value is expected to be the position of the last entry in the batch
        Assertions.assertThat(result).isPositive().isEqualTo(initialPosition + entries.size() - 1);
    }

    @Test
    void notifiesReaderWhenRejectingWriteDueToFullQueue() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16 * 1024 * 1024);
        writeUntilRejected(sequencer);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);
        sequencer.registerConsumer(consumer);

        // when
        final long result = sequencer.tryWrite(TestEntry.ofDefaults());

        // then
        Assertions.assertThat(result).isNegative();
        Mockito.verify(consumer).signal();
    }

    @Test
    void notifiesReaderWhenRejectingBatchWriteDueToFullQueue() {
        // given
        final Sequencer sequencer = new Sequencer(1, 1L, 16 * 1024 * 1024);
        writeUntilRejected(sequencer);
        final ActorCondition consumer = Mockito.mock(ActorCondition.class);
        sequencer.registerConsumer(consumer);

        // when
        final long result = sequencer.tryWrite(List.of(TestEntry.ofKey(1L), TestEntry.ofKey(2L)));

        // then
        Assertions.assertThat(result).isNegative();
        Mockito.verify(consumer).signal();
    }

    @Test
    void keepsPositionsWithSingleWriter() throws InterruptedException {
        // given
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(1, initialPosition, 16 * 1024 * 1024);
        final List<LogAppendEntry> batch = List.of(TestEntry.ofKey(1L));
        final Thread reader = newReaderThread(sequencer, initialPosition, 10_000L);
        final Thread writer = newWriterThread(sequencer, initialPosition, 10_000L, batch, true);

        // when - then
        startAndAwait(reader, List.of(writer));
    }

    @Test
    void keepsPositionsWithMultipleWriters() throws InterruptedException {
        // given - three writers of 10 000 batches each, one reader consuming all 30 000
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(1, initialPosition, 16 * 1024 * 1024);
        final Thread reader = newReaderThread(sequencer, initialPosition, 30_000L);
        final List<LogAppendEntry> batch = List.of(TestEntry.ofKey(1L));
        final List<Thread> writers =
                IntStream.range(0, 3)
                        .mapToObj(i -> newWriterThread(sequencer, initialPosition, 10_000L, batch, false))
                        .toList();

        // when - then
        startAndAwait(reader, writers);
    }

    @Test
    void keepsPositionsWithMultipleWritersWritingMultipleEntries() throws InterruptedException {
        // given - three writers of 10 000 four-entry batches each, one reader consuming all 30 000
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(1, initialPosition, 16 * 1024 * 1024);
        final Thread reader = newReaderThread(sequencer, initialPosition, 30_000L);
        final List<LogAppendEntry> batch =
                List.of(TestEntry.ofKey(1L), TestEntry.ofKey(1L), TestEntry.ofKey(1L), TestEntry.ofKey(1L));
        final List<Thread> writers =
                IntStream.range(0, 3)
                        .mapToObj(i -> newWriterThread(sequencer, initialPosition, 10_000L, batch, false))
                        .toList();

        // when - then
        startAndAwait(reader, writers);
    }

    /**
     * Keeps writing single default entries, without ever reading, until the sequencer returns a
     * non-positive result for a write.
     */
    private static void writeUntilRejected(final Sequencer sequencer) {
        Awaitility.await("sequencer rejects writes")
                .pollInSameThread()
                .pollInterval(Duration.ZERO)
                .until(() -> sequencer.tryWrite(TestEntry.ofDefaults()), result -> result <= 0);
    }

    /**
     * Starts the reader followed by all writers, waits up to {@link #JOIN_TIMEOUT_MILLIS} for each
     * of them, and fails the calling test if any thread threw or did not terminate in time.
     *
     * <p>Assertions made inside the spawned threads would otherwise only kill that thread and never
     * reach the test runner, so uncaught throwables are collected and re-asserted here.
     *
     * @param reader the reader thread, started first
     * @param writers the writer threads, started in list order after the reader
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private static void startAndAwait(final Thread reader, final List<Thread> writers)
            throws InterruptedException {
        final List<Throwable> failures = new CopyOnWriteArrayList<>();
        final List<Thread> threads = Stream.concat(Stream.of(reader), writers.stream()).toList();
        threads.forEach(
                thread -> thread.setUncaughtExceptionHandler((t, error) -> failures.add(error)));
        threads.forEach(Thread::start);

        final List<String> unfinished;
        try {
            for (final Thread thread : threads) {
                thread.join(JOIN_TIMEOUT_MILLIS);
            }
            unfinished = threads.stream().filter(Thread::isAlive).map(Thread::getName).toList();
        } finally {
            // Ask stragglers to stop so they do not keep spinning after the test is over;
            // interrupting an already terminated thread has no effect.
            threads.forEach(Thread::interrupt);
        }

        // Report thread failures first: a thread that died early is usually the reason why
        // another one could not finish in time.
        Assertions.assertThat(failures).as("failures raised in reader/writer threads").isEmpty();
        Assertions.assertThat(unfinished).as("threads still running after the timeout").isEmpty();
    }

    /**
     * Creates (but does not start) a thread that polls {@code sequencer.tryRead()} until it has
     * received {@code batchesToRead} non-null batches, asserting that every batch starts directly
     * after the last position of the previous one.
     *
     * @param sequencer the sequencer to read from
     * @param initialPosition the position the first batch is expected to start at
     * @param batchesToRead number of batches to read before the thread ends
     * @return the unstarted reader thread
     */
    private Thread newReaderThread(
            final Sequencer sequencer, final long initialPosition, final long batchesToRead) {
        return new Thread(
                () -> {
                    long batchesRead = 0;
                    long lastReadPosition = initialPosition - 1;
                    while (batchesRead < batchesToRead && !Thread.currentThread().isInterrupted()) {
                        final SequencedBatch batch = sequencer.tryRead();
                        if (batch == null) {
                            // nothing available yet, poll again
                            continue;
                        }
                        Assertions.assertThat(batch.firstPosition()).isEqualTo(lastReadPosition + 1);
                        lastReadPosition = batch.firstPosition() + batch.entries().size() - 1;
                        batchesRead++;
                    }
                },
                "sequencer-reader");
    }

    /**
     * Creates (but does not start) a thread that writes {@code batch} until {@code batchesToWrite}
     * writes returned a positive result; after any other result it parks for one millisecond and
     * retries.
     *
     * @param sequencer the sequencer to write to
     * @param initialPosition the position the sequencer was created with
     * @param batchesToWrite number of successful writes before the thread ends
     * @param batch the entries written on every attempt
     * @param isOnlyWriter when {@code true}, every result must be exactly the previous result plus
     *     the batch size; otherwise it only has to be greater than the previous result, since other
     *     writers may have written in between
     * @return the unstarted writer thread
     */
    private Thread newWriterThread(
            final Sequencer sequencer,
            final long initialPosition,
            final long batchesToWrite,
            final List<LogAppendEntry> batch,
            final boolean isOnlyWriter) {
        return new Thread(
                () -> {
                    long batchesWritten = 0;
                    long lastWrittenPosition = initialPosition - 1;
                    while (batchesWritten < batchesToWrite && !Thread.currentThread().isInterrupted()) {
                        final long result = sequencer.tryWrite(batch);
                        if (result <= 0) {
                            // write was not accepted, back off briefly before retrying
                            LockSupport.parkNanos(1_000_000L);
                            continue;
                        }
                        if (isOnlyWriter) {
                            Assertions.assertThat(result).isEqualTo(lastWrittenPosition + batch.size());
                        } else {
                            Assertions.assertThat(result).isGreaterThan(lastWrittenPosition);
                        }
                        lastWrittenPosition = result;
                        batchesWritten++;
                    }
                },
                "sequencer-writer");
    }
}
