/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.test.util.asserts.EitherAssert;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.InstantSource;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Execution(value=ExecutionMode.CONCURRENT)
final class SequencerTest {
    SequencerTest() {
    }

    @Test
    void writingSingleEntryIncreasesPositions() {
        long initialPosition = 1L;
        LogStorage logStorage = (LogStorage)Mockito.mock(LogStorage.class);
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer(logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        Either result = sequencer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        EitherAssert.assertThat((Either)result).isRight().right().isEqualTo((Object)1L);
    }

    @Test
    void writingMultipleEntriesIncreasesPositions() {
        long initialPosition = 1L;
        LogStorage logStorage = (LogStorage)Mockito.mock(LogStorage.class);
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer(logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        List<LogAppendEntry> entries = List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        Either result = sequencer.tryWrite(WriteContext.internal(), entries);
        EitherAssert.assertThat((Either)result).isRight().right().isEqualTo((Object)(1L + (long)entries.size() - 1L));
    }

    @Test
    void writesSingleEntryToLogStorage() {
        LogStorage logStorage = (LogStorage)Mockito.mock(LogStorage.class);
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer(logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        LogAppendEntry entry = TestEntry.ofDefaults();
        sequencer.tryWrite(WriteContext.internal(), entry);
        ((LogStorage)Mockito.verify((Object)logStorage)).append(ArgumentMatchers.eq((long)1L), ArgumentMatchers.eq((long)1L), (BufferWriter)ArgumentMatchers.any(BufferWriter.class), (LogStorage.AppendListener)ArgumentMatchers.any());
    }

    @Test
    void writesMultipleEntriesToLogStorage() {
        LogStorage logStorage = (LogStorage)Mockito.mock(LogStorage.class);
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer(logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        List<LogAppendEntry> entries = List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        sequencer.tryWrite(WriteContext.internal(), entries);
        ((LogStorage)Mockito.verify((Object)logStorage)).append(ArgumentMatchers.eq((long)1L), ArgumentMatchers.eq((long)3L), (BufferWriter)ArgumentMatchers.any(BufferWriter.class), (LogStorage.AppendListener)ArgumentMatchers.any());
    }

    @Test
    void maintainsPositionWithSingleWriterAndSingleEntry() throws InterruptedException {
        VerifyingLogStorage logStorage = new VerifyingLogStorage();
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer((LogStorage)logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        LogAppendEntry entry = TestEntry.ofDefaults();
        ConcurrentLinkedQueue testFailures = new ConcurrentLinkedQueue();
        Thread writer = this.newWriterThread(sequencer, 1L, 100000L, List.of(entry), true, testFailures::add);
        writer.start();
        writer.join();
        Assertions.assertThat(testFailures).isEmpty();
    }

    @Test
    void maintainsPositionWithMultipleWritersAndSingleEntry() throws InterruptedException {
        int numberOfWriters = 8;
        VerifyingLogStorage logStorage = new VerifyingLogStorage();
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer((LogStorage)logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        LogAppendEntry entry = TestEntry.ofDefaults();
        ConcurrentLinkedQueue testFailures = new ConcurrentLinkedQueue();
        Thread[] writers = new Thread[8];
        for (int i = 0; i < 8; ++i) {
            writers[i] = this.newWriterThread(sequencer, 1L, 100000L, List.of(entry), false, testFailures::add);
        }
        for (Thread writer : writers) {
            writer.start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        Assertions.assertThat(testFailures).isEmpty();
    }

    @Test
    void maintainsPositionWithSingleWriterAndMultipleEntries() throws InterruptedException {
        VerifyingLogStorage logStorage = new VerifyingLogStorage();
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer((LogStorage)logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        List<LogAppendEntry> entries = List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        ConcurrentLinkedQueue testFailures = new ConcurrentLinkedQueue();
        Thread writer = this.newWriterThread(sequencer, 1L, 100000L, entries, true, testFailures::add);
        writer.start();
        writer.join();
        Assertions.assertThat(testFailures).isEmpty();
    }

    @Test
    void maintainsPositionWithMultipleWritersAndMultipleEntries() throws InterruptedException {
        int numberOfWriters = 8;
        VerifyingLogStorage logStorage = new VerifyingLogStorage();
        LogStreamMetrics logStreamMetrics = new LogStreamMetrics((MeterRegistry)new SimpleMeterRegistry());
        Sequencer sequencer = new Sequencer((LogStorage)logStorage, 1L, 16, InstantSource.system(), new SequencerMetrics((MeterRegistry)new SimpleMeterRegistry()), new FlowControl(logStreamMetrics));
        List<LogAppendEntry> entries = List.of(TestEntry.ofDefaults(), TestEntry.ofDefaults(), TestEntry.ofDefaults());
        ConcurrentLinkedQueue testFailures = new ConcurrentLinkedQueue();
        Thread[] writers = new Thread[8];
        for (int i = 0; i < 8; ++i) {
            writers[i] = this.newWriterThread(sequencer, 1L, 100000L, entries, false, testFailures::add);
        }
        for (Thread writer : writers) {
            writer.start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        Assertions.assertThat(testFailures).isEmpty();
    }

    private Thread newWriterThread(Sequencer sequencer, long initialPosition, long batchesToWrite, List<LogAppendEntry> batchToWrite, boolean isOnlyWriter, Consumer<Throwable> failedAssertionHandler) {
        Thread thread = new Thread(() -> {
            long batchesWritten = 0L;
            long lastWrittenPosition = initialPosition - 1L;
            while (batchesWritten < batchesToWrite) {
                Either result = sequencer.tryWrite(WriteContext.internal(), batchToWrite);
                if (result.isRight()) {
                    if (isOnlyWriter) {
                        Assertions.assertThat((Long)((Long)result.get())).isEqualTo(lastWrittenPosition + (long)batchToWrite.size());
                    } else {
                        Assertions.assertThat((Long)((Long)result.get())).isGreaterThan(lastWrittenPosition);
                    }
                    lastWrittenPosition = (Long)result.get();
                    ++batchesWritten;
                    continue;
                }
                LockSupport.parkNanos(1000000L);
            }
        });
        thread.setUncaughtExceptionHandler((t, e) -> failedAssertionHandler.accept(e));
        return thread;
    }

    private static final class VerifyingLogStorage
    implements LogStorage {
        private long position = -1L;

        private VerifyingLogStorage() {
        }

        public LogStorageReader newReader() {
            throw new UnsupportedOperationException();
        }

        public void append(long lowestPosition, long highestPosition, BufferWriter bufferWriter, LogStorage.AppendListener listener) {
            if (this.position != -1L) {
                Assertions.assertThat((long)lowestPosition).isEqualTo(this.position + 1L);
            }
            this.position = highestPosition;
            listener.onCommit(this.position, highestPosition);
        }

        public void addCommitListener(LogStorage.CommitListener listener) {
            throw new UnsupportedOperationException();
        }

        public void removeCommitListener(LogStorage.CommitListener listener) {
            throw new UnsupportedOperationException();
        }
    }
}

