package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.test.util.asserts.EitherAssert;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.time.InstantSource;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Execution(ExecutionMode.CONCURRENT)
/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/SequencerTest.class */
final class SequencerTest {

    /* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/SequencerTest$VerifyingLogStorage.class */
    private static final class VerifyingLogStorage implements LogStorage {
        private long position = -1;

        private VerifyingLogStorage() {
        }

        public LogStorageReader newReader() {
            throw new UnsupportedOperationException();
        }

        public void append(long j, long j2, BufferWriter bufferWriter, LogStorage.AppendListener appendListener) {
            if (this.position != -1) {
                Assertions.assertThat(j).isEqualTo(this.position + 1);
            }
            this.position = j2;
            appendListener.onCommit(this.position, j2);
        }

        public void addCommitListener(LogStorage.CommitListener commitListener) {
            throw new UnsupportedOperationException();
        }

        public void removeCommitListener(LogStorage.CommitListener commitListener) {
            throw new UnsupportedOperationException();
        }
    }

    // Explicit package-private no-arg constructor; it performs no initialization.
    SequencerTest() {
    }

    /**
     * Writes one default entry through a sequencer constructed with {@code 1} as its second
     * argument and expects the write to succeed with position {@code 1}.
     */
    @Test
    void writingSingleEntryIncreasesPositions() {
        // given
        final LogStorage logStorage = Mockito.mock(LogStorage.class);
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(
                logStorage,
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));

