package io.camunda.zeebe.logstreams.log;

import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.LogStreamReaderRule;
import io.camunda.zeebe.logstreams.util.LogStreamRule;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.test.util.asserts.EitherAssert;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.ArrayList;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/camunda/zeebe/logstreams/log/LogStreamWriterTest.class */
public final class LogStreamWriterTest {
    private final LogStreamRule logStreamRule = LogStreamRule.startByDefault();
    private final LogStreamReaderRule readerRule = new LogStreamReaderRule(this.logStreamRule);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.logStreamRule).around(this.readerRule);
    private LogStreamWriter writer;

    @Before
    public void setUp() {
        this.writer = this.logStreamRule.getLogStream().newLogStreamWriter();
    }

    @After
    public void tearDown() {
        this.writer = null;
    }

    @Test
    public void shouldFailToWriteBatchWithoutEvents() {
        EitherAssert.assertThat(this.writer.tryWrite(WriteContext.internal(), List.of())).isLeft().left().isEqualTo(LogStreamWriter.WriteFailure.INVALID_ARGUMENT);
    }

    @Test
    public void shouldReturnPositionOfWrittenEvent() {
        long tryWrite = tryWrite(TestEntry.ofDefaults());
        Assertions.assertThat(tryWrite).isGreaterThan(0L);
        Assertions.assertThat(getWrittenEvent(tryWrite).getPosition()).isEqualTo(tryWrite);
    }

    @Test
    public void shouldReturnPositionOfLastEvent() {
        long longValue = ((Long) this.writer.tryWrite(WriteContext.internal(), List.of(TestEntry.ofKey(1L), TestEntry.ofKey(2L))).get()).longValue();
        Assertions.assertThat(longValue).isGreaterThan(0L);
        List<LoggedEvent> writtenEvents = getWrittenEvents(longValue);
        Assertions.assertThat(writtenEvents).hasSize(2);
        Assertions.assertThat(writtenEvents.get(1).getPosition()).isEqualTo(longValue);
    }

    @Test
    public void shouldWriteEventWithValueBuffer() {
        LogAppendEntry ofDefaults = TestEntry.ofDefaults();
        TestEntry.TestEntryAssert.assertThatEntry(ofDefaults).matchesLoggedEvent(getWrittenEvent(tryWrite(ofDefaults)));
    }

    @Test
    public void shouldWriteEventWithMetadataBuffer() {
        LogAppendEntry ofDefaults = TestEntry.ofDefaults();
        TestEntry.TestEntryAssert.assertThatEntry(ofDefaults).matchesLoggedEvent(getWrittenEvent(tryWrite(ofDefaults)));
    }

    @Test
    public void shouldWriteEventWithMetadataWriter() {
        LogAppendEntry ofDefaults = TestEntry.ofDefaults();
        TestEntry.TestEntryAssert.assertThatEntry(ofDefaults).matchesLoggedEvent(getWrittenEvent(tryWrite(ofDefaults)));
    }

    @Test
    public void shouldWriteEventWithKey() {
        Assertions.assertThat(getWrittenEvent(tryWrite(TestEntry.ofKey(123L))).getKey()).isEqualTo(123L);
    }

    @Test
    public void shouldWriteEventsWithDifferentWriters() {
        long tryWrite = tryWrite(TestEntry.ofKey(123L));
        this.writer = this.logStreamRule.getLogStream().newLogStreamWriter();
        long tryWrite2 = tryWrite(TestEntry.ofKey(124L));
        Assertions.assertThat(tryWrite2).isGreaterThan(tryWrite);
        Assertions.assertThat(getWrittenEvent(tryWrite).getKey()).isEqualTo(123L);
        Assertions.assertThat(getWrittenEvent(tryWrite2).getKey()).isEqualTo(124L);
    }

    @Test
    public void shouldCloseAllWritersAndWriteAgain() {
        long tryWrite = tryWrite(TestEntry.ofKey(123L));
        this.logStreamRule.getLogStream().awaitPositionWritten(tryWrite);
        this.writer = null;
        this.writer = this.logStreamRule.getLogStream().newLogStreamWriter();
        long tryWrite2 = tryWrite(TestEntry.ofKey(124L));
        Assertions.assertThat(tryWrite2).isGreaterThan(tryWrite);
        Assertions.assertThat(getWrittenEvent(tryWrite).getKey()).isEqualTo(123L);
        Assertions.assertThat(getWrittenEvent(tryWrite2).getKey()).isEqualTo(124L);
    }

    @Test
    public void shouldWriteEventWithSourceEvent() {
        Assertions.assertThat(getWrittenEvent(tryWrite(TestEntry.ofDefaults(), 123L)).getSourceEventPosition()).isEqualTo(123L);
    }

    @Test
    public void shouldWriteEventWithoutSourceEvent() {
        Assertions.assertThat(getWrittenEvent(tryWrite(TestEntry.ofDefaults())).getSourceEventPosition()).isEqualTo(-1L);
    }

    @Test
    public void shouldWriteEventWithNullKey() {
        Assertions.assertThat(getWrittenEvent(tryWrite(TestEntry.ofKey(-1L))).getKey()).isEqualTo(-1L);
    }

    @Test
    public void shouldWriteNullKeyByDefault() {
        Assertions.assertThat(getWrittenEvent(tryWrite(TestEntry.ofDefaults())).getKey()).isEqualTo(-1L);
    }

    @Test
    public void shouldFailToWriteEventWithoutValue() {
        EitherAssert.assertThat(this.writer.tryWrite(WriteContext.internal(), TestEntry.builder().withRecordValue(null).build())).isLeft();
    }

    private LoggedEvent getWrittenEvent(long j) {
        Assertions.assertThat(j).isGreaterThan(0L);
        this.logStreamRule.getLogStream().awaitPositionWritten(j);
        LoggedEvent readEventAtPosition = this.readerRule.readEventAtPosition(j);
        Assertions.assertThat(readEventAtPosition).withFailMessage("No written event found at position: <%s>", new Object[]{Long.valueOf(j)}).isNotNull();
        return readEventAtPosition;
    }

    private List<LoggedEvent> getWrittenEvents(long j) {
        ArrayList arrayList = new ArrayList();
        Assertions.assertThat(j).isGreaterThan(0L);
        this.logStreamRule.getLogStream().awaitPositionWritten(j);
        long j2 = -1;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                Assertions.assertThat(j3).withFailMessage("No written event found at position: {}", new Object[]{Long.valueOf(j)}).isEqualTo(j);
                return arrayList;
            }
            LoggedEventImpl nextEvent = this.readerRule.nextEvent();
            LoggedEventImpl loggedEventImpl = new LoggedEventImpl();
            loggedEventImpl.wrap(BufferUtil.cloneBuffer(nextEvent.getBuffer()), nextEvent.getFragmentOffset());
            arrayList.add(loggedEventImpl);
            j2 = nextEvent.getPosition();
        }
    }

    private long tryWrite(LogAppendEntry logAppendEntry) {
        return ((Long) ((Either) Awaitility.await("until dispatcher accepts entry").pollInSameThread().until(() -> {
            return this.writer.tryWrite(WriteContext.internal(), logAppendEntry);
        }, (v0) -> {
            return v0.isRight();
        })).get()).longValue();
    }

    private long tryWrite(LogAppendEntry logAppendEntry, long j) {
        return ((Long) ((Either) Awaitility.await("until dispatcher accepts entry").pollInSameThread().until(() -> {
            return this.writer.tryWrite(WriteContext.internal(), logAppendEntry, j);
        }, (v0) -> {
            return v0.isRight();
        })).get()).longValue();
    }
}
