/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.ReaderImpl;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

public class SyncReaderImpl
extends ReaderImpl
implements SyncReader {
    private static final Logger logger = LoggerFactory.getLogger(SyncReaderImpl.class);
    private static final int POLL_INTERVAL_SECONDS = 5;
    private final Queue<MessageBatchWrapper> batchesInQueue = new LinkedList<MessageBatchWrapper>();
    private int currentMessageIndex = 0;

    public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
        super(topicRpc, settings);
    }

    @Override
    public void init() {
        this.initImpl();
    }

    @Override
    public void initAndWait() {
        this.initImpl().join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nullable
    public Message receive(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.isStopped.get()) {
            throw new RuntimeException("Reader was stopped");
        }
        Queue<MessageBatchWrapper> queue = this.batchesInQueue;
        synchronized (queue) {
            if (this.batchesInQueue.isEmpty()) {
                Instant now;
                long millisToWait = TimeUnit.MILLISECONDS.convert(timeout, unit);
                Instant deadline = Instant.now().plusMillis(millisToWait);
                while (this.batchesInQueue.isEmpty() && !(now = Instant.now()).isAfter(deadline)) {
                    millisToWait = Math.max(1L, Duration.between(now, deadline).toMillis());
                    logger.trace("No messages in queue. Waiting for {} ms...", (Object)millisToWait);
                    this.batchesInQueue.wait(millisToWait);
                }
                if (this.batchesInQueue.isEmpty()) {
                    logger.trace("Still no messages in queue. Returning null");
                    return null;
                }
            }
            logger.trace("Taking a message with index {} from batch", (Object)this.currentMessageIndex);
            MessageBatchWrapper currentBatch = this.batchesInQueue.element();
            Message result = (Message)currentBatch.messages.get(this.currentMessageIndex);
            ++this.currentMessageIndex;
            if (this.currentMessageIndex >= currentBatch.messages.size()) {
                logger.debug("Batch is read. signalling core reader impl");
                this.batchesInQueue.remove();
                this.currentMessageIndex = 0;
                currentBatch.future.complete(null);
            }
            return result;
        }
    }

    @Override
    public Message receive() throws InterruptedException {
        Message result;
        while ((result = this.receive(5L, TimeUnit.SECONDS)) == null) {
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
        CompletableFuture<Void> resultFuture = new CompletableFuture<Void>();
        if (this.isStopped.get()) {
            resultFuture.completeExceptionally(new RuntimeException("Reader was stopped"));
            return resultFuture;
        }
        if (event.getMessages().isEmpty()) {
            resultFuture.completeExceptionally(new RuntimeException("Batch has no messages"));
            return resultFuture;
        }
        Queue<MessageBatchWrapper> queue = this.batchesInQueue;
        synchronized (queue) {
            logger.debug("Putting a message batch into queue and notifying in case receive method is waiting");
            this.batchesInQueue.add(new MessageBatchWrapper(event.getMessages(), resultFuture));
            this.batchesInQueue.notify();
        }
        return resultFuture;
    }

    @Override
    protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) {
        if (logger.isDebugEnabled()) {
            logger.debug("CommitResponse received for partition session {} (partition {}) with committedOffset {}", new Object[]{partitionSession.getId(), partitionSession.getPartitionId(), committedOffset});
        }
    }

    @Override
    protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, Consumer<StartPartitionSessionSettings> confirmCallback) {
        confirmCallback.accept(null);
    }

    @Override
    protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, @Nullable Long partitionId, Runnable confirmCallback) {
        confirmCallback.run();
    }

    @Override
    protected void handleClosePartitionSession(PartitionSession partitionSession) {
        logger.debug("ClosePartitionSession event received. Ignoring.");
    }

    @Override
    protected void handleCloseReader() {
        logger.debug("CloseReader event received. Ignoring.");
    }

    @Override
    protected CompletableFuture<Void> shutdownImpl() {
        return super.shutdownImpl();
    }

    @Override
    public void shutdown() {
        this.shutdownImpl().join();
    }

    private static class MessageBatchWrapper {
        private final List<Message> messages;
        private final CompletableFuture<Void> future;

        private MessageBatchWrapper(List<Message> messages, CompletableFuture<Void> future) {
            this.messages = messages;
            this.future = future;
        }
    }
}

