package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.dispatcher.Dispatchers;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.ByteValue;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppenderHealthTest.class */
/**
 * Tests the health status reported by {@link LogStorageAppender} depending on how the underlying
 * {@link LogStorage} answers an append: with a successful write, a write error or a commit error.
 *
 * <p>The storage is replaced by {@link ControllableLogStorage}, whose reaction to an append can be
 * swapped per test.
 */
public final class LogStorageAppenderHealthTest {
    private static final int MAX_FRAGMENT_SIZE = 1024;
    private static final int PARTITION_ID = 0;

    /** Upper bound for waiting on the storage hook, so a missing append fails instead of hanging. */
    private static final long APPEND_TIMEOUT_SECONDS = 10L;

    @Rule
    public final ActorSchedulerRule schedulerRule = new ActorSchedulerRule();

    private Dispatcher dispatcher;
    private ControllableLogStorage failingLogStorage;
    private LogStorageAppender appender;
    private LogStreamWriterImpl writer;

    /** Builds the storage stub, the dispatcher, the appender under test and a writer feeding it. */
    @Before
    public void setUp() {
        failingLogStorage = new ControllableLogStorage();

        // NOTE(review): if ByteValue.ofMegabytes returns a byte count, 102400 MB does not fit into
        // an int and the cast truncates it - confirm the intended buffer size. Kept as-is.
        final int bufferSize = (int) ByteValue.ofMegabytes(102400L);
        dispatcher =
            Dispatchers.create("0")
                .actorScheduler(schedulerRule.get())
                .bufferSize(bufferSize)
                .maxFragmentLength(MAX_FRAGMENT_SIZE)
                .build();

        appender =
            new LogStorageAppender(
                "appender",
                PARTITION_ID,
                failingLogStorage,
                dispatcher.openSubscription("log"),
                MAX_FRAGMENT_SIZE,
                ignored -> {});
        writer = new LogStreamWriterImpl(PARTITION_ID, dispatcher);
    }

    /** Closes the appender first and the dispatcher afterwards. */
    @After
    public void tearDown() {
        appender.close();
        dispatcher.close();
    }

    /** The appender must turn {@link HealthStatus#UNHEALTHY} when the storage reports a write error. */
    @Test
    public void shouldFailActorWhenWriteFails() {
        failingLogStorage.onNextAppend(
            (position, listener) -> listener.onWriteError(new RuntimeException("foo")));

        writeEntryAndStartAppender();

        awaitUnhealthy();
    }

    /** The appender must turn {@link HealthStatus#UNHEALTHY} when the storage reports a commit error. */
    @Test
    public void shouldFailActorWhenCommitFails() {
        failingLogStorage.onNextAppend(
            (position, listener) -> listener.onCommitError(position, new RuntimeException("foo")));

        writeEntryAndStartAppender();

        awaitUnhealthy();
    }

    /**
     * The appender must stay {@link HealthStatus#HEALTHY} when the storage acknowledges the write.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the append
     */
    @Test
    public void shouldBeHealthyWhenNoExceptions() throws InterruptedException {
        final CountDownLatch appended = new CountDownLatch(1);
        failingLogStorage.onNextAppend(
            (position, listener) -> {
                listener.onWrite(position);
                appended.countDown();
            });

        writeEntryAndStartAppender();

        // Bounded wait: if the storage is never asked to append, fail here instead of blocking the
        // whole test run indefinitely.
        Assertions.assertThat(appended.await(APPEND_TIMEOUT_SECONDS, TimeUnit.SECONDS))
            .as("storage received an append within %d seconds", APPEND_TIMEOUT_SECONDS)
            .isTrue();
        Assertions.assertThat(appender.getHealthStatus()).isEqualTo(HealthStatus.HEALTHY);
    }

    /** Writes a single entry through the writer, then submits the appender and waits for the submission. */
    private void writeEntryAndStartAppender() {
        writer.value(BufferUtil.wrapString("value")).tryWrite();
        schedulerRule.submitActor(appender).join();
    }

    /** Polls via {@link TestUtil#waitUntil} until the appender reports {@link HealthStatus#UNHEALTHY}. */
    private void awaitUnhealthy() {
        TestUtil.waitUntil(() -> appender.getHealthStatus() == HealthStatus.UNHEALTHY);
    }

    /**
     * {@link LogStorage} stub running as an actor. Every append is forwarded, inside the actor, to a
     * replaceable hook so each test decides which {@link LogStorage.AppendListener} callback fires.
     */
    private class ControllableLogStorage extends Actor implements LogStorage {
        /** Reaction to an append; by default the write is acknowledged with the received position. */
        private BiConsumer<Long, LogStorage.AppendListener> onAppend =
            (position, listener) -> listener.onWrite(position);

        /** Submits this stub to the test's scheduler rule and waits for the submission to finish. */
        public ControllableLogStorage() {
            schedulerRule.submitActor(this).join();
        }

        /**
         * Replaces the hook invoked on append; it stays in place until replaced again.
         *
         * @param biConsumer receives the position passed to the listener and the append listener
         */
        void onNextAppend(BiConsumer<Long, LogStorage.AppendListener> biConsumer) {
            onAppend = biConsumer;
        }

        /** Reading is not needed by these tests, so no reader is provided. */
        public LogStorageReader newReader() {
            return null;
        }

        /**
         * Schedules the current hook on this actor, handing it the second position argument and
         * the listener. The first position and the buffer are ignored.
         *
         * <p>NOTE(review): the two positions are presumably the lowest and highest position of the
         * appended block - confirm against {@link LogStorage}.
         */
        public void append(
            long lowestPosition,
            long highestPosition,
            ByteBuffer blockBuffer,
            LogStorage.AppendListener listener) {
            actor.run(() -> onAppend.accept(highestPosition, listener));
        }

        /** Closes the underlying actor control. */
        public void close() {
            actor.close();
        }
    }
}
