package io.camunda.zeebe.logstreams.impl.log;

import com.netflix.concurrency.limits.Limit;
import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.RateLimit;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import java.time.InstantSource;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.class */
public final class LogStreamImpl implements LogStream, LogStorage.CommitListener {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final Collection<LogStreamReader> readers = new CopyOnWriteArrayList();
    private final Collection<LogRecordAwaiter> recordAwaiters = new CopyOnWriteArrayList();
    private final String logName;
    private final int partitionId;
    private final LogStorage logStorage;
    private final LogStreamMetrics logStreamMetrics;
    private final FlowControl flowControl;
    private final Sequencer sequencer;
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogStreamImpl(String str, int i, int i2, LogStorage logStorage, InstantSource instantSource, Limit limit, RateLimit rateLimit) {
        this.logName = str;
        this.partitionId = i;
        this.logStorage = logStorage;
        this.logStreamMetrics = new LogStreamMetrics(i);
        this.flowControl = new FlowControl(this.logStreamMetrics, limit, rateLimit);
        this.sequencer = new Sequencer(logStorage, getWriteBuffersInitialPosition(), i2, instantSource, new SequencerMetrics(i), this.flowControl);
        logStorage.addCommitListener(this);
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        LOG.info("Closing {} with {} readers", this.logName, Integer.valueOf(this.readers.size()));
        this.readers.forEach((v0) -> {
            v0.close();
        });
        this.logStorage.removeCommitListener(this);
        this.logStreamMetrics.remove();
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public String getLogName() {
        return this.logName;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public LogStreamReader newLogStreamReader() {
        ensureOpen();
        return createLogStreamReader();
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public LogStreamWriter newLogStreamWriter() {
        ensureOpen();
        return this.sequencer;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public FlowControl getFlowControl() {
        return this.flowControl;
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public void registerRecordAvailableListener(LogRecordAwaiter logRecordAwaiter) {
        ensureOpen();
        this.recordAwaiters.add(logRecordAwaiter);
    }

    @Override // io.camunda.zeebe.logstreams.log.LogStream
    public void removeRecordAvailableListener(LogRecordAwaiter logRecordAwaiter) {
        ensureOpen();
        this.recordAwaiters.remove(logRecordAwaiter);
    }

    @Override // io.camunda.zeebe.logstreams.storage.LogStorage.CommitListener
    public void onCommit() {
        if (this.closed) {
            return;
        }
        this.recordAwaiters.forEach((v0) -> {
            v0.onRecordAvailable();
        });
    }

    private void ensureOpen() {
        if (this.closed) {
            throw new IllegalStateException("%s is closed".formatted(this.logName));
        }
    }

    private LogStreamReader createLogStreamReader() {
        LogStreamReaderImpl logStreamReaderImpl = new LogStreamReaderImpl(this.logStorage.newReader());
        this.readers.add(logStreamReaderImpl);
        return logStreamReaderImpl;
    }

    private long getWriteBuffersInitialPosition() {
        long lastCommittedPosition = getLastCommittedPosition();
        return lastCommittedPosition > 0 ? lastCommittedPosition + 1 : 1L;
    }

    private long getLastCommittedPosition() {
        LogStorageReader newReader = this.logStorage.newReader();
        try {
            LogStreamReaderImpl logStreamReaderImpl = new LogStreamReaderImpl(newReader);
            try {
                long seekToEnd = logStreamReaderImpl.seekToEnd();
                logStreamReaderImpl.close();
                if (newReader != null) {
                    newReader.close();
                }
                return seekToEnd;
            } finally {
            }
        } catch (Throwable th) {
            if (newReader != null) {
                try {
                    newReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