        // when
        final var result = sequencer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());

        // then
        EitherAssert.assertThat(result).isRight().right().isEqualTo(initialPosition);
    }

    /**
     * Writes a batch of three default entries through a sequencer constructed with {@code 1} as
     * its second argument and expects the write to succeed with the position of the last entry
     * in the batch, i.e. {@code 1 + batchSize - 1}.
     */
    @Test
    void writingMultipleEntriesIncreasesPositions() {
        // given
        final LogStorage logStorage = Mockito.mock(LogStorage.class);
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(
                logStorage,
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));
        final List<LogAppendEntry> entries =
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());

        // when
        final var result = sequencer.tryWrite(WriteContext.internal(), entries);

        // then - the batch occupies [initialPosition, initialPosition + size - 1]
        EitherAssert.assertThat(result)
                .isRight()
                .right()
                .isEqualTo(initialPosition + entries.size() - 1);
    }

    /**
     * Writes one default entry and verifies that the mocked storage received an append whose
     * lowest and highest position are both {@code 1}.
     */
    @Test
    void writesSingleEntryToLogStorage() {
        // given
        final LogStorage logStorage = Mockito.mock(LogStorage.class);
        final long initialPosition = 1L;
        final Sequencer sequencer = new Sequencer(
                logStorage,
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));

        // when
        sequencer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());

        // then
        Mockito.verify(logStorage)
                .append(
                        ArgumentMatchers.eq(initialPosition),
                        ArgumentMatchers.eq(initialPosition),
                        ArgumentMatchers.any(BufferWriter.class),
                        ArgumentMatchers.any());
    }

    /**
     * Writes a batch of three default entries and verifies that the mocked storage received a
     * single append spanning positions {@code 1} through {@code 3}.
     */
    @Test
    void writesMultipleEntriesToLogStorage() {
        // given
        final LogStorage logStorage = Mockito.mock(LogStorage.class);
        final long lowestPosition = 1L;
        final long highestPosition = 3L;
        final Sequencer sequencer = new Sequencer(
                logStorage,
                lowestPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));

        // when
        sequencer.tryWrite(
                WriteContext.internal(),
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults()));

        // then
        Mockito.verify(logStorage)
                .append(
                        ArgumentMatchers.eq(lowestPosition),
                        ArgumentMatchers.eq(highestPosition),
                        ArgumentMatchers.any(BufferWriter.class),
                        ArgumentMatchers.any());
    }

    /**
     * Runs one writer thread that writes 100,000 single-entry batches against a
     * {@code VerifyingLogStorage} and expects that no assertion failed, neither in the writer
     * thread nor in the storage.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the writer
     */
    @Test
    void maintainsPositionWithSingleWriterAndSingleEntry() throws InterruptedException {
        // given
        final long initialPosition = 1L;
        final long batchesToWrite = 100_000L;
        final Sequencer sequencer = new Sequencer(
                new VerifyingLogStorage(),
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));
        final List<LogAppendEntry> batch = List.of(TestEntry.ofDefaults());
        // Throwables escaping the writer thread are collected here by its uncaught-exception handler.
        final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

        // when
        final Thread writer =
                newWriterThread(sequencer, initialPosition, batchesToWrite, batch, true, failures::add);
        writer.start();
        writer.join();

        // then
        Assertions.assertThat(failures).isEmpty();
    }

    /**
     * Runs eight concurrent writer threads, each writing 100,000 single-entry batches against a
     * shared sequencer backed by a {@code VerifyingLogStorage}, and expects that no assertion
     * failed in any writer thread or in the storage.
     *
     * @throws InterruptedException if the test thread is interrupted while joining a writer
     */
    @Test
    void maintainsPositionWithMultipleWritersAndSingleEntry() throws InterruptedException {
        // given
        final int writerCount = 8;
        final long initialPosition = 1L;
        final long batchesToWrite = 100_000L;
        final Sequencer sequencer = new Sequencer(
                new VerifyingLogStorage(),
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));
        // List.of yields an immutable list, so all writers can safely share the same batch.
        final List<LogAppendEntry> batch = List.of(TestEntry.ofDefaults());
        // Throwables escaping the writer threads are collected here by their uncaught-exception handlers.
        final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

        final Thread[] writers = new Thread[writerCount];
        for (int i = 0; i < writers.length; i++) {
            // isOnlyWriter = false: with concurrent writers only increasing positions can be asserted.
            writers[i] =
                    newWriterThread(sequencer, initialPosition, batchesToWrite, batch, false, failures::add);
        }

        // when
        for (final Thread writer : writers) {
            writer.start();
        }
        for (final Thread writer : writers) {
            writer.join();
        }

        // then
        Assertions.assertThat(failures).isEmpty();
    }

    /**
     * Runs one writer thread that writes 100,000 three-entry batches against a
     * {@code VerifyingLogStorage} and expects that no assertion failed, neither in the writer
     * thread nor in the storage.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the writer
     */
    @Test
    void maintainsPositionWithSingleWriterAndMultipleEntries() throws InterruptedException {
        // given
        final long initialPosition = 1L;
        final long batchesToWrite = 100_000L;
        final Sequencer sequencer = new Sequencer(
                new VerifyingLogStorage(),
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));
        final List<LogAppendEntry> batch =
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        // Throwables escaping the writer thread are collected here by its uncaught-exception handler.
        final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

        // when
        final Thread writer =
                newWriterThread(sequencer, initialPosition, batchesToWrite, batch, true, failures::add);
        writer.start();
        writer.join();

        // then
        Assertions.assertThat(failures).isEmpty();
    }

    /**
     * Runs eight concurrent writer threads, each writing 100,000 three-entry batches against a
     * shared sequencer backed by a {@code VerifyingLogStorage}, and expects that no assertion
     * failed in any writer thread or in the storage.
     *
     * @throws InterruptedException if the test thread is interrupted while joining a writer
     */
    @Test
    void maintainsPositionWithMultipleWritersAndMultipleEntries() throws InterruptedException {
        // given
        final int writerCount = 8;
        final long initialPosition = 1L;
        final long batchesToWrite = 100_000L;
        final Sequencer sequencer = new Sequencer(
                new VerifyingLogStorage(),
                initialPosition,
                16,
                InstantSource.system(),
                new SequencerMetrics(1),
                new FlowControl(new LogStreamMetrics(1)));
        final List<LogAppendEntry> batch =
                List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        // Throwables escaping the writer threads are collected here by their uncaught-exception handlers.
        final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

        final Thread[] writers = new Thread[writerCount];
        for (int i = 0; i < writers.length; i++) {
            // isOnlyWriter = false: with concurrent writers only increasing positions can be asserted.
            writers[i] =
                    newWriterThread(sequencer, initialPosition, batchesToWrite, batch, false, failures::add);
        }

        // when
        for (final Thread writer : writers) {
            writer.start();
        }
        for (final Thread writer : writers) {
            writer.join();
        }

        // then
        Assertions.assertThat(failures).isEmpty();
    }

    /**
     * Creates (but does not start) a thread that repeatedly writes {@code batchToWrite} through
     * the sequencer until {@code batchesToWrite} writes have succeeded. A rejected write is
     * retried after parking for one millisecond. After every successful write the returned
     * position is checked against the position returned by this thread's previous write.
     *
     * @param sequencer the sequencer to write to
     * @param initialPosition the position the first write is compared against (minus one)
     * @param batchesToWrite number of successful writes to perform before the thread ends
     * @param batchToWrite the entries submitted with every write
     * @param isOnlyWriter if {@code true}, each returned position must be exactly the previous one
     *     plus the batch size; otherwise it only has to be greater than the previous one
     * @param uncaughtExceptionHandler receives any throwable that escapes the thread, e.g. a
     *     failed assertion
     * @return the configured, not yet started thread
     */
    private Thread newWriterThread(
            final Sequencer sequencer,
            final long initialPosition,
            final long batchesToWrite,
            final List<LogAppendEntry> batchToWrite,
            final boolean isOnlyWriter,
            final Consumer<Throwable> uncaughtExceptionHandler) {
        final Thread thread = new Thread(() -> {
            long batchesWritten = 0;
            long lastWrittenPosition = initialPosition - 1;
            while (batchesWritten < batchesToWrite) {
                final var result = sequencer.tryWrite(WriteContext.internal(), batchToWrite);
                if (!result.isRight()) {
                    // Write was rejected: back off briefly, then retry the same batch.
                    LockSupport.parkNanos(1_000_000L);
                    continue;
                }

                final long writtenPosition = result.get();
                if (isOnlyWriter) {
                    // No other writer interleaves, so positions must be gap-free.
                    Assertions.assertThat(writtenPosition)
                            .isEqualTo(lastWrittenPosition + batchToWrite.size());
                } else {
                    Assertions.assertThat(writtenPosition).isGreaterThan(lastWrittenPosition);
                }
                lastWrittenPosition = writtenPosition;
                batchesWritten++;
            }
        });
        // Assertion errors thrown inside the thread would otherwise never reach the test thread.
        thread.setUncaughtExceptionHandler((failedThread, error) -> uncaughtExceptionHandler.accept(error));
        return thread;
    }
}
