package io.zeebe.logstreams.impl;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.dispatcher.impl.PositionUtil;
import io.zeebe.logstreams.impl.log.index.LogBlockIndex;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamFailureListener;
import io.zeebe.logstreams.log.LogStreamUtil;
import io.zeebe.logstreams.snapshot.TimeBasedSnapshotPolicy;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.logstreams.spi.SnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.map.ZbMap;
import io.zeebe.util.EnsureUtil;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.buffer.BufferUtil;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicLongPosition;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.concurrent.status.Position;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogStreamImpl.class */
public final class LogStreamImpl implements LogStream {
    public static final String EXCEPTION_MSG_TRUNCATE_FAILED = "Truncation failed! Position %d was not found.";
    public static final String EXCEPTION_MSG_TRUNCATE_AND_LOG_STREAM_CTRL_IN_PARALLEL = "Can't truncate the log storage and have a log stream controller active at the same time.";
    public static final String EXCEPTION_MSG_TRUNCATE_COMMITTED_POSITION = "Can't truncate position which is already committed!";
    private static final int DEFAULT_INDEX_BLOCK_SIZE = 4194304;
    private static final int DEFAULT_READ_BLOCK_SIZE = 1024;
    protected volatile int term;
    protected final DirectBuffer topicName;
    protected final int partitionId;
    protected final String name;
    protected final LogStorage logStorage;
    protected final LogBlockIndex blockIndex;
    protected final ActorScheduler actorScheduler;
    protected final LogBlockIndexController logBlockIndexController;
    protected LogStreamController logStreamController;
    protected Dispatcher writeBuffer;
    protected final Position commitPosition;

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogStreamImpl$LogStreamBuilder.class */
    public static class LogStreamBuilder<T extends LogStreamBuilder> {
        protected final DirectBuffer topicName;
        protected final int partitionId;
        protected final String logName;
        protected ActorScheduler actorScheduler;
        protected LogStorage logStorage;
        protected LogBlockIndex logBlockIndex;
        protected String logRootPath;
        protected String logDirectory;
        protected CountersManager countersManager;
        protected boolean logStreamControllerDisabled;
        protected boolean deleteOnClose;
        protected SnapshotPolicy snapshotPolicy;
        protected SnapshotStorage snapshotStorage;
        protected Dispatcher writeBuffer;
        protected int initialLogSegmentId = 0;
        protected int maxAppendBlockSize = 4194304;
        protected int writeBufferSize = LogStream.DEFAULT_WRITE_BUFFER_SIZE;
        protected int logSegmentSize = ZbMap.MAX_TABLE_SIZE;
        protected int indexBlockSize = 4194304;
        protected float deviation = 0.1f;
        protected int readBlockSize = 1024;

        public LogStreamBuilder(DirectBuffer directBuffer, int i) {
            this.topicName = directBuffer;
            this.partitionId = i;
            this.logName = String.format("%s.%d", BufferUtil.bufferAsString(directBuffer), Integer.valueOf(i));
        }

        protected T self() {
            return this;
        }

        public T logRootPath(String str) {
            this.logRootPath = str;
            return self();
        }

        public T logDirectory(String str) {
            this.logDirectory = str;
            return self();
        }

        public T writeBufferSize(int i) {
            this.writeBufferSize = i;
            return self();
        }

        public T maxAppendBlockSize(int i) {
            this.maxAppendBlockSize = i;
            return self();
        }

        public T initialLogSegmentId(int i) {
            this.initialLogSegmentId = i;
            return self();
        }

        public T logSegmentSize(int i) {
            this.logSegmentSize = i;
            return self();
        }

        public T deleteOnClose(boolean z) {
            this.deleteOnClose = z;
            return self();
        }

        public T actorScheduler(ActorScheduler actorScheduler) {
            this.actorScheduler = actorScheduler;
            return self();
        }

        public T countersManager(CountersManager countersManager) {
            this.countersManager = countersManager;
            return self();
        }

        public T indexBlockSize(int i) {
            this.indexBlockSize = i;
            return self();
        }

        public T deviation(float f) {
            this.deviation = f;
            return self();
        }

        public T logStorage(LogStorage logStorage) {
            this.logStorage = logStorage;
            return self();
        }

        public T logBlockIndex(LogBlockIndex logBlockIndex) {
            this.logBlockIndex = logBlockIndex;
            return self();
        }

        public T logStreamControllerDisabled(boolean z) {
            this.logStreamControllerDisabled = z;
            return self();
        }

        public T writeBuffer(Dispatcher dispatcher) {
            this.writeBuffer = dispatcher;
            return self();
        }

        public T snapshotStorage(SnapshotStorage snapshotStorage) {
            this.snapshotStorage = snapshotStorage;
            return self();
        }

        public T snapshotPolicy(SnapshotPolicy snapshotPolicy) {
            this.snapshotPolicy = snapshotPolicy;
            return self();
        }

