package io.camunda.zeebe.logstreams.log;

import io.camunda.zeebe.logstreams.util.LogStreamRule;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.logstreams.util.TestEntry;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/camunda/zeebe/logstreams/log/LogStreamTest.class */
public final class LogStreamTest {
    public static final int PARTITION_ID = 0;
    private final LogStreamRule logStreamRule = LogStreamRule.startByDefault();

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.logStreamRule);
    private SynchronousLogStream logStream;

    @Before
    public void setup() {
        this.logStream = this.logStreamRule.getLogStream();
    }

    @Test
    public void shouldBuildLogStream() {
        Assertions.assertThat(this.logStream.getPartitionId()).isEqualTo(0);
        Assertions.assertThat(this.logStream.getLogName()).isEqualTo("0");
        Assertions.assertThat(this.logStream.newLogStreamReader()).isNotNull();
        Assertions.assertThat(this.logStream.newLogStreamWriter()).isNotNull();
    }

    @Test
    public void shouldCloseLogStream() {
        this.logStream.close();
        Assertions.assertThatThrownBy(() -> {
            this.logStream.newLogStreamWriter();
        }).hasMessage("Actor is closed");
    }

    @Test
    public void shouldIncreasePositionOnRestart() {
        SynchronousLogStream.SynchronousLogStreamWriter newSyncLogStreamWriter = this.logStream.newSyncLogStreamWriter();
        newSyncLogStreamWriter.tryWrite(TestEntry.ofDefaults());
        newSyncLogStreamWriter.tryWrite(TestEntry.ofDefaults());
        newSyncLogStreamWriter.tryWrite(TestEntry.ofDefaults());
        long longValue = ((Long) newSyncLogStreamWriter.tryWrite(TestEntry.ofDefaults()).get()).longValue();
        ConditionFactory await = Awaitility.await("until everything is written");
        SynchronousLogStream synchronousLogStream = this.logStream;
        Objects.requireNonNull(synchronousLogStream);
        await.until(synchronousLogStream::getLastWrittenPosition, l -> {
            return l.longValue() >= longValue;
        });
        this.logStream.close();
        this.logStreamRule.createLogStream();
        Assertions.assertThat(((Long) this.logStreamRule.getLogStream().newLogStreamWriter().tryWrite(TestEntry.ofDefaults()).get()).longValue()).isGreaterThan(longValue);
    }

    @Test
    public void shouldNotifyWhenNewRecordsAreAvailable() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LogStream asyncLogStream = this.logStream.getAsyncLogStream();
        Objects.requireNonNull(countDownLatch);
        asyncLogStream.registerRecordAvailableListener(countDownLatch::countDown);
        this.logStreamRule.getLogStreamWriter().tryWrite(TestEntry.ofDefaults());
        Assertions.assertThat(countDownLatch.await(2L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldNotifyMultipleListenersWhenNewRecordsAreAvailable() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LogStream asyncLogStream = this.logStream.getAsyncLogStream();
        Objects.requireNonNull(countDownLatch);
        asyncLogStream.registerRecordAvailableListener(countDownLatch::countDown);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        LogStream asyncLogStream2 = this.logStream.getAsyncLogStream();
        Objects.requireNonNull(countDownLatch2);
        asyncLogStream2.registerRecordAvailableListener(countDownLatch2::countDown);
        this.logStreamRule.getLogStreamWriter().tryWrite(TestEntry.ofDefaults());
        Assertions.assertThat(countDownLatch.await(2L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(countDownLatch2.await(2L, TimeUnit.SECONDS)).isTrue();
    }
}
