/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.impl.GrpcStreamRetrier;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.OffsetsRangeImpl;
import tech.ydb.topic.read.impl.PartitionSessionImpl;
import tech.ydb.topic.read.impl.ReadSession;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;

public abstract class ReaderImpl
extends GrpcStreamRetrier {
    private static final Logger logger = LoggerFactory.getLogger(ReaderImpl.class);
    private static final int DEFAULT_DECOMPRESSION_THREAD_COUNT = 4;
    private ReadSessionImpl session;
    private final ReaderSettings settings;
    private final TopicRpc topicRpc;
    private final Executor decompressionExecutor;
    private final ExecutorService defaultDecompressionExecutorService;
    private final AtomicReference<CompletableFuture<Void>> initResultFutureRef = new AtomicReference<Object>(null);
    private final AtomicLong seqNumberCounter = new AtomicLong(0L);

    public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
        super(topicRpc.getScheduler());
        this.topicRpc = topicRpc;
        this.settings = settings;
        this.session = new ReadSessionImpl();
        if (settings.getDecompressionExecutor() != null) {
            this.defaultDecompressionExecutorService = null;
            this.decompressionExecutor = settings.getDecompressionExecutor();
        } else {
            this.defaultDecompressionExecutorService = Executors.newFixedThreadPool(4);
            this.decompressionExecutor = this.defaultDecompressionExecutorService;
        }
        StringBuilder message = new StringBuilder("Reader");
        if (settings.getReaderName() != null && !settings.getReaderName().isEmpty()) {
            message.append(" \"").append(settings.getReaderName()).append("\"");
        }
        message.append(" (generated id ").append(this.id).append(")");
        message.append(" created for topic(s): ");
        for (TopicReadSettings topic : settings.getTopics()) {
            if (topic != settings.getTopics().get(0)) {
                message.append(", ");
            }
            message.append("\"").append(topic.getPath()).append("\"");
        }
        message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
        logger.info(message.toString());
    }

    @Override
    protected Logger getLogger() {
        return logger;
    }

    @Override
    protected String getStreamName() {
        return "Reader";
    }

    protected CompletableFuture<Void> initImpl() {
        logger.info("[{}] initImpl called", (Object)this.id);
        if (this.initResultFutureRef.compareAndSet(null, new CompletableFuture())) {
            this.session.startAndInitialize();
        } else {
            logger.warn("[{}] Init is called on this reader more than once. Nothing is done", (Object)this.id);
        }
        return this.initResultFutureRef.get();
    }

    protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent var1);

    protected abstract void handleCommitResponse(long var1, PartitionSession var3);

    protected abstract void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest var1, PartitionSession var2, Consumer<StartPartitionSessionSettings> var3);

    protected abstract void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest var1, @Nullable Long var2, Runnable var3);

    protected abstract void handleClosePartitionSession(PartitionSession var1);

    protected abstract void handleCloseReader();

    @Override
    protected void onStreamReconnect() {
        this.session = new ReadSessionImpl();
        this.session.startAndInitialize();
    }

    @Override
    protected void onShutdown(String reason) {
        this.session.shutdown();
        if (this.initResultFutureRef.get() != null && !this.initResultFutureRef.get().isDone()) {
            this.initResultFutureRef.get().completeExceptionally(new RuntimeException(reason));
        }
        if (this.defaultDecompressionExecutorService != null) {
            this.defaultDecompressionExecutorService.shutdown();
        }
    }

    private class ReadSessionImpl
    extends ReadSession {
        protected String sessionId;
        private final String fullId;
        private final AtomicLong sizeBytesToRequest;
        private final Map<Long, PartitionSessionImpl> partitionSessions;

        private ReadSessionImpl() {
            super(ReaderImpl.this.topicRpc);
            this.sessionId = "";
            this.sizeBytesToRequest = new AtomicLong(0L);
            this.partitionSessions = new ConcurrentHashMap<Long, PartitionSessionImpl>();
            this.fullId = ReaderImpl.this.id + '.' + ReaderImpl.this.seqNumberCounter.incrementAndGet();
        }

        @Override
        public void startAndInitialize() {
            logger.debug("[{}] Session {} startAndInitialize called", (Object)this.fullId, (Object)this.sessionId);
            this.start(this::processMessage).whenComplete(this::onSessionClosing);
            YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest.newBuilder().setConsumer(ReaderImpl.this.settings.getConsumerName());
            ReaderImpl.this.settings.getTopics().forEach(topicReadSettings -> {
                YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder settingsBuilder = YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder().setPath(topicReadSettings.getPath());
                if (topicReadSettings.getPartitionIds() != null && !topicReadSettings.getPartitionIds().isEmpty()) {
                    settingsBuilder.addAllPartitionIds(topicReadSettings.getPartitionIds());
                }
                if (topicReadSettings.getMaxLag() != null) {
                    settingsBuilder.setMaxLag(ProtobufUtils.durationToProto((Duration)topicReadSettings.getMaxLag()));
                }
                if (topicReadSettings.getReadFrom() != null) {
                    settingsBuilder.setReadFrom(ProtobufUtils.instantToProto((Instant)topicReadSettings.getReadFrom()));
                }
                initRequestBuilder.addTopicsReadSettings(settingsBuilder);
            });
            if (ReaderImpl.this.settings.getReaderName() != null && !ReaderImpl.this.settings.getReaderName().isEmpty()) {
                initRequestBuilder.setReaderName(ReaderImpl.this.settings.getReaderName());
            }
            this.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setInitRequest(initRequestBuilder).build());
        }

        private void sendReadRequest() {
            long currentSizeBytesToRequest = this.sizeBytesToRequest.getAndSet(0L);
            if (currentSizeBytesToRequest <= 0L) {
                logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", (Object)this.fullId, (Object)currentSizeBytesToRequest);
                return;
            }
            logger.debug("[{}] Sending DataRequest with {} bytes", (Object)this.fullId, (Object)currentSizeBytesToRequest);
            this.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setReadRequest(YdbTopic.StreamReadMessage.ReadRequest.newBuilder().setBytesSize(currentSizeBytesToRequest).build()).build());
        }

        private void sendStartPartitionSessionResponse(PartitionSession partitionSession, StartPartitionSessionSettings startSettings) {
            if (!this.isWorking.get()) {
                logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {}), but reading session is already closed", new Object[]{this.fullId, partitionSession.getId(), partitionSession.getPartitionId()});
                return;
            }
            if (!this.partitionSessions.containsKey(partitionSession.getId())) {
                logger.info("[{}] Need to send StartPartitionSessionResponse for partition session {} (partition {}), but have no such partition session active", new Object[]{this.fullId, partitionSession.getId(), partitionSession.getPartitionId()});
                return;
            }
            YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder responseBuilder = YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder().setPartitionSessionId(partitionSession.getId());
            if (startSettings != null) {
                if (startSettings.getReadOffset() != null) {
                    responseBuilder.setReadOffset(startSettings.getReadOffset().longValue());
                }
                if (startSettings.getCommitOffset() != null) {
                    responseBuilder.setCommitOffset(startSettings.getCommitOffset().longValue());
                }
            }
            logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})", new Object[]{this.fullId, partitionSession.getId(), partitionSession.getPartitionId()});
            this.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStartPartitionSessionResponse(responseBuilder.build()).build());
        }

        private void sendStopPartitionSessionResponse(long partitionSessionId) {
            if (!this.isWorking.get()) {
                logger.info("[{}] Need to send StopPartitionSessionResponse for partition session {}, but reading session is already closed", (Object)this.fullId, (Object)partitionSessionId);
                return;
            }
            PartitionSessionImpl partitionSession = this.partitionSessions.remove(partitionSessionId);
            if (partitionSession != null) {
                partitionSession.shutdown();
                logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})", new Object[]{this.fullId, partitionSessionId, partitionSession.getPartitionId()});
            } else {
                logger.warn("[{}] Sending StopPartitionSessionResponse for partition session {}, but have no such partition session active", (Object)this.fullId, (Object)partitionSessionId);
            }
            this.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStopPartitionSessionResponse(YdbTopic.StreamReadMessage.StopPartitionSessionResponse.newBuilder().setPartitionSessionId(partitionSessionId).build()).build());
        }

        private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, List<OffsetsRange> rangesToCommit) {
            if (!this.isWorking.get()) {
                if (logger.isInfoEnabled()) {
                    StringBuilder message = new StringBuilder("[").append(this.fullId).append("] Need to send CommitRequest for partition session ").append(partitionSessionId).append(" (partition ").append(partitionId).append(") with offset ranges ");
                    for (int i = 0; i < rangesToCommit.size(); ++i) {
                        if (i > 0) {
                            message.append(", ");
                        }
                        OffsetsRange range2 = rangesToCommit.get(i);
                        message.append("[").append(range2.getStart()).append(",").append(range2.getEnd()).append(")");
                    }
                    message.append(", but reading session is already closed");
                    logger.info(message.toString());
                }
                return;
            }
            YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder().setPartitionSessionId(partitionSessionId);
            rangesToCommit.forEach(range -> builder.addOffsets(YdbTopic.OffsetsRange.newBuilder().setStart(range.getStart()).setEnd(range.getEnd())));
            this.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder().addCommitOffsets(builder)).build());
        }

        private void closePartitionSessions() {
            this.partitionSessions.values().forEach(this::closePartitionSession);
            this.partitionSessions.clear();
        }

        private void closePartitionSession(PartitionSessionImpl partitionSession) {
            partitionSession.shutdown();
            ReaderImpl.this.handleClosePartitionSession(partitionSession.getSessionInfo());
        }

        private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) {
            this.sessionId = response.getSessionId();
            if (ReaderImpl.this.initResultFutureRef.get() != null) {
                ((CompletableFuture)ReaderImpl.this.initResultFutureRef.get()).complete(null);
            }
            this.sizeBytesToRequest.set(ReaderImpl.this.settings.getMaxMemoryUsageBytes());
            logger.info("[{}] Session {} initialized. Requesting {} bytes...", new Object[]{this.fullId, this.sessionId, ReaderImpl.this.settings.getMaxMemoryUsageBytes()});
            this.sendReadRequest();
        }

        private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) {
            long partitionSessionId = request.getPartitionSession().getPartitionSessionId();
            long partitionId = request.getPartitionSession().getPartitionId();
            logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {})", new Object[]{this.fullId, partitionSessionId, partitionId});
            PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder().setId(partitionSessionId).setPath(request.getPartitionSession().getPath()).setPartitionId(partitionId).setCommittedOffset(request.getCommittedOffset()).setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd())).setDecompressionExecutor(ReaderImpl.this.decompressionExecutor).setDataEventCallback(ReaderImpl.this::handleDataReceivedEvent).setCommitFunction(offsets -> this.sendCommitOffsetRequest(partitionSessionId, partitionId, (List<OffsetsRange>)offsets)).build();
            this.partitionSessions.put(partitionSession.getId(), partitionSession);
            ReaderImpl.this.handleStartPartitionSessionRequest(request, partitionSession.getSessionInfo(), settings -> this.sendStartPartitionSessionResponse(partitionSession.getSessionInfo(), (StartPartitionSessionSettings)settings));
        }

        protected void onStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request) {
            if (request.getGraceful()) {
                PartitionSessionImpl partitionSession = this.partitionSessions.get(request.getPartitionSessionId());
                if (partitionSession != null) {
                    logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.fullId, partitionSession.getId(), partitionSession.getPartitionId()});
                } else {
                    logger.warn("[{}] Received graceful StopPartitionSessionRequest for partition session {}, but have no such partition session active", (Object)this.fullId, (Object)request.getPartitionSessionId());
                }
                ReaderImpl.this.handleStopPartitionSession(request, partitionSession == null ? null : Long.valueOf(partitionSession.getPartitionId()), () -> this.sendStopPartitionSessionResponse(request.getPartitionSessionId()));
            } else {
                PartitionSessionImpl partitionSession = this.partitionSessions.remove(request.getPartitionSessionId());
                if (partitionSession != null) {
                    logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.fullId, partitionSession.getId(), partitionSession.getPartitionId()});
                    this.closePartitionSession(partitionSession);
                } else {
                    logger.warn("[{}] Received force StopPartitionSessionRequest for partition session {}, but have no such partition session running", (Object)this.fullId, (Object)request.getPartitionSessionId());
                }
            }
        }

        private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse) {
            long responseBytesSize = readResponse.getBytesSize();
            logger.trace("[{}] Received ReadResponse of {} bytes", (Object)this.fullId, (Object)responseBytesSize);
            ArrayList batchReadFutures = new ArrayList();
            readResponse.getPartitionDataList().forEach(data -> {
                long partitionId = data.getPartitionSessionId();
                PartitionSessionImpl partitionSession = this.partitionSessions.get(partitionId);
                if (partitionSession != null) {
                    CompletableFuture<Void> readFuture = partitionSession.addBatches(data.getBatchesList());
                    batchReadFutures.add(readFuture);
                } else {
                    logger.error("[{}] Received PartitionData for unknown(closed?) PartitionSessionId={}. This shouldn't happen", (Object)this.fullId, (Object)partitionId);
                }
            });
            CompletableFuture.allOf(batchReadFutures.toArray(new CompletableFuture[0])).whenComplete((res, th) -> {
                if (th != null) {
                    logger.error("[{}] Exception while waiting for batches to be read:", (Object)this.fullId, th);
                }
                if (this.isWorking.get()) {
                    logger.trace("[{}] Finished handling ReadResponse of {} bytes. Sending ReadRequest...", (Object)this.fullId, (Object)responseBytesSize);
                    this.sizeBytesToRequest.addAndGet(responseBytesSize);
                    this.sendReadRequest();
                } else {
                    logger.trace("[{}] Finished handling ReadResponse of {} bytes. Read session is already closed -- no need to send ReadRequest", (Object)this.fullId, (Object)responseBytesSize);
                }
            });
        }

        protected void onCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetResponse response) {
            logger.trace("[{}] Received CommitOffsetResponse", (Object)this.fullId);
            for (YdbTopic.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset partitionCommittedOffset : response.getPartitionsCommittedOffsetsList()) {
                PartitionSessionImpl partitionSession = this.partitionSessions.get(partitionCommittedOffset.getPartitionSessionId());
                if (partitionSession != null) {
                    partitionSession.handleCommitResponse(partitionCommittedOffset.getCommittedOffset());
                    ReaderImpl.this.handleCommitResponse(partitionCommittedOffset.getCommittedOffset(), partitionSession.getSessionInfo());
                    continue;
                }
                logger.error("[{}] Received CommitOffsetResponse for unknown (closed?) partition session with id={}. This shouldn't happen", (Object)this.fullId, (Object)partitionCommittedOffset.getPartitionSessionId());
            }
        }

        protected void onPartitionSessionStatusResponse(YdbTopic.StreamReadMessage.PartitionSessionStatusResponse response) {
            PartitionSessionImpl partitionSession = this.partitionSessions.get(response.getPartitionSessionId());
            logger.info("[{}] Received PartitionSessionStatusResponse: partition session {} (partition {}). Partition offsets: [{}, {}). Committed offset: {}", new Object[]{this.fullId, response.getPartitionSessionId(), partitionSession == null ? "unknown" : Long.valueOf(partitionSession.getPartitionId()), response.getPartitionOffsets().getStart(), response.getPartitionOffsets().getEnd(), response.getCommittedOffset()});
        }

        private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
            if (!this.isWorking.get()) {
                logger.debug("[{}] processMessage called, but read session is already closed", (Object)this.fullId);
                return;
            }
            logger.debug("[{}] processMessage called", (Object)this.fullId);
            if (message.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
                logger.warn("[{}] Got non-success status in processMessage method: {}", (Object)this.fullId, (Object)message);
                ReaderImpl.this.onSessionClosed(Status.of((StatusCode)StatusCode.fromProto((StatusCodesProtos.StatusIds.StatusCode)message.getStatus())).withIssues(new Issue[]{Issue.of((String)("Got a message with non-success status: " + message), (Issue.Severity)Issue.Severity.ERROR)}), null);
                return;
            }
            ReaderImpl.this.reconnectCounter.set(0);
            if (message.hasInitResponse()) {
                this.onInitResponse(message.getInitResponse());
            } else if (message.hasStartPartitionSessionRequest()) {
                this.onStartPartitionSessionRequest(message.getStartPartitionSessionRequest());
            } else if (message.hasStopPartitionSessionRequest()) {
                this.onStopPartitionSessionRequest(message.getStopPartitionSessionRequest());
            } else if (message.hasReadResponse()) {
                this.onReadResponse(message.getReadResponse());
            } else if (message.hasCommitOffsetResponse()) {
                this.onCommitOffsetResponse(message.getCommitOffsetResponse());
            } else if (message.hasPartitionSessionStatusResponse()) {
                this.onPartitionSessionStatusResponse(message.getPartitionSessionStatusResponse());
            } else if (message.hasUpdateTokenResponse()) {
                logger.debug("[{}] Received UpdateTokenResponse", (Object)this.fullId);
            } else {
                logger.error("[{}] Unhandled message from server: {}", (Object)this.fullId, (Object)message);
            }
        }

        private void onSessionClosing(Status status, Throwable th) {
            logger.info("[{}] Session {} onSessionClosing called", (Object)this.fullId, (Object)this.sessionId);
            if (this.isWorking.get()) {
                this.shutdown();
                ReaderImpl.this.onSessionClosed(status, th);
            }
        }

        @Override
        protected void onStop() {
            logger.debug("[{}] Session {} onStop called", (Object)this.fullId, (Object)this.sessionId);
            this.closePartitionSessions();
        }
    }
}