        public T readBlockSize(int i) {
            this.readBlockSize = i;
            return self();
        }

        public DirectBuffer getTopicName() {
            return this.topicName;
        }

        public int getPartitionId() {
            return this.partitionId;
        }

        public String getLogName() {
            return this.logName;
        }

        public ActorScheduler getActorScheduler() {
            Objects.requireNonNull(this.actorScheduler, "No actor scheduler provided.");
            return this.actorScheduler;
        }

        protected void initLogStorage() {
        }

        public LogStorage getLogStorage() {
            if (this.logStorage == null) {
                initLogStorage();
            }
            return this.logStorage;
        }

        public LogBlockIndex getBlockIndex() {
            if (this.logBlockIndex == null) {
                this.logBlockIndex = new LogBlockIndex(100000, num -> {
                    return new UnsafeBuffer(ByteBuffer.allocate(num.intValue()));
                });
            }
            return this.logBlockIndex;
        }

        public int getMaxAppendBlockSize() {
            return this.maxAppendBlockSize;
        }

        public int getIndexBlockSize() {
            return this.indexBlockSize;
        }

        public int getReadBlockSize() {
            return this.readBlockSize;
        }

        public SnapshotPolicy getSnapshotPolicy() {
            if (this.snapshotPolicy == null) {
                this.snapshotPolicy = new TimeBasedSnapshotPolicy(Duration.ofMinutes(1L));
            }
            return this.snapshotPolicy;
        }

        protected Dispatcher initWriteBuffer(Dispatcher dispatcher, BufferedLogStreamReader bufferedLogStreamReader, String str, int i) {
            if (dispatcher == null) {
                long j = 0;
                bufferedLogStreamReader.seekToLastEvent();
                if (bufferedLogStreamReader.hasNext()) {
                    j = bufferedLogStreamReader.next().getPosition();
                }
                int i2 = 0;
                if (j > 0) {
                    i2 = PositionUtil.partitionId(j);
                }
                dispatcher = Dispatchers.create("log-write-buffer-" + str).bufferSize(i).subscriptions("log-appender").initialPartitionId(i2 + 1).conductorExternallyManaged().build();
            }
            return dispatcher;
        }

        public Dispatcher getWriteBuffer() {
            if (this.writeBuffer == null) {
                BufferedLogStreamReader bufferedLogStreamReader = new BufferedLogStreamReader(getLogStorage(), getBlockIndex());
                this.writeBuffer = initWriteBuffer(this.writeBuffer, bufferedLogStreamReader, this.logName, this.writeBufferSize);
                bufferedLogStreamReader.close();
            }
            return this.writeBuffer;
        }

        public boolean isLogStreamControllerDisabled() {
            return this.logStreamControllerDisabled;
        }

        public void initSnapshotStorage() {
        }

        public SnapshotStorage getSnapshotStorage() {
            if (this.snapshotStorage == null) {
                initSnapshotStorage();
            }
            return this.snapshotStorage;
        }

        public float getDeviation() {
            return this.deviation;
        }

        public LogStream build() {
            Objects.requireNonNull(getTopicName(), "topicName");
            EnsureUtil.ensureGreaterThanOrEqual("partitionId", this.partitionId, 0L);
            Objects.requireNonNull(getLogStorage(), "logStorage");
            Objects.requireNonNull(getBlockIndex(), "blockIndex");
            Objects.requireNonNull(getActorScheduler(), "actorScheduler");
            EnsureUtil.ensureFalse("deviation", this.deviation <= 0.0f || this.deviation > 1.0f);
            return new LogStreamImpl(this);
        }
    }

