package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthReport;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.class */
public final class LogStreamImpl extends Actor implements LogStream, FailureListener, LogStorage.CommitListener {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final String logName;
    private final int partitionId;
    private final int maxFragmentSize;
    private final LogStorage logStorage;
    private Sequencer sequencer;
    private final String actorName;
    private final Set<LogRecordAwaiter> recordAwaiters = new HashSet();
    private final Set<FailureListener> failureListeners = new HashSet();
    private HealthReport healthReport = HealthReport.healthy(this);
    private final CompletableActorFuture<Void> closeFuture = new CompletableActorFuture<>();
    private final List<LogStreamReader> readers = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogStreamImpl(String str, int i, int i2, LogStorage logStorage) {
        this.logName = str;
        this.partitionId = i;
        this.actorName = buildActorName("LogStream", i);
        this.maxFragmentSize = i2;
        this.logStorage = logStorage;
    }

    protected Map<String, String> createContext() {
        Map<String, String> createContext = super.createContext();
        createContext.put("partitionId", Integer.toString(this.partitionId));
        return createContext;
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarted() {
        this.logStorage.addCommitListener(this);
    }

    protected void onActorClosing() {
        LOG.info("On closing logstream {} close {} readers", this.logName, Integer.valueOf(this.readers.size()));
        this.readers.forEach((v0) -> {
            v0.close();
        });
        this.logStorage.removeCommitListener(this);
    }

    protected void onActorClosed() {
        this.closeFuture.complete((Object) null);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        closeAsync().join();
    }

    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return this.closeFuture;
        }
        this.actor.close();
        return this.closeFuture;
    }

    protected void handleFailure(Throwable th) {
        onFailure(th);
    }

    private void onFailure(Throwable th) {
        LOG.error("Unexpected error in Log Stream {} in phase {}.", new Object[]{getName(), this.actor.getLifecyclePhase(), th});
        if (th instanceof UnrecoverableException) {
            onUnrecoverableFailure(HealthReport.dead(this).withIssue(th));
        } else {
            onFailure(HealthReport.unhealthy(this).withIssue(th));
        }
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public String getLogName() {
        return this.logName;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStreamReader> newLogStreamReader() {
        return this.actor.call(this::createLogStreamReader);
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStreamWriter> newLogStreamWriter() {
        if (this.actor.isClosed()) {
            return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
        }
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.run(() -> {
            try {
                if (this.sequencer == null) {
                    this.sequencer = createAndScheduleWriteBuffer();
                }
                completableActorFuture.complete(this.sequencer);
            } catch (Throwable th) {
                completableActorFuture.completeExceptionally(th);
            }
        });
        return completableActorFuture;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public void registerRecordAvailableListener(LogRecordAwaiter logRecordAwaiter) {
        this.actor.call(() -> {
            return Boolean.valueOf(this.recordAwaiters.add(logRecordAwaiter));
        });
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public void removeRecordAvailableListener(LogRecordAwaiter logRecordAwaiter) {
        this.actor.call(() -> {
            return Boolean.valueOf(this.recordAwaiters.remove(logRecordAwaiter));
        });
    }

    private void notifyRecordAwaiters() {
        this.recordAwaiters.forEach((v0) -> {
            v0.onRecordAvailable();
        });
    }

    @Override // io.camunda.zeebe.logstreams.storage.LogStorage.CommitListener
    public void onCommit() {
        this.actor.call(this::notifyRecordAwaiters);
    }

    private LogStreamReader createLogStreamReader() {
        LogStreamReaderImpl logStreamReaderImpl = new LogStreamReaderImpl(this.logStorage.newReader());
        this.readers.add(logStreamReaderImpl);
        return logStreamReaderImpl;
    }

    private long getWriteBuffersInitialPosition() {
        long lastCommittedPosition = getLastCommittedPosition();
        long j = 1;
        if (lastCommittedPosition > 0) {
            j = lastCommittedPosition + 1;
        }
        return j;
    }

    private Sequencer createAndScheduleWriteBuffer() {
        return new Sequencer(this.logStorage, getWriteBuffersInitialPosition(), this.maxFragmentSize, new SequencerMetrics(this.partitionId), new LogStreamMetrics(this.partitionId));
    }

    private long getLastCommittedPosition() {
        LogStorageReader newReader = this.logStorage.newReader();
        try {
            LogStreamReaderImpl logStreamReaderImpl = new LogStreamReaderImpl(newReader);
            try {
                long seekToEnd = logStreamReaderImpl.seekToEnd();
                logStreamReaderImpl.close();
                if (newReader != null) {
                    newReader.close();
                }
                return seekToEnd;
            } finally {
            }
        } catch (Throwable th) {
            if (newReader != null) {
                try {
                    newReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public HealthReport getHealthReport() {
        return this.healthReport;
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> {
            this.failureListeners.add(failureListener);
        });
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> {
            this.failureListeners.remove(failureListener);
        });
    }

    public void onFailure(HealthReport healthReport) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.unhealthy(this).withIssue(healthReport);
            this.failureListeners.forEach(failureListener -> {
                failureListener.onFailure(this.healthReport);
            });
            closeAsync();
        });
    }

    public void onRecovered() {
        this.actor.run(() -> {
            this.healthReport = HealthReport.healthy(this);
            this.failureListeners.forEach((v0) -> {
                v0.onRecovered();
            });
        });
    }

    public void onUnrecoverableFailure(HealthReport healthReport) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.dead(this).withIssue(healthReport);
            this.failureListeners.forEach(failureListener -> {
                failureListener.onUnrecoverableFailure(this.healthReport);
            });
            closeAsync();
        });
    }
}
