package io.camunda.zeebe.logstreams.util;

import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBuilder;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.util.Either;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

/* loaded from: input_file:io/camunda/zeebe/logstreams/util/SyncLogStream.class */
/**
 * Blocking facade over an asynchronous {@link LogStream}.
 *
 * <p>Operations that return a future on the wrapped stream are joined before returning, and
 * writers created through {@link #newSyncLogStreamWriter()} only return once the write was
 * accepted and the resulting position has been reported via {@link #setLastWrittenPosition(long)}.
 *
 * <p>Thread-safety: {@link #lastWrittenPosition} is the only mutable state. It is volatile so
 * that a position published by one thread is visible to a thread blocked in
 * {@link #awaitPositionWritten(long)}.
 */
public class SyncLogStream implements SynchronousLogStream {
    /** Pause between two consecutive evaluations of a polled condition. */
    private static final Duration POLL_INTERVAL = Duration.ofMillis(50L);

    /** Maximum time {@link #awaitPositionWritten(long)} waits before giving up. */
    private static final Duration POSITION_WRITTEN_TIMEOUT = Duration.ofSeconds(5L);

    private final LogStream logStream;

    /**
     * Highest position reported through {@link #setLastWrittenPosition(long)}; {@code -1} until the
     * first report.
     *
     * <p>Must be volatile: {@link #awaitPositionWritten(long)} polls this field on the caller's own
     * thread, so the update it is waiting for necessarily comes from a different thread. Without
     * volatile the poller is not guaranteed to ever observe the new value, and a plain
     * {@code long} write is not guaranteed to be atomic.
     */
    private volatile long lastWrittenPosition = -1;

    /**
     * Writer that forwards to an asynchronous {@link LogStreamWriter} and blocks until the write has
     * been accepted and its position was reported as written.
     */
    private final class Writer implements SynchronousLogStream.SynchronousLogStreamWriter {
        private final LogStreamWriter delegate;

        private Writer(LogStreamWriter asyncWriter) {
            this.delegate = asyncWriter;
        }

        /**
         * Retries the write on the delegate until it succeeds, then waits for the returned position
         * to be written.
         *
         * @param list entries handed to the delegate
         * @param j forwarded unchanged as the delegate's second argument
         * @return the successful (right) result produced by the delegate
         */
        public Either<LogStreamWriter.WriteFailure, Long> tryWrite(List<LogAppendEntry> list, long j) {
            return syncTryWrite(() -> delegate.tryWrite(list, j));
        }
    }

    /**
     * Wraps the given asynchronous log stream.
     *
     * @param logStream the stream all calls are delegated to
     */
    public SyncLogStream(LogStream logStream) {
        this.logStream = logStream;
    }

    /** Returns a new {@link SyncLogStreamBuilder} created with its no-argument constructor. */
    public static SyncLogStreamBuilder builder() {
        return new SyncLogStreamBuilder();
    }

    /**
     * Returns a new {@link SyncLogStreamBuilder} constructed from the given builder.
     *
     * @param logStreamBuilder builder passed to the {@link SyncLogStreamBuilder} constructor
     */
    public static SyncLogStreamBuilder builder(LogStreamBuilder logStreamBuilder) {
        return new SyncLogStreamBuilder(logStreamBuilder);
    }

    /** Returns the wrapped asynchronous log stream. */
    @Override
    public LogStream getAsyncLogStream() {
        return logStream;
    }

    /** Returns the partition id reported by the wrapped stream. */
    @Override
    public int getPartitionId() {
        return logStream.getPartitionId();
    }

    /** Returns the log name reported by the wrapped stream. */
    @Override
    public String getLogName() {
        return logStream.getLogName();
    }

    /** Closes the wrapped stream and blocks until the asynchronous close has completed. */
    @Override
    public void close() {
        logStream.closeAsync().join();
    }

    /** Returns the most recently reported written position, or {@code -1} if none was reported. */
    @Override
    public long getLastWrittenPosition() {
        return lastWrittenPosition;
    }

    /**
     * Records the given position as the last written one, releasing any caller of
     * {@link #awaitPositionWritten(long)} that waits for this position or a lower one.
     *
     * @param j the position to record
     */
    @Override
    public void setLastWrittenPosition(long j) {
        lastWrittenPosition = j;
    }

    /** Creates a reader on the wrapped stream, blocking until it is available. */
    @Override
    public LogStreamReader newLogStreamReader() {
        return (LogStreamReader) logStream.newLogStreamReader().join();
    }

    /** Creates an asynchronous writer on the wrapped stream, blocking until it is available. */
    @Override
    public LogStreamWriter newLogStreamWriter() {
        return (LogStreamWriter) logStream.newLogStreamWriter().join();
    }

    /** Creates a writer whose writes block until their position has been reported as written. */
    @Override
    public SynchronousLogStream.SynchronousLogStreamWriter newSyncLogStreamWriter() {
        return new Writer(newLogStreamWriter());
    }

    /**
     * Blocks the calling thread until {@link #getLastWrittenPosition()} is at least {@code j}.
     *
     * <p>The condition is checked immediately and then every 50 ms for at most 5 seconds; when the
     * time is up Awaitility fails the wait with its timeout exception.
     *
     * @param j the position to wait for
     */
    @Override
    public void awaitPositionWritten(long j) {
        Awaitility.await("until position " + j + " is written")
            .atMost(POSITION_WRITTEN_TIMEOUT)
            .pollDelay(Duration.ZERO)
            .pollInterval(POLL_INTERVAL)
            .pollInSameThread()
            .until(this::getLastWrittenPosition, written -> written >= j);
    }

    /**
     * Invokes {@code supplier} repeatedly (immediately, then every 50 ms) until it yields a right
     * value, and afterwards waits for that position to be written if it is greater than zero.
     *
     * <p>No explicit timeout is configured for the retry loop, so Awaitility's default applies.
     *
     * @param supplier performs one write attempt per invocation
     * @return the first right result produced by {@code supplier}
     * @throws NullPointerException if {@code supplier} is {@code null}
     */
    private Either<LogStreamWriter.WriteFailure, Long> syncTryWrite(Supplier<Either<LogStreamWriter.WriteFailure, Long>> supplier) {
        Objects.requireNonNull(supplier, "supplier");

        final Either<LogStreamWriter.WriteFailure, Long> accepted =
            Awaitility.await("until dispatcher accepts writer")
                .pollDelay(Duration.ZERO)
                .pollInterval(POLL_INTERVAL)
                .pollInSameThread()
                .until(supplier::get, Either::isRight);

        final long position = accepted.get();
        if (position > 0) {
            awaitPositionWritten(position);
        }
        return accepted;
    }
}
