/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.ReadEventHandler;
import tech.ydb.topic.read.events.ReaderClosedEvent;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.read.impl.ReaderImpl;
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

/**
 * {@link AsyncReader} implementation that forwards reader events to a user supplied
 * {@link ReadEventHandler}, running every callback on an {@link Executor}.
 *
 * <p>The executor is taken from {@link ReadEventHandlersSettings#getExecutor()} when it is
 * non-null. Otherwise the reader creates its own fixed-size thread pool of
 * {@link #DEFAULT_HANDLER_THREAD_COUNT} threads and shuts that pool down in
 * {@link #shutdownImpl()}. An executor provided by the user is never shut down by this class.
 */
public class AsyncReaderImpl
extends ReaderImpl
implements AsyncReader {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderImpl.class);

    /** Number of threads in the handler pool created when the user supplies no executor. */
    private static final int DEFAULT_HANDLER_THREAD_COUNT = 4;

    /** Executor on which every event handler callback is run. */
    private final Executor handlerExecutor;

    /** The pool owned by this reader, or {@code null} when the user provided the executor. */
    private final ExecutorService defaultHandlerExecutorService;

    /** User callback that receives all reader events. */
    private final ReadEventHandler eventHandler;

    /**
     * Creates a reader that dispatches events to the handler configured in
     * {@code handlersSettings}.
     *
     * @param topicRpc         passed through to the {@link ReaderImpl} constructor
     * @param settings         passed through to the {@link ReaderImpl} constructor
     * @param handlersSettings source of the event handler and of the optional executor
     */
    public AsyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
        super(topicRpc, settings);
        this.eventHandler = handlersSettings.getEventHandler();
        // Read the executor once so the null check and the assignment see the same value.
        Executor userExecutor = handlersSettings.getExecutor();
        if (userExecutor != null) {
            logger.debug("Using handler executor provided by user");
            this.defaultHandlerExecutorService = null;
            this.handlerExecutor = userExecutor;
        } else {
            logger.debug("Using default handler executor");
            // Size the pool from the named constant instead of a duplicated literal.
            this.defaultHandlerExecutorService = Executors.newFixedThreadPool(DEFAULT_HANDLER_THREAD_COUNT);
            this.handlerExecutor = this.defaultHandlerExecutorService;
        }
    }

    /**
     * Initializes the reader by delegating to {@code initImpl()}.
     *
     * @return the future returned by {@code initImpl()}
     */
    @Override
    public CompletableFuture<Void> init() {
        return this.initImpl();
    }

    /**
     * Runs {@link ReadEventHandler#onMessages} for the given event on the handler executor.
     *
     * @param event the received data event to hand to the user callback
     * @return a future that completes once the callback has returned, or completes
     *         exceptionally if the callback throws
     */
    @Override
    protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
        return CompletableFuture.runAsync(() -> this.eventHandler.onMessages(event), this.handlerExecutor);
    }

    /**
     * Builds a start-partition-session event from the server request and passes it to
     * {@link ReadEventHandler#onStartPartitionSession} on the handler executor.
     *
     * <p>The event carries a confirmation callback that calls
     * {@code sendStartPartitionSessionResponse} with the original request and the settings
     * supplied to the callback.
     *
     * @param request the start partition session request to convert into an event
     */
    @Override
    protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) {
        this.handlerExecutor.execute(() -> {
            YdbTopic.StreamReadMessage.PartitionSession partitionSession = request.getPartitionSession();
            YdbTopic.OffsetsRange offsetsRange = request.getPartitionOffsets();
            PartitionSession session = new PartitionSession(
                    partitionSession.getPartitionSessionId(),
                    partitionSession.getPartitionId(),
                    partitionSession.getPath());
            OffsetsRange partitionOffsets = new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd());
            StartPartitionSessionEventImpl event = new StartPartitionSessionEventImpl(
                    session,
                    request.getCommittedOffset(),
                    partitionOffsets,
                    startSettings -> this.sendStartPartitionSessionResponse(request, (StartPartitionSessionSettings)startSettings));
            this.eventHandler.onStartPartitionSession(event);
        });
    }

    /**
     * Builds a stop-partition-session event from the server request and passes it to
     * {@link ReadEventHandler#onStopPartitionSession} on the handler executor.
     *
     * <p>The event carries a confirmation callback that calls
     * {@code sendStopPartitionSessionResponse} with the partition session id of the request.
     *
     * @param request the stop partition session request to convert into an event
     */
    @Override
    protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request) {
        long partitionSessionId = request.getPartitionSessionId();
        long committedOffset = request.getCommittedOffset();
        StopPartitionSessionEventImpl event = new StopPartitionSessionEventImpl(partitionSessionId, committedOffset, () -> this.sendStopPartitionSessionResponse(partitionSessionId));
        this.handlerExecutor.execute(() -> this.eventHandler.onStopPartitionSession(event));
    }

    /**
     * Passes a partition-session-closed event for the given session to
     * {@link ReadEventHandler#onPartitionSessionClosed} on the handler executor.
     *
     * @param partitionSession the session that was closed
     */
    @Override
    protected void handleClosePartitionSession(PartitionSession partitionSession) {
        PartitionSessionClosedEventImpl event = new PartitionSessionClosedEventImpl(partitionSession);
        this.handlerExecutor.execute(() -> this.eventHandler.onPartitionSessionClosed(event));
    }

    /**
     * Passes a new {@link ReaderClosedEvent} to {@link ReadEventHandler#onReaderClosed}
     * on the handler executor.
     */
    @Override
    protected void handleCloseReader() {
        this.handlerExecutor.execute(() -> this.eventHandler.onReaderClosed(new ReaderClosedEvent()));
    }

    /**
     * Runs the parent shutdown and then, whether it succeeded or failed, shuts down the
     * default handler pool if this reader created one.
     *
     * @return the parent shutdown future with the executor cleanup stage attached
     */
    @Override
    protected CompletableFuture<Void> shutdownImpl() {
        return super.shutdownImpl().whenComplete((res, th) -> {
            // Only the pool created by this reader is shut down; a user executor is left alone.
            if (this.defaultHandlerExecutorService != null) {
                logger.debug("Shutting down default handler executor");
                this.defaultHandlerExecutorService.shutdown();
            }
        });
    }

    /**
     * Shuts the reader down by delegating to {@link #shutdownImpl()}.
     *
     * @return the future returned by {@link #shutdownImpl()}
     */
    @Override
    public CompletableFuture<Void> shutdown() {
        return this.shutdownImpl();
    }
}

