package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.dispatcher.BlockPeek;
import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.dispatcher.Dispatchers;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.dispatcher.impl.log.DataFrameDescriptor;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.util.ByteValue;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.sched.ActorScheduler;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.agrona.MutableDirectBuffer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Execution(ExecutionMode.CONCURRENT)
/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppenderTest.class */
final class LogStorageAppenderTest {
    // Fragment length limit shared by the dispatcher and the appender under test.
    private static final int MAX_FRAGMENT_SIZE = 1024;
    private static final int PARTITION_ID = 0;
    // Position the dispatcher is configured to start at.
    private static final long INITIAL_POSITION = 2L;
    // Bogus position written into a peeked block by the gap-detection test.
    private static final long WRONG_POSITION = 10L;

    // Actor scheduler with one CPU-bound and one IO-bound thread.
    private final ActorScheduler scheduler =
        ActorScheduler.newActorScheduler()
            .setCpuBoundActorThreadCount(1)
            .setIoBoundActorThreadCount(1)
            .build();
    // Wrapped in a Mockito spy so tests can verify the calls made against the storage.
    private final ListLogStorage logStorage = Mockito.spy(new ListLogStorage());

    // The remaining fixtures are (re)assigned in beforeEach().
    private Dispatcher dispatcher;
    private Subscription subscription;
    private LogStorageAppender appender;
    private LogStreamBatchWriter writer;
    private LogStreamReader reader;

    /* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppenderTest$Value.class */
    private static final class Value extends Record implements BufferWriter {
        private final int value;

        private Value(int i) {
            this.value = i;
        }

        private static Value of(LoggedEvent loggedEvent) {
            return new Value(loggedEvent.getValueBuffer().getInt(loggedEvent.getValueOffset(), Protocol.ENDIANNESS));
        }

        public int getLength() {
            return 4;
        }