    private LogStreamImpl(LogStreamBuilder logStreamBuilder) {
        this.term = 0;
        this.commitPosition = new AtomicLongPosition();
        DirectBuffer topicName = logStreamBuilder.getTopicName();
        if (topicName.capacity() > 128) {
            throw new RuntimeException(String.format("Topic name exceeds max length (%d > %d bytes)", Integer.valueOf(topicName.capacity()), 128));
        }
        this.topicName = BufferUtil.cloneBuffer(topicName);
        this.partitionId = logStreamBuilder.getPartitionId();
        this.name = logStreamBuilder.getLogName();
        this.logStorage = logStreamBuilder.getLogStorage();
        this.blockIndex = logStreamBuilder.getBlockIndex();
        this.actorScheduler = logStreamBuilder.getActorScheduler();
        this.commitPosition.setOrdered(-1L);
        this.logBlockIndexController = new LogBlockIndexController(logStreamBuilder, this.commitPosition);
        if (logStreamBuilder.isLogStreamControllerDisabled()) {
            return;
        }
        this.logStreamController = new LogStreamController(logStreamBuilder);
        this.writeBuffer = logStreamBuilder.getWriteBuffer();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogBlockIndexController getLogBlockIndexController() {
        return this.logBlockIndexController;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogStreamController getLogStreamController() {
        return this.logStreamController;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public DirectBuffer getTopicName() {
        return this.topicName;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public String getLogName() {
        return this.name;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void open() {
        try {
            openAsync().get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2.getCause() != null ? e2.getCause() : e2);
        }
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> openAsync() {
        return this.logStreamController != null ? CompletableFuture.allOf(this.logBlockIndexController.openAsync(), openStreamControlling(this.actorScheduler, this.logStreamController.getMaxAppendBlockSize())) : this.logBlockIndexController.openAsync();
    }

    @Override // io.zeebe.logstreams.log.LogStream, java.lang.AutoCloseable
    public void close() {
        try {
            closeAsync().get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2.getCause() != null ? e2.getCause() : e2);
        }
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> closeAsync() {
        return this.writeBuffer != null ? CompletableFuture.allOf(this.logBlockIndexController.closeAsync(), this.writeBuffer.closeAsync().thenApply(r3 -> {
            return this.logStreamController.closeAsync();
        }).thenAccept((Consumer<? super U>) completableFuture -> {
            this.logStorage.close();
        })) : this.logBlockIndexController.closeAsync().thenAccept(r32 -> {
            this.logStorage.close();
        });
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public long getCurrentAppenderPosition() {
        if (this.logStreamController == null) {
            return 0L;
        }
        return this.logStreamController.getCurrentAppenderPosition();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public long getCommitPosition() {
        return this.commitPosition.get();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void setCommitPosition(long j) {
        this.commitPosition.setOrdered(j);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public int getTerm() {
        return this.term;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void setTerm(int i) {
        this.term = i;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void registerFailureListener(LogStreamFailureListener logStreamFailureListener) {
        if (this.logStreamController != null) {
            this.logStreamController.registerFailureListener(logStreamFailureListener);
        }
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void removeFailureListener(LogStreamFailureListener logStreamFailureListener) {
        if (this.logStreamController != null) {
            this.logStreamController.removeFailureListener(logStreamFailureListener);
        }
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogStorage getLogStorage() {
        return this.logStorage;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogBlockIndex getLogBlockIndex() {
        return this.blockIndex;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public int getIndexBlockSize() {
        return this.logBlockIndexController.getIndexBlockSize();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> closeLogStreamController() {
        return this.logStreamController != null ? this.writeBuffer.closeAsync().thenApply(r3 -> {
            return this.logStreamController.closeAsync();
        }).thenAccept((Consumer<? super U>) completableFuture -> {
            this.writeBuffer = null;
        }) : CompletableFuture.completedFuture(null);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> openLogStreamController() {
        return openLogStreamController(this.actorScheduler, 4194304);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> openLogStreamController(ActorScheduler actorScheduler) {
        return openLogStreamController(actorScheduler, 4194304);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public CompletableFuture<Void> openLogStreamController(ActorScheduler actorScheduler, int i) {
        return openStreamControlling(actorScheduler, i);
    }

    private CompletableFuture<Void> openStreamControlling(ActorScheduler actorScheduler, int i) {
        if ((this.writeBuffer != null && this.writeBuffer.isClosed()) || this.writeBuffer == null) {
            LogStreamBuilder createNewBuilder = createNewBuilder(actorScheduler, i);
            this.writeBuffer = createNewBuilder.getWriteBuffer();
            if (this.logStreamController == null) {
                this.logStreamController = new LogStreamController(createNewBuilder);
            } else {
                this.logStreamController.wrap(createNewBuilder);
            }
        }
        return this.logStreamController.openAsync();
    }

    private LogStreamBuilder createNewBuilder(ActorScheduler actorScheduler, int i) {
        if (!this.logStorage.isOpen()) {
            this.logStorage.open();
        }
        return new LogStreamBuilder(this.topicName, this.partitionId).logStorage(this.logStorage).logBlockIndex(this.blockIndex).actorScheduler(actorScheduler).maxAppendBlockSize(i);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public Dispatcher getWriteBuffer() {
        return this.writeBuffer;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void truncate(long j) {
        if (this.logStreamController != null && !this.logStreamController.isClosed()) {
            throw new IllegalStateException(EXCEPTION_MSG_TRUNCATE_AND_LOG_STREAM_CTRL_IN_PARALLEL);
        }
        if (j <= getCommitPosition()) {
            throw new IllegalArgumentException(EXCEPTION_MSG_TRUNCATE_COMMITTED_POSITION);
        }
        long addressForPosition = LogStreamUtil.getAddressForPosition(this, j);
        if (addressForPosition == -1) {
            throw new IllegalArgumentException(String.format(EXCEPTION_MSG_TRUNCATE_FAILED, Long.valueOf(j)));
        }
        this.logStorage.truncate(addressForPosition);
        this.logBlockIndexController.truncate();
    }

    public String toString() {
        return "LogStreamImpl{topicName=" + BufferUtil.bufferAsString(this.topicName) + ", partitionId=" + this.partitionId + ", term=" + this.term + ", name='" + this.name + "'}";
    }
}
