/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.util;

import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBuilder;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.SyncLogStreamBuilder;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.util.Either;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;
import org.awaitility.Awaitility;

public class SyncLogStream
implements SynchronousLogStream {
    private final LogStream logStream;
    private long lastWrittenPosition = -1L;

    public SyncLogStream(LogStream logStream) {
        this.logStream = logStream;
    }

    public static SyncLogStreamBuilder builder() {
        return new SyncLogStreamBuilder();
    }

    public static SyncLogStreamBuilder builder(LogStreamBuilder builder) {
        return new SyncLogStreamBuilder(builder);
    }

    @Override
    public LogStream getAsyncLogStream() {
        return this.logStream;
    }

    @Override
    public int getPartitionId() {
        return this.logStream.getPartitionId();
    }

    @Override
    public String getLogName() {
        return this.logStream.getLogName();
    }

    @Override
    public void close() {
        this.logStream.closeAsync().join();
    }

    @Override
    public long getLastWrittenPosition() {
        return this.lastWrittenPosition;
    }

    @Override
    public void setLastWrittenPosition(long position) {
        this.lastWrittenPosition = position;
    }

    @Override
    public LogStreamReader newLogStreamReader() {
        return (LogStreamReader)this.logStream.newLogStreamReader().join();
    }

    @Override
    public LogStreamWriter newLogStreamWriter() {
        return (LogStreamWriter)this.logStream.newLogStreamWriter().join();
    }

    @Override
    public SynchronousLogStream.SynchronousLogStreamWriter newSyncLogStreamWriter() {
        return new Writer(this.newLogStreamWriter());
    }

    @Override
    public void awaitPositionWritten(long position) {
        Awaitility.await((String)("until position " + position + " is written")).atMost(Duration.ofSeconds(5L)).pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(50L)).pollInSameThread().until(this::getLastWrittenPosition, p -> p >= position);
    }

    private Either<LogStreamWriter.WriteFailure, Long> syncTryWrite(Supplier<Either<LogStreamWriter.WriteFailure, Long>> writeOperation) {
        Either written = (Either)Awaitility.await((String)"until dispatcher accepts writer").pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(50L)).pollInSameThread().until(writeOperation::get, Either::isRight);
        Long position = (Long)written.get();
        if (position > 0L) {
            this.awaitPositionWritten(position);
        }
        return written;
    }

    private final class Writer
    implements SynchronousLogStream.SynchronousLogStreamWriter {
        private final LogStreamWriter delegate;

        private Writer(LogStreamWriter delegate) {
            this.delegate = delegate;
        }

        public Either<LogStreamWriter.WriteFailure, Long> tryWrite(List<LogAppendEntry> appendEntries, long sourcePosition) {
            return SyncLogStream.this.syncTryWrite(() -> this.delegate.tryWrite(appendEntries, sourcePosition));
        }
    }
}

