/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.log.LogStorageAppender;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.logstreams.util.TestEntry;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.health.HealthStatus;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public final class LogStorageAppenderHealthTest {
    private static final int PARTITION_ID = 0;
    @Rule
    public final ActorSchedulerRule schedulerRule = new ActorSchedulerRule();
    private Sequencer sequencer;
    private ControllableLogStorage failingLogStorage;
    private LogStorageAppender appender;

    @Before
    public void setUp() {
        SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
        this.failingLogStorage = new ControllableLogStorage(this);
        this.sequencer = new Sequencer(0L, 0x400000, new SequencerMetrics((MeterRegistry)meterRegistry));
        this.appender = new LogStorageAppender("appender", 0, (LogStorage)this.failingLogStorage, this.sequencer, (MeterRegistry)meterRegistry);
    }

    @After
    public void tearDown() {
        this.sequencer.close();
        this.appender.close();
    }

    @Test
    public void shouldFailActorWhenWriteFails() {
        this.failingLogStorage.onNextAppend((pos, listener) -> listener.onWriteError((Throwable)new RuntimeException("foo")));
        this.sequencer.tryWrite(TestEntry.ofDefaults());
        this.schedulerRule.submitActor((Actor)this.appender).join();
        TestUtil.waitUntil(() -> this.appender.getHealthReport().isUnhealthy());
    }

    @Test
    public void shouldFailActorWhenCommitFails() {
        this.failingLogStorage.onNextAppend((pos, listener) -> listener.onCommitError(pos.longValue(), (Throwable)new RuntimeException("foo")));
        this.sequencer.tryWrite(TestEntry.ofDefaults());
        this.schedulerRule.submitActor((Actor)this.appender).join();
        TestUtil.waitUntil(() -> this.appender.getHealthReport().isUnhealthy());
    }

    @Test
    public void shouldBeHealthyWhenNoExceptions() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        this.failingLogStorage.onNextAppend((pos, listener) -> {
            listener.onWrite(pos.longValue());
            latch.countDown();
        });
        this.sequencer.tryWrite(TestEntry.ofDefaults());
        this.schedulerRule.submitActor((Actor)this.appender).join();
        latch.await();
        Assertions.assertThat((long)latch.getCount()).isZero();
        Assertions.assertThat((Comparable)this.appender.getHealthReport().getStatus()).isEqualTo((Object)HealthStatus.HEALTHY);
    }

    private class ControllableLogStorage
    extends Actor
    implements LogStorage {
        private BiConsumer<Long, LogStorage.AppendListener> onAppend = (pos, listener) -> listener.onWrite(pos.longValue());

        public ControllableLogStorage(LogStorageAppenderHealthTest logStorageAppenderHealthTest) {
            logStorageAppenderHealthTest.schedulerRule.submitActor((Actor)this).join();
        }

        void onNextAppend(BiConsumer<Long, LogStorage.AppendListener> onAppend) {
            this.onAppend = onAppend;
        }

        public LogStorageReader newReader() {
            return null;
        }

        public void append(long lowestPosition, long highestPosition, BufferWriter blockBuffer, LogStorage.AppendListener listener) {
            this.actor.run(() -> this.onAppend.accept(highestPosition, listener));
        }

        public void addCommitListener(LogStorage.CommitListener listener) {
            throw new UnsupportedOperationException("Not implemented");
        }

        public void removeCommitListener(LogStorage.CommitListener listener) {
            throw new UnsupportedOperationException("Not implemented");
        }

        public void close() {
            this.actor.close();
        }
    }
}

