/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.log;

import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.logstreams.util.LogStreamRule;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.logstreams.util.TestEntry;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

public final class LogStreamTest {
    public static final int PARTITION_ID = 0;
    private final LogStreamRule logStreamRule = LogStreamRule.startByDefault();
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.logStreamRule);
    private SynchronousLogStream logStream;

    @Before
    public void setup() {
        this.logStream = this.logStreamRule.getLogStream();
    }

    @Test
    public void shouldBuildLogStream() {
        Assertions.assertThat((int)this.logStream.getPartitionId()).isEqualTo(0);
        Assertions.assertThat((String)this.logStream.getLogName()).isEqualTo("logStream-0");
        Assertions.assertThat((Iterator)this.logStream.newLogStreamReader()).isNotNull();
        Assertions.assertThat((Object)this.logStream.newLogStreamWriter()).isNotNull();
    }

    @Test
    public void shouldCloseLogStream() {
        this.logStream.close();
        Assertions.assertThatThrownBy(() -> this.logStream.newLogStreamWriter()).hasMessage("logStream-0 is closed");
    }

    @Test
    public void shouldIncreasePositionOnRestart() {
        SynchronousLogStream.SynchronousLogStreamWriter writer = this.logStream.newSyncLogStreamWriter();
        writer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        writer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        writer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        long positionBeforeClose = (Long)writer.tryWrite(WriteContext.internal(), TestEntry.ofDefaults()).get();
        Awaitility.await((String)"until everything is written").until(this.logStream::getLastWrittenPosition, position -> position >= positionBeforeClose);
        this.logStream.close();
        this.logStreamRule.createLogStream();
        LogStreamWriter newWriter = this.logStreamRule.getLogStream().newLogStreamWriter();
        long positionAfterReOpen = (Long)newWriter.tryWrite(WriteContext.internal(), TestEntry.ofDefaults()).get();
        Assertions.assertThat((long)positionAfterReOpen).isGreaterThan(positionBeforeClose);
    }

    @Test
    public void shouldNotifyWhenNewRecordsAreAvailable() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(latch::countDown);
        this.logStreamRule.getLogStreamWriter().tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        Assertions.assertThat((boolean)latch.await(2L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldNotifyMultipleListenersWhenNewRecordsAreAvailable() throws InterruptedException {
        CountDownLatch firstListener = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(firstListener::countDown);
        CountDownLatch secondListener = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(secondListener::countDown);
        this.logStreamRule.getLogStreamWriter().tryWrite(WriteContext.internal(), TestEntry.ofDefaults());
        Assertions.assertThat((boolean)firstListener.await(2L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat((boolean)secondListener.await(2L, TimeUnit.SECONDS)).isTrue();
    }
}

