/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Status;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.ReadEventHandler;
import tech.ydb.topic.read.events.ReaderClosedEvent;
import tech.ydb.topic.read.impl.OffsetsRangeImpl;
import tech.ydb.topic.read.impl.ReaderImpl;
import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl;
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;

public class AsyncReaderImpl
extends ReaderImpl
implements AsyncReader {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderImpl.class);
    private static final int DEFAULT_HANDLER_THREAD_COUNT = 4;
    private final Executor handlerExecutor;
    private final ExecutorService defaultHandlerExecutorService;
    private final ReadEventHandler eventHandler;

    public AsyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
        super(topicRpc, settings);
        this.eventHandler = handlersSettings.getEventHandler();
        if (handlersSettings.getExecutor() != null) {
            logger.debug("Using handler executor provided by user");
            this.defaultHandlerExecutorService = null;
            this.handlerExecutor = handlersSettings.getExecutor();
        } else {
            logger.debug("Using default handler executor");
            this.defaultHandlerExecutorService = Executors.newFixedThreadPool(4);
            this.handlerExecutor = this.defaultHandlerExecutorService;
        }
    }

    @Override
    public CompletableFuture<Void> init() {
        return this.initImpl();
    }

    @Override
    public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction transaction, Map<String, List<PartitionOffsets>> offsets, UpdateOffsetsInTransactionSettings settings) {
        if (!transaction.isActive()) {
            throw new IllegalArgumentException("Transaction is not active. Can only read topic messages in already running transactions from other services");
        }
        return this.sendUpdateOffsetsInTransaction(transaction, offsets, settings);
    }

    @Override
    protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
        return CompletableFuture.runAsync(() -> {
            try {
                this.eventHandler.onMessages(event);
            }
            catch (Exception exception) {
                String errorMessage = "Error in user DataReceivedEvent callback: " + exception;
                logger.error(errorMessage);
                this.shutdownImpl(errorMessage).join();
                throw exception;
            }
        }, this.handlerExecutor);
    }

    @Override
    protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) {
        this.handlerExecutor.execute(() -> {
            CommitOffsetAcknowledgementEventImpl event = new CommitOffsetAcknowledgementEventImpl(partitionSession, committedOffset);
            this.eventHandler.onCommitResponse(event);
        });
    }

    @Override
    protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, Consumer<StartPartitionSessionSettings> confirmCallback) {
        this.handlerExecutor.execute(() -> {
            YdbTopic.OffsetsRange offsetsRange = request.getPartitionOffsets();
            StartPartitionSessionEventImpl event = new StartPartitionSessionEventImpl(partitionSession, request.getCommittedOffset(), new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()), confirmCallback);
            this.eventHandler.onStartPartitionSession(event);
        });
    }

    @Override
    protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request, PartitionSession partitionSession, Runnable confirmCallback) {
        long committedOffset = request.getCommittedOffset();
        StopPartitionSessionEventImpl event = new StopPartitionSessionEventImpl(partitionSession, committedOffset, confirmCallback);
        this.handlerExecutor.execute(() -> this.eventHandler.onStopPartitionSession(event));
    }

    @Override
    protected void handleClosePartitionSession(PartitionSession partitionSession) {
        PartitionSessionClosedEventImpl event = new PartitionSessionClosedEventImpl(partitionSession);
        this.handlerExecutor.execute(() -> this.eventHandler.onPartitionSessionClosed(event));
    }

    protected void handleReaderClosed() {
        this.handlerExecutor.execute(() -> this.eventHandler.onReaderClosed(new ReaderClosedEvent()));
    }

    @Override
    protected void onShutdown(String reason) {
        super.onShutdown(reason);
        this.handleReaderClosed();
        if (this.defaultHandlerExecutorService != null) {
            logger.debug("Shutting down default handler executor");
            this.defaultHandlerExecutorService.shutdown();
        }
    }

    @Override
    public CompletableFuture<Void> shutdown() {
        return this.shutdownImpl();
    }
}

