/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.log.LogStreamReaderImpl;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;

public final class LogStreamImpl
extends Actor
implements LogStream,
FailureListener,
LogStorage.CommitListener {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final Set<LogRecordAwaiter> recordAwaiters = new HashSet<LogRecordAwaiter>();
    private final String logName;
    private final int partitionId;
    private final List<LogStreamReader> readers;
    private final int maxFragmentSize;
    private final LogStorage logStorage;
    private final CompletableActorFuture<Void> closeFuture;
    private final Set<FailureListener> failureListeners = new HashSet<FailureListener>();
    private Sequencer sequencer;
    private final String actorName;
    private HealthReport healthReport = HealthReport.healthy((HealthMonitorable)this);

    LogStreamImpl(String logName, int partitionId, int maxFragmentSize, LogStorage logStorage) {
        this.logName = logName;
        this.partitionId = partitionId;
        this.actorName = LogStreamImpl.buildActorName((String)"LogStream", (int)partitionId);
        this.maxFragmentSize = maxFragmentSize;
        this.logStorage = logStorage;
        this.closeFuture = new CompletableActorFuture();
        this.readers = new ArrayList<LogStreamReader>();
    }

    protected Map<String, String> createContext() {
        Map context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarted() {
        this.logStorage.addCommitListener(this);
    }

    protected void onActorClosing() {
        LOG.info("On closing logstream {} close {} readers", (Object)this.logName, (Object)this.readers.size());
        this.readers.forEach(CloseableSilently::close);
        this.logStorage.removeCommitListener(this);
    }

    protected void onActorClosed() {
        this.closeFuture.complete(null);
    }

    @Override
    public void close() {
        this.closeAsync().join();
    }

    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return this.closeFuture;
        }
        this.actor.close();
        return this.closeFuture;
    }

    protected void handleFailure(Throwable failure) {
        this.onFailure(failure);
    }

    private void onFailure(Throwable failure) {
        LOG.error("Unexpected error in Log Stream {} in phase {}.", new Object[]{this.getName(), this.actor.getLifecyclePhase(), failure});
        if (failure instanceof UnrecoverableException) {
            this.onUnrecoverableFailure(HealthReport.dead((HealthMonitorable)this).withIssue(failure));
        } else {
            this.onFailure(HealthReport.unhealthy((HealthMonitorable)this).withIssue(failure));
        }
    }

    @Override
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override
    public String getLogName() {
        return this.logName;
    }

    @Override
    public ActorFuture<LogStreamReader> newLogStreamReader() {
        return this.actor.call(this::createLogStreamReader);
    }

    @Override
    public ActorFuture<LogStreamWriter> newLogStreamWriter() {
        if (this.actor.isClosed()) {
            return CompletableActorFuture.completedExceptionally((Throwable)new RuntimeException("Actor is closed"));
        }
        CompletableActorFuture result = new CompletableActorFuture();
        this.actor.run(() -> {
            try {
                if (this.sequencer == null) {
                    this.sequencer = this.createAndScheduleWriteBuffer();
                }
                result.complete((Object)this.sequencer);
            }
            catch (Throwable e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void registerRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.actor.call(() -> this.recordAwaiters.add(recordAwaiter));
    }

    @Override
    public void removeRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.actor.call(() -> this.recordAwaiters.remove(recordAwaiter));
    }

    private void notifyRecordAwaiters() {
        this.recordAwaiters.forEach(LogRecordAwaiter::onRecordAvailable);
    }

    @Override
    public void onCommit() {
        this.actor.call(this::notifyRecordAwaiters);
    }

    private LogStreamReader createLogStreamReader() {
        LogStreamReaderImpl newReader = new LogStreamReaderImpl(this.logStorage.newReader());
        this.readers.add(newReader);
        return newReader;
    }

    private long getWriteBuffersInitialPosition() {
        long lastPosition = this.getLastCommittedPosition();
        long initialPosition = 1L;
        if (lastPosition > 0L) {
            initialPosition = lastPosition + 1L;
        }
        return initialPosition;
    }

    private Sequencer createAndScheduleWriteBuffer() {
        return new Sequencer(this.logStorage, this.getWriteBuffersInitialPosition(), this.maxFragmentSize, new SequencerMetrics(this.partitionId), new LogStreamMetrics(this.partitionId));
    }

    private long getLastCommittedPosition() {
        try (LogStorageReader storageReader = this.logStorage.newReader();){
            long l;
            try (LogStreamReaderImpl logReader = new LogStreamReaderImpl(storageReader);){
                l = logReader.seekToEnd();
            }
            return l;
        }
    }

    public HealthReport getHealthReport() {
        return this.healthReport;
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    public void onFailure(HealthReport report) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.unhealthy((HealthMonitorable)this).withIssue(report);
            this.failureListeners.forEach(l -> l.onFailure(this.healthReport));
            this.closeAsync();
        });
    }

    public void onRecovered() {
        this.actor.run(() -> {
            this.healthReport = HealthReport.healthy((HealthMonitorable)this);
            this.failureListeners.forEach(FailureListener::onRecovered);
        });
    }

    public void onUnrecoverableFailure(HealthReport report) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.dead((HealthMonitorable)this).withIssue(report);
            this.failureListeners.forEach(l -> l.onUnrecoverableFailure(this.healthReport));
            this.closeAsync();
        });
    }
}

