/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.util;

import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBuilder;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.logstreams.util.TestLogStreamBuilder;
import io.camunda.zeebe.util.Either;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;
import org.awaitility.Awaitility;

/**
 * Test wrapper around a {@link LogStream} that forwards every {@link LogStream} operation to the
 * wrapped instance and additionally tracks a "last written position".
 *
 * <p>The tracked position is not derived by this class; it only changes when someone calls
 * {@link #setLastWrittenPosition(long)}. {@link #awaitPositionWritten(long)} and the writers
 * created by {@link #newBlockingLogStreamWriter()} block until that tracked position reaches a
 * given value.
 */
public class TestLogStream
implements LogStream {
    private final LogStream logStream;

    /*
     * volatile: awaitPositionWritten() polls this field in the calling thread, so the value can
     * only advance through a setLastWrittenPosition() call made by another thread. Without
     * volatile the polling thread is not guaranteed to ever observe that update.
     */
    private volatile long lastWrittenPosition = -1L;

    /**
     * Creates a wrapper around the given log stream.
     *
     * @param logStream the log stream all {@link LogStream} calls are delegated to
     */
    public TestLogStream(LogStream logStream) {
        this.logStream = logStream;
    }

    /**
     * @return a new {@link TestLogStreamBuilder} created with its no-arg constructor
     */
    public static TestLogStreamBuilder builder() {
        return new TestLogStreamBuilder();
    }

    /**
     * @param builder the log stream builder handed to the {@link TestLogStreamBuilder} constructor
     * @return a new {@link TestLogStreamBuilder} created from the given builder
     */
    public static TestLogStreamBuilder builder(LogStreamBuilder builder) {
        return new TestLogStreamBuilder(builder);
    }

    /**
     * @return the position most recently passed to {@link #setLastWrittenPosition(long)}, or
     *     {@code -1} if it was never set
     */
    public long getLastWrittenPosition() {
        return this.lastWrittenPosition;
    }

    /**
     * Records the given position as the last written one, releasing any waiter in
     * {@link #awaitPositionWritten(long)} whose target is at or below it.
     *
     * @param position the new last written position
     */
    public void setLastWrittenPosition(long position) {
        this.lastWrittenPosition = position;
    }

    /**
     * @return a writer backed by a fresh {@link #newLogStreamWriter()} whose writes block until the
     *     returned position has been recorded as written
     */
    public BlockingLogStreamWriter newBlockingLogStreamWriter() {
        return new BlockingLogStreamWriter(this.newLogStreamWriter());
    }

    /**
     * Blocks the calling thread until {@link #getLastWrittenPosition()} is at least
     * {@code position}. The check runs immediately and then every 50 ms, for at most 5 seconds;
     * if the position is not reached in time the Awaitility condition fails.
     *
     * @param position the position to wait for
     */
    public void awaitPositionWritten(long position) {
        Awaitility.await("until position " + position + " is written")
            .atMost(Duration.ofSeconds(5L))
            .pollDelay(Duration.ZERO)
            .pollInterval(Duration.ofMillis(50L))
            .pollInSameThread()
            .until(this::getLastWrittenPosition, p -> p >= position);
    }

    /** Closes the wrapped log stream. */
    public void close() {
        this.logStream.close();
    }

    /** @return the partition id reported by the wrapped log stream */
    public int getPartitionId() {
        return this.logStream.getPartitionId();
    }

    /** @return the log name reported by the wrapped log stream */
    public String getLogName() {
        return this.logStream.getLogName();
    }

    /** @return a new reader obtained from the wrapped log stream */
    public LogStreamReader newLogStreamReader() {
        return this.logStream.newLogStreamReader();
    }

    /** @return a new (non-blocking) writer obtained from the wrapped log stream */
    public LogStreamWriter newLogStreamWriter() {
        return this.logStream.newLogStreamWriter();
    }

    /** @return the flow control instance of the wrapped log stream */
    public FlowControl getFlowControl() {
        return this.logStream.getFlowControl();
    }

    /**
     * Registers the listener on the wrapped log stream.
     *
     * @param recordAwaiter the listener to register
     */
    public void registerRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.logStream.registerRecordAvailableListener(recordAwaiter);
    }

    /**
     * Removes the listener from the wrapped log stream.
     *
     * @param recordAwaiter the listener to remove
     */
    public void removeRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.logStream.removeRecordAvailableListener(recordAwaiter);
    }

    /**
     * Repeats {@code writeOperation} (immediately, then every 50 ms, in the calling thread) until it
     * yields a right value, then waits for that position to be recorded as written.
     *
     * <p>No explicit {@code atMost} is configured for the retry loop, so Awaitility's default
     * timeout applies.
     *
     * @param writeOperation the write attempt to retry; invoked once per poll
     * @return the first successful (right) result of {@code writeOperation}
     */
    private Either<LogStreamWriter.WriteFailure, Long> syncTryWrite(Supplier<Either<LogStreamWriter.WriteFailure, Long>> writeOperation) {
        Either<LogStreamWriter.WriteFailure, Long> written = Awaitility.await("until dispatcher accepts writer")
            .pollDelay(Duration.ZERO)
            .pollInterval(Duration.ofMillis(50L))
            .pollInSameThread()
            .until(writeOperation::get, Either::isRight);
        long position = written.get();
        // Only strictly positive positions are awaited; zero or negative results are returned as-is.
        if (position > 0L) {
            this.awaitPositionWritten(position);
        }
        return written;
    }

    /**
     * {@link LogStreamWriter} that retries a write on the delegate until it succeeds and then
     * blocks until the enclosing {@link TestLogStream} has recorded the resulting position as
     * written.
     */
    public final class BlockingLogStreamWriter
    implements LogStreamWriter {
        private final LogStreamWriter delegate;

        private BlockingLogStreamWriter(LogStreamWriter delegate) {
            this.delegate = delegate;
        }

        /**
         * Forwards the write to the delegate, retrying until it is accepted, and blocks until the
         * returned position (if positive) has been recorded as written.
         *
         * @param context the write context passed through to the delegate
         * @param appendEntries the entries passed through to the delegate
         * @param sourcePosition the source position passed through to the delegate
         * @return the successful result produced by the delegate
         */
        public Either<LogStreamWriter.WriteFailure, Long> tryWrite(WriteContext context, List<LogAppendEntry> appendEntries, long sourcePosition) {
            return TestLogStream.this.syncTryWrite(() -> this.delegate.tryWrite(context, appendEntries, sourcePosition));
        }
    }
}

