package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderFlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.InFlightAppend;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppender.class */
final class LogStorageAppender extends Actor implements HealthMonitorable, AppendErrorHandler {
    /** Logger shared with the rest of the log streams module. */
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /** Actor name; returned by {@link #getName()} and included in failure log messages. */
    private final String name;

    /** Hands out the in-flight append handle that is required before a batch is written. */
    private final AppenderFlowControl flowControl;

    /** Source of the sequenced batches that are written to the log storage. */
    private final Sequencer sequencer;

    /** Storage that sequenced batches are appended to. */
    private final LogStorage logStorage;

    /** Listeners notified with an unhealthy report when this appender fails. */
    private final Set<FailureListener> failureListeners = new HashSet<>();

    /** Future returned by {@code closeAsync()}; completed when the actor is closed or has failed. */
    private final ActorFuture<Void> closeFuture = new CompletableActorFuture<>();

    /** Metrics shared by flow control and by the listeners that record appended entries. */
    private final AppenderMetrics metrics;

    /** Partition id; used to create the metrics and added to the actor context. */
    private final int partitionId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppender$InstrumentedAppendListener.class */
    /**
     * Append listener that forwards every callback to a delegate and, once a batch has been
     * written, records one appended-entry metric per entry of that batch.
     *
     * <p>Declared as a record so that the accessors as well as {@code equals}, {@code hashCode}
     * and {@code toString} are generated from the three components.
     *
     * @param delegate the listener every callback is forwarded to
     * @param batch the batch whose entries are counted after a successful write
     * @param metrics the metrics sink the appended entries are recorded on
     */
    public record InstrumentedAppendListener(
            LogStorage.AppendListener delegate, SequencedBatch batch, AppenderMetrics metrics)
            implements LogStorage.AppendListener {

        /**
         * Forwards the write notification to the delegate first, then records a metric for every
         * entry of the batch.
         */
        @Override // io.camunda.zeebe.logstreams.storage.LogStorage.AppendListener
        public void onWrite(long j) {
            delegate.onWrite(j);
            batch.entries().forEach(this::recordWrite);
        }

        /** Forwards the write error to the delegate; no metric is recorded. */
        @Override // io.camunda.zeebe.logstreams.storage.LogStorage.AppendListener
        public void onWriteError(Throwable th) {
            delegate.onWriteError(th);
        }

        /** Forwards the commit notification to the delegate. */
        @Override // io.camunda.zeebe.logstreams.storage.LogStorage.AppendListener
        public void onCommit(long j) {
            delegate.onCommit(j);
        }

        /** Forwards the commit error to the delegate. */
        @Override // io.camunda.zeebe.logstreams.storage.LogStorage.AppendListener
        public void onCommitError(long j, Throwable th) {
            delegate.onCommitError(j, th);
        }

        /**
         * Records a single appended entry, labelled with the record type, value type and intent
         * taken from the entry's metadata.
         */
        private void recordWrite(LogAppendEntry logAppendEntry) {
            final RecordMetadata metadata = logAppendEntry.recordMetadata();
            metrics.recordAppendedEntry(
                    1, metadata.getRecordType(), metadata.getValueType(), metadata.getIntent());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogStorageAppender(String str, int i, LogStorage logStorage, Sequencer sequencer) {
        this.name = str;
        this.partitionId = i;
        this.logStorage = logStorage;
        this.sequencer = sequencer;
        this.metrics = new AppenderMetrics(i);
        this.flowControl = new AppenderFlowControl(this, this.metrics);
    }

    /**
     * Extends the context created by the superclass with this appender's partition id.
     *
     * @return the superclass context with an added {@code partitionId} entry
     */
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        final String partition = String.valueOf(partitionId);
        context.put("partitionId", partition);
        return context;
    }

    /**
     * Returns the name this appender was constructed with.
     *
     * @return the actor name
     */
    public String getName() {
        return name;
    }

    /**
     * Registers an actor condition named {@code sequencer} as the sequencer's consumer, so that
     * signalling it runs {@code tryWriteBatch}, and submits a first write attempt right away.
     */
    protected void onActorStarting() {
        final var sequencerCondition = actor.onCondition("sequencer", this::tryWriteBatch);
        sequencer.registerConsumer(sequencerCondition);
        actor.submit(this::tryWriteBatch);
    }

    /** Completes the close future once the actor has closed. */
    protected void onActorClosed() {
        // The future is an ActorFuture<Void>, so null is the only value it can be completed with.
        closeFuture.complete(null);
    }

    /**
     * Requests closing of the actor unless it already reports being closed.
     *
     * <p>The future returned by the superclass is ignored; callers always receive this appender's
     * own close future.
     *
     * @return the close future of this appender
     */
    public ActorFuture<Void> closeAsync() {
        if (!actor.isClosed()) {
            super.closeAsync();
        }
        return closeFuture;
    }

    /**
     * Routes a failure through the same handling as write and commit errors.
     *
     * @param th the cause of the failure
     */
    protected void handleFailure(Throwable th) {
        this.onFailure(th);
    }

    /** Completes the close future when the actor has failed. */
    public void onActorFailed() {
        // The future is an ActorFuture<Void>, so null is the only value it can be completed with.
        closeFuture.complete(null);
    }

    /**
     * Reports this appender as unhealthy, with the message {@code actor is closed}, when the actor
     * is closed, and as healthy otherwise.
     *
     * @return the current health report of this appender
     */
    public HealthReport getHealthReport() {
        if (actor.isClosed()) {
            return HealthReport.unhealthy(this).withMessage("actor is closed");
        }
        return HealthReport.healthy(this);
    }

    /**
     * Adds a failure listener; the change to the listener set is made by a job run through the
     * actor.
     *
     * @param failureListener the listener to add
     */
    public void addFailureListener(FailureListener failureListener) {
        actor.run(() -> {
            failureListeners.add(failureListener);
        });
    }

    /**
     * Removes a failure listener; the change to the listener set is made by a job run through the
     * actor.
     *
     * @param failureListener the listener to remove
     */
    public void removeFailureListener(FailureListener failureListener) {
        actor.run(() -> {
            failureListeners.remove(failureListener);
        });
    }

    /**
     * Asks flow control for an in-flight append. When one is granted, a batch is written with it;
     * when none is available, this method resubmits itself to the actor to try again.
     */
    private void tryWriteBatch() {
        flowControl
                .tryAcquire()
                .ifPresentOrElse(this::writeBatch, () -> actor.submit(this::tryWriteBatch));
    }

    /**
     * Reads the next batch from the sequencer and appends it to the log storage.
     *
     * <p>If the sequencer has no batch, the in-flight append is discarded and nothing else happens.
     * Otherwise the in-flight append is started with the highest position of the batch, the batch
     * is handed to the storage with an instrumented listener, and another write attempt is
     * submitted to the actor.
     *
     * @param inFlightAppend the in-flight append acquired from flow control for this write
     */
    private void writeBatch(InFlightAppend inFlightAppend) {
        final SequencedBatch batch = sequencer.tryRead();
        if (batch != null) {
            final long lowestPosition = batch.firstPosition();
            // Positions in a batch are consecutive, so the last one is first + count - 1.
            final long highestPosition = batch.firstPosition() + batch.entries().size() - 1;
            inFlightAppend.start(highestPosition);
            final InstrumentedAppendListener listener =
                    new InstrumentedAppendListener(inFlightAppend, batch, metrics);
            logStorage.append(lowestPosition, highestPosition, batch, listener);
            actor.submit(this::tryWriteBatch);
        } else {
            inFlightAppend.discard();
        }
    }

    /**
     * Logs the failure together with the actor's current lifecycle phase, fails the actor, and
     * then notifies every registered failure listener with an unhealthy report carrying the cause.
     *
     * @param th the cause of the failure
     */
    private void onFailure(Throwable th) {
        LOG.error("Actor {} failed in phase {}.", name, actor.getLifecyclePhase(), th);
        actor.fail(th);
        final HealthReport report = HealthReport.unhealthy(this).withIssue(th);
        for (final FailureListener listener : failureListeners) {
            listener.onFailure(report);
        }
    }

    @Override // io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler
    /**
     * Handles a commit error by running the failure handling through the actor.
     *
     * @param th the cause of the commit error
     */
    public void onCommitError(Throwable th) {
        actor.run(() -> onFailure(th));
    }

    @Override // io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler
    /**
     * Handles a write error by running the failure handling through the actor.
     *
     * @param th the cause of the write error
     */
    public void onWriteError(Throwable th) {
        actor.run(() -> onFailure(th));
    }
}