        public void write(MutableDirectBuffer mutableDirectBuffer, int i) {
            mutableDirectBuffer.putInt(i, this.value, Protocol.ENDIANNESS);
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Value.class), Value.class, "value", "FIELD:Lio/camunda/zeebe/logstreams/impl/log/LogStorageAppenderTest$Value;->value:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Value.class), Value.class, "value", "FIELD:Lio/camunda/zeebe/logstreams/impl/log/LogStorageAppenderTest$Value;->value:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Value.class, Object.class), Value.class, "value", "FIELD:Lio/camunda/zeebe/logstreams/impl/log/LogStorageAppenderTest$Value;->value:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int value() {
            return this.value;
        }
    }

    /** Package-private no-arg constructor; it performs no work of its own. */
    LogStorageAppenderTest() {}

    /**
     * Starts the scheduler and builds a fresh dispatcher, subscription, appender, writer and reader
     * before every test.
     *
     * <p>The appender is only constructed here; it is not submitted to the scheduler, so each test
     * decides when the appender actor starts running.
     */
    @BeforeEach
    void beforeEach() {
        scheduler.start();

        // NOTE(review): assuming ofMegabytes returns a byte count, 102400 MB does not fit in an int
        // and the narrowing cast presumably wraps around - confirm the buffer size that is intended.
        dispatcher =
            Dispatchers.create("0")
                .actorSchedulingService(scheduler)
                .bufferSize((int) ByteValue.ofMegabytes(102400L))
                .maxFragmentLength(MAX_FRAGMENT_SIZE)
                .initialPosition(INITIAL_POSITION)
                .build();

        // Spy on the subscription so a test can intercept peekBlock; the storage spy is shared
        // across the instance, so both are reset to start without recorded interactions.
        subscription = Mockito.spy(dispatcher.openSubscription("log"));
        Mockito.reset(subscription, logStorage);

        appender =
            new LogStorageAppender(
                "appender", PARTITION_ID, logStorage, subscription, MAX_FRAGMENT_SIZE);
        writer = new LogStreamBatchWriterImpl(PARTITION_ID, dispatcher);
        reader = new LogStreamReaderImpl(logStorage.newReader());
    }

    /** Quietly closes the appender, the dispatcher and the scheduler after every test. */
    @AfterEach
    public void tearDown() {
        CloseHelper.quietCloseAll(appender, dispatcher, scheduler);
    }

    /**
     * Writes one event, starts the appender and expects the event to be readable from the storage
     * at the position returned by the writer.
     */
    @Test
    void shouldAppendSingleEvent() throws InterruptedException {
        // given
        final Value expected = new Value(1);
        final CountDownLatch appended = new CountDownLatch(1);

        // when - the event is written before the appender actor is started
        final long position = writer.event().valueWriter(expected).done().tryWrite();
        logStorage.setPositionListener(ignored -> appended.countDown());
        scheduler.submitActor(appender).join();

        // then
        final boolean writtenInTime = appended.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat(writtenInTime).as("value was written within 5 seconds").isTrue();
        Assertions.assertThat(reader.seek(position)).isTrue();
        Assertions.assertThat(reader.hasNext()).isTrue();
        Assertions.assertThat(Value.of(reader.next())).isEqualTo(expected);
    }

    /**
     * Writes a batch of two events and expects them to reach the storage through exactly one
     * append call spanning both positions, and to be readable back in order.
     */
    @Test
    void shouldAppendMultipleEvents() throws InterruptedException {
        // given
        final List<Value> expectedValues = List.of(new Value(1), new Value(2));
        final CountDownLatch appended = new CountDownLatch(1);

        // when - both events are written as one batch before the appender actor is started
        final long lastPosition =
            writer
                .event()
                .valueWriter(expectedValues.get(0))
                .done()
                .event()
                .valueWriter(expectedValues.get(1))
                .done()
                .tryWrite();
        final long firstPosition = lastPosition - 1;
        logStorage.setPositionListener(ignored -> appended.countDown());
        scheduler.submitActor(appender).join();

        // then
        final boolean writtenInTime = appended.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat(writtenInTime).as("value was written within 5 seconds").isTrue();
        Mockito.verify(logStorage, Mockito.timeout(1000L).times(1))
            .append(
                ArgumentMatchers.eq(firstPosition),
                ArgumentMatchers.eq(lastPosition),
                ArgumentMatchers.any(ByteBuffer.class),
                ArgumentMatchers.any(LogStorage.AppendListener.class));

        Assertions.assertThat(reader.seek(firstPosition)).isTrue();
        for (final Value expected : expectedValues) {
            Assertions.assertThat(reader.hasNext()).isTrue();
            Assertions.assertThat(Value.of(reader.next())).isEqualTo(expected);
        }
    }

    /**
     * Tampers with the position stored in every successfully peeked block and expects the appender
     * actor to close without anything having been appended to the storage.
     */
    @Test
    void shouldFailActorWhenDetectingGapsInPositions() {
        // given - peekBlock delegates to the real subscription, then overwrites a position in the
        // peeked buffer with WRONG_POSITION
        final Value value = new Value(1);
        Mockito.when(
                subscription.peekBlock(
                    ArgumentMatchers.any(BlockPeek.class),
                    ArgumentMatchers.anyInt(),
                    ArgumentMatchers.anyBoolean()))
            .thenAnswer(
                invocation -> {
                    final int result = (int) invocation.callRealMethod();
                    if (result <= 0) {
                        // nothing was peeked, so there is nothing to tamper with
                        return result;
                    }

                    final BlockPeek block = invocation.getArgument(0);
                    final LoggedEventImpl firstEvent = new LoggedEventImpl();
                    firstEvent.wrap(block.getBuffer(), 0);

                    // The offset is derived from the first event's length, i.e. it presumably
                    // addresses the entry that follows the first event in the block.
                    LogEntryDescriptor.setPosition(
                        block.getBuffer(),
                        DataFrameDescriptor.messageOffset(firstEvent.getLength()),
                        WRONG_POSITION);
                    return result;
                });

        // when - the appender is already running when the two-event batch is written
        scheduler.submitActor(appender).join();
        writer.event().valueWriter(value).done().event().valueWriter(value).done().tryWrite();

        // then - the timeout is a plain duration and deliberately not tied to WRONG_POSITION
        Awaitility.await("until the actor has failed")
            .atMost(Duration.ofSeconds(10))
            .until(appender::isActorClosed);
        reader.seekToFirstEvent();
        Assertions.assertThat(reader.hasNext()).isFalse();
    }
}
