/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import com.netflix.concurrency.limits.Limit;
import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.RateLimit;
import io.camunda.zeebe.logstreams.impl.log.LogStreamReaderImpl;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.util.CloseableSilently;
import java.time.InstantSource;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;

/**
 * {@link LogStream} implementation backed by a {@link LogStorage}.
 *
 * <p>The stream hands out readers and a single shared writer (the {@link Sequencer}), and
 * registers itself as a {@link LogStorage.CommitListener} so that every commit reported by the
 * storage is forwarded to the registered {@link LogRecordAwaiter}s.
 *
 * <p>Every reader created through {@link #newLogStreamReader()} is tracked and closed when the
 * stream itself is closed.
 */
public final class LogStreamImpl implements LogStream, LogStorage.CommitListener {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /** Readers handed out by this stream; they are closed in {@link #close()}. */
    private final Collection<LogStreamReader> readers = new CopyOnWriteArrayList<>();

    /** Listeners that are notified on every commit while the stream is open. */
    private final Collection<LogRecordAwaiter> recordAwaiters = new CopyOnWriteArrayList<>();

    private final String logName;
    private final int partitionId;
    private final LogStorage logStorage;
    private final LogStreamMetrics logStreamMetrics;
    private final FlowControl flowControl;
    private final Sequencer sequencer;

    /** Set once by {@link #close()}; volatile so other threads observe the update. */
    private volatile boolean closed;

    /**
     * Creates the stream and registers it as a commit listener on the given storage.
     *
     * @param logName name of the log, used in log and error messages
     * @param partitionId partition id, returned by {@link #getPartitionId()} and passed to the
     *     metrics objects
     * @param maxFragmentSize passed through to the {@link Sequencer}
     * @param logStorage storage that is read from and written to
     * @param clock passed through to the {@link Sequencer}
     * @param requestLimit passed through to the {@link FlowControl}
     * @param writeRateLimit passed through to the {@link FlowControl}
     */
    LogStreamImpl(
            String logName,
            int partitionId,
            int maxFragmentSize,
            LogStorage logStorage,
            InstantSource clock,
            Limit requestLimit,
            RateLimit writeRateLimit) {
        this.logName = logName;
        this.partitionId = partitionId;
        this.logStorage = logStorage;
        this.logStreamMetrics = new LogStreamMetrics(partitionId);
        this.flowControl = new FlowControl(logStreamMetrics, requestLimit, writeRateLimit);
        // The sequencer starts at the position right after the last record found in the storage.
        this.sequencer =
                new Sequencer(
                        logStorage,
                        getWriteBuffersInitialPosition(),
                        maxFragmentSize,
                        clock,
                        new SequencerMetrics(partitionId),
                        flowControl);
        // Register last, once all fields are assigned, so no callback sees a half-built object.
        logStorage.addCommitListener(this);
    }

    /**
     * Marks the stream as closed, closes all readers created by it, unregisters the commit
     * listener from the storage and removes the stream metrics.
     */
    @Override
    public void close() {
        // Flip the flag first so that onCommit() and ensureOpen() see the stream as closed
        // while the teardown below is still running.
        closed = true;
        LOG.debug("Closing {} with {} readers", logName, readers.size());
        readers.forEach(CloseableSilently::close);
        logStorage.removeCommitListener(this);
        logStreamMetrics.remove();
    }

    /** Returns the partition id this stream was created with. */
    @Override
    public int getPartitionId() {
        return partitionId;
    }

    /** Returns the log name this stream was created with. */
    @Override
    public String getLogName() {
        return logName;
    }

    /**
     * Creates a new reader on top of a fresh storage reader.
     *
     * @return the new reader; it is also closed when this stream is closed
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public LogStreamReader newLogStreamReader() {
        ensureOpen();
        return createLogStreamReader();
    }

    /**
     * Returns the writer of this stream. The same {@link Sequencer} instance is returned on
     * every call.
     *
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public LogStreamWriter newLogStreamWriter() {
        ensureOpen();
        return sequencer;
    }

    /** Returns the flow control instance that is shared with the sequencer. */
    @Override
    public FlowControl getFlowControl() {
        return flowControl;
    }

    /**
     * Registers a listener that is notified on every commit.
     *
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void registerRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        ensureOpen();
        recordAwaiters.add(recordAwaiter);
    }

    /**
     * Removes a previously registered listener.
     *
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void removeRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        ensureOpen();
        recordAwaiters.remove(recordAwaiter);
    }

    /** Forwards a commit to all registered listeners; does nothing once the stream is closed. */
    @Override
    public void onCommit() {
        if (closed) {
            return;
        }
        recordAwaiters.forEach(LogRecordAwaiter::onRecordAvailable);
    }

    /** Throws {@link IllegalStateException} when the stream has already been closed. */
    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("%s is closed".formatted(logName));
        }
    }

    /** Creates a reader and remembers it so that {@link #close()} can close it later. */
    private LogStreamReader createLogStreamReader() {
        LogStreamReaderImpl newReader = new LogStreamReaderImpl(logStorage.newReader());
        readers.add(newReader);
        return newReader;
    }

    /**
     * Computes the first position handed to the sequencer: one past the last position found in
     * the storage, or {@code 1} when that position is not positive.
     */
    private long getWriteBuffersInitialPosition() {
        long lastPosition = getLastCommittedPosition();
        return lastPosition > 0L ? lastPosition + 1L : 1L;
    }

    /**
     * Opens a temporary reader, seeks to the end of the log and returns the value reported by
     * {@code seekToEnd()}. Both the stream reader and the underlying storage reader are closed
     * before returning, whether or not seeking succeeds.
     */
    private long getLastCommittedPosition() {
        // Resources are closed in reverse declaration order: the stream reader first, then the
        // storage reader. Exceptions thrown while closing are added as suppressed exceptions.
        try (LogStorageReader storageReader = logStorage.newReader();
                LogStreamReaderImpl logStreamReader = new LogStreamReaderImpl(storageReader)) {
            return logStreamReader.seekToEnd();
        }
    }
}

