/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.read.impl.ReadSession;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;

/**
 * Base implementation of a topic reader that works over a {@link ReadSession} stream.
 *
 * <p>The class sends the init request, asks the server for data in byte-sized portions,
 * tracks the partition sessions announced by the server, forwards commit requests and,
 * when the stream fails, schedules a reconnect with a randomized exponential backoff.
 * Subclasses provide the event handling through the abstract {@code handle*} methods.
 */
public abstract class ReaderImpl {
    private static final Logger logger = LoggerFactory.getLogger(ReaderImpl.class);
    // Not referenced by any code visible in this class.
    // NOTE(review): presumably 0 stands for "no limit on reconnects" -- confirm.
    private static final int MAX_RECONNECT_COUNT = 0;
    // Base delay of the reconnect backoff, in milliseconds.
    private static final int EXP_BACKOFF_BASE_MS = 256;
    // Delay (before jitter) used once the retry number exceeds EXP_BACKOFF_MAX_POWER, in milliseconds.
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    // Highest retry number for which the delay is still computed as base * 2^retry.
    private static final int EXP_BACKOFF_MAX_POWER = 7;
    // Size of the thread pool created when the settings carry no decompression executor.
    private static final int DEFAULT_DECOMPRESSION_THREAD_COUNT = 4;
    // Random UUID generated per reader instance; used as a prefix in log messages.
    private final String id;
    private final ReaderSettings settings;
    private final TopicRpc topicRpc;
    // Incremented on every failed stream completion, reset to 0 by any successful server message.
    private final AtomicInteger reconnectCounter = new AtomicInteger(0);
    // Set to true by shutdownImpl(); checked before scheduling a reconnect.
    protected final AtomicBoolean isStopped = new AtomicBoolean(false);
    // Bytes of ReadResponses that are currently being processed. The object is also used as the
    // monitor for the synchronized sections in handleReadResponse and processMessage.
    private final AtomicLong sizeBytesAcquired = new AtomicLong(0L);
    // Bytes accumulated for the next ReadRequest; drained (reset to 0) by sendReadRequest.
    private final AtomicLong sizeBytesToRequest = new AtomicLong(0L);
    // Active partition sessions keyed by partition session id.
    private final Map<Long, tech.ydb.topic.read.impl.PartitionSession> partitionSessions = new ConcurrentHashMap<Long, tech.ydb.topic.read.impl.PartitionSession>();
    // Executor handed to every partition session for decompression.
    private final Executor decompressionExecutor;
    // Non-null only when the reader created its own pool; that pool is shut down in shutdownImpl().
    private final ExecutorService defaultDecompressionExecutorService;
    // Completed when an InitResponse arrives; replaced by a new future on every initImpl() call.
    private CompletableFuture<Void> initResultFuture = new CompletableFuture();
    // Current stream session; replaced by a new instance on reconnect.
    private ReadSession session;
    // Session id taken from the most recent InitResponse.
    private String currentSessionId;
    // True between scheduling a reconnect and receiving the next InitResponse;
    // while it is true, finished ReadResponses do not trigger new ReadRequests.
    private AtomicBoolean isReconnecting = new AtomicBoolean(false);

    public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
        this.id = UUID.randomUUID().toString();
        this.topicRpc = topicRpc;
        this.settings = settings;
        this.session = new ReadSession(topicRpc);
        if (settings.getDecompressionExecutor() != null) {
            this.defaultDecompressionExecutorService = null;
            this.decompressionExecutor = settings.getDecompressionExecutor();
        } else {
            this.defaultDecompressionExecutorService = Executors.newFixedThreadPool(4);
            this.decompressionExecutor = this.defaultDecompressionExecutorService;
        }
        StringBuilder message = new StringBuilder("Reader");
        if (settings.getReaderName() != null && !settings.getReaderName().isEmpty()) {
            message.append(" \"").append(settings.getReaderName()).append("\"");
        }
        message.append(" (generated id ").append(this.id).append(")");
        message.append(" created for topic(s): ");
        for (TopicReadSettings topic : settings.getTopics()) {
            if (topic != settings.getTopics().get(0)) {
                message.append(", ");
            }
            message.append("\"").append(topic.getPath()).append("\"");
        }
        message.append(" and Consumer: \"").append(settings.getConsumerName()).append("\"");
        logger.info(message.toString());
    }

    /**
     * Starts the current session and sends the InitRequest built from the reader settings.
     *
     * <p>The stream is started first, with {@code processMessage} as the message observer and
     * {@code completeSession} as the completion callback. Only then is a new init future
     * created and the request sent.
     *
     * @return the newly created future, which is completed when an InitResponse is processed
     */
    protected CompletableFuture<Void> initImpl() {
        logger.debug("[{}] initImpl started", this.id);
        GrpcReadStream.Observer<YdbTopic.StreamReadMessage.FromServer> observer = this::processMessage;
        this.session.start(observer).whenComplete(this::completeSession);
        this.initResultFuture = new CompletableFuture<>();

        YdbTopic.StreamReadMessage.InitRequest.Builder initRequest = YdbTopic.StreamReadMessage.InitRequest.newBuilder()
                .setConsumer(this.settings.getConsumerName());
        for (TopicReadSettings topic : this.settings.getTopics()) {
            YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder topicBuilder =
                    YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder()
                            .setPath(topic.getPath());
            // Optional parts are only set when the user configured them.
            if (topic.getPartitionIds() != null && !topic.getPartitionIds().isEmpty()) {
                topicBuilder.addAllPartitionIds(topic.getPartitionIds());
            }
            if (topic.getMaxLag() != null) {
                topicBuilder.setMaxLag(ProtobufUtils.durationToProto(topic.getMaxLag()));
            }
            if (topic.getReadFrom() != null) {
                topicBuilder.setReadFrom(ProtobufUtils.instantToProto(topic.getReadFrom()));
            }
            initRequest.addTopicsReadSettings(topicBuilder);
        }

        YdbTopic.StreamReadMessage.FromClient initMessage = YdbTopic.StreamReadMessage.FromClient.newBuilder()
                .setInitRequest(initRequest)
                .build();
        this.session.send(initMessage);
        return this.initResultFuture;
    }

    /**
     * Drains the accumulated byte budget and, if it is positive, asks the server for that
     * many bytes with a single ReadRequest. A non-positive budget is only logged.
     */
    private void sendReadRequest() {
        // getAndSet(0) hands the whole budget to this call, so it is not requested twice.
        long bytesToRequest = this.sizeBytesToRequest.getAndSet(0L);
        if (bytesToRequest > 0L) {
            logger.debug("[{}] Sending DataRequest with {} bytes", this.id, bytesToRequest);
            YdbTopic.StreamReadMessage.ReadRequest readRequest = YdbTopic.StreamReadMessage.ReadRequest.newBuilder()
                    .setBytesSize(bytesToRequest)
                    .build();
            this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
                    .setReadRequest(readRequest)
                    .build());
            return;
        }
        logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", this.id, bytesToRequest);
    }

    /**
     * Registers a partition session for the given server request and confirms it to the server.
     *
     * <p>The new session is stored in the partition session map under its id before the
     * StartPartitionSessionResponse is sent.
     *
     * @param request the StartPartitionSessionRequest received from the server
     * @param startSettings optional overrides for read and commit offsets; may be {@code null},
     *                      and each offset inside it may be {@code null} as well
     */
    protected void sendStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, StartPartitionSessionSettings startSettings) {
        OffsetsRange partitionOffsets = new OffsetsRange(
                request.getPartitionOffsets().getStart(),
                request.getPartitionOffsets().getEnd());
        tech.ydb.topic.read.impl.PartitionSession partitionSession = tech.ydb.topic.read.impl.PartitionSession.newBuilder()
                .setId(request.getPartitionSession().getPartitionSessionId())
                .setPath(request.getPartitionSession().getPath())
                .setPartitionId(request.getPartitionSession().getPartitionId())
                .setCommittedOffset(request.getCommittedOffset())
                .setPartitionOffsets(partitionOffsets)
                .setDecompressionExecutor(this.decompressionExecutor)
                .setDataEventCallback(this::handleDataReceivedEvent)
                .setCommitFunction(this::commitOffset)
                .build();
        this.partitionSessions.put(partitionSession.getId(), partitionSession);

        YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder response =
                YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder()
                        .setPartitionSessionId(partitionSession.getId());
        // Offsets are left unset in the response unless the caller supplied them.
        if (startSettings != null && startSettings.getReadOffset() != null) {
            response.setReadOffset(startSettings.getReadOffset().longValue());
        }
        if (startSettings != null && startSettings.getCommitOffset() != null) {
            response.setCommitOffset(startSettings.getCommitOffset().longValue());
        }

        logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})",
                this.id, partitionSession.getId(), partitionSession.getPartitionId());
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
                .setStartPartitionSessionResponse(response.build())
                .build());
    }

    /**
     * Confirms a stop of the given partition session to the server.
     *
     * <p>If the session is known, it is shut down first (it stays in the session map).
     * The StopPartitionSessionResponse is sent in either case.
     *
     * @param partitionSessionId id of the partition session being stopped
     */
    protected void sendStopPartitionSessionResponse(long partitionSessionId) {
        tech.ydb.topic.read.impl.PartitionSession stoppedSession = this.partitionSessions.get(partitionSessionId);
        if (stoppedSession == null) {
            logger.warn("[{}] Sending StopPartitionSessionResponse for partition session {}, but have no such partition session running", this.id, partitionSessionId);
        } else {
            logger.info("[{}] User confirmed graceful shutdown of partition session {} (partition {})",
                    this.id, partitionSessionId, stoppedSession.getPartitionId());
            stoppedSession.shutdown();
            logger.info("[{}] Sending StopPartitionSessionResponse for partition session {} (partition {})",
                    this.id, partitionSessionId, stoppedSession.getPartitionId());
        }

        YdbTopic.StreamReadMessage.StopPartitionSessionResponse response =
                YdbTopic.StreamReadMessage.StopPartitionSessionResponse.newBuilder()
                        .setPartitionSessionId(partitionSessionId)
                        .build();
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
                .setStopPartitionSessionResponse(response)
                .build());
    }

    /**
     * Hands the batches of a ReadResponse to their partition sessions and, once all of them
     * have been read, returns the response's byte size to the request budget.
     *
     * @param readResponse the ReadResponse received from the server
     */
    private void handleReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse) {
        final long bytesInResponse = readResponse.getBytesSize();
        this.sizeBytesAcquired.addAndGet(bytesInResponse);
        logger.trace("Received ReadResponse of {} bytes", bytesInResponse);

        ArrayList<CompletableFuture<Void>> pendingReads = new ArrayList<>();
        readResponse.getPartitionDataList().forEach(partitionData -> {
            long partitionSessionId = partitionData.getPartitionSessionId();
            tech.ydb.topic.read.impl.PartitionSession target = this.partitionSessions.get(partitionSessionId);
            if (target == null) {
                // Data for a session we no longer track is dropped.
                logger.debug("[{}] Received PartitionData for unknown(closed?) PartitionSessionId={}", this.id, partitionSessionId);
                return;
            }
            pendingReads.add(target.addBatches(partitionData.getBatchesList()));
        });

        CompletableFuture.allOf(pendingReads.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, error) -> this.finishReadResponse(bytesInResponse, error));
    }

    /**
     * Completion step of {@code handleReadResponse}: releases the acquired bytes and, unless a
     * reconnect is in progress, adds them to the request budget and sends a ReadRequest.
     */
    private void finishReadResponse(long bytesInResponse, Throwable error) {
        if (error != null) {
            logger.error("[{}] Exception while waiting for batches to be read:", this.id, error);
        }
        boolean requestMore;
        // Same monitor as the InitResponse branch of processMessage, so the accounting done
        // here does not interleave with the budget recalculation after a reconnect.
        synchronized (this.sizeBytesAcquired) {
            long acquiredAfterRelease = this.sizeBytesAcquired.addAndGet(-bytesInResponse);
            requestMore = !this.isReconnecting.get();
            if (requestMore) {
                logger.trace("[{}] Finished handling ReadResponse of {} bytes. sizeBytesAcquired is now {}. Sending ReadRequest...",
                        this.id, bytesInResponse, acquiredAfterRelease);
                this.sizeBytesToRequest.addAndGet(bytesInResponse);
            } else {
                logger.trace("[{}] Finished handling ReadResponse of {} bytes. Reconnect is in progress -- no need to send ReadRequest", this.id, bytesInResponse);
            }
        }
        // The request itself is sent outside of the synchronized section.
        if (requestMore) {
            this.sendReadRequest();
        }
    }

    /**
     * Forwards every committed offset from the response to the partition session it belongs to.
     * Entries for sessions that are no longer tracked are only logged.
     *
     * @param commitOffsetResponse the CommitOffsetResponse received from the server
     */
    protected void handleCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetResponse commitOffsetResponse) {
        for (YdbTopic.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset committed : commitOffsetResponse.getPartitionsCommittedOffsetsList()) {
            long partitionSessionId = committed.getPartitionSessionId();
            tech.ydb.topic.read.impl.PartitionSession owner = this.partitionSessions.get(partitionSessionId);
            if (owner == null) {
                logger.debug("[{}] Received CommitOffsetResponse for closed partition session with id={}", this.id, partitionSessionId);
            } else {
                owner.handleCommitResponse(committed.getCommittedOffset());
            }
        }
    }

    /**
     * Reacts to a StopPartitionSessionRequest from the server.
     *
     * <p>A forced (non-graceful) stop removes the session from the map and closes it
     * immediately. A graceful stop keeps the session registered and delegates to
     * {@code handleStopPartitionSession}, even if the session is unknown.
     *
     * @param request the StopPartitionSessionRequest received from the server
     */
    protected void handleStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request) {
        long partitionSessionId = request.getPartitionSessionId();

        if (!request.getGraceful()) {
            tech.ydb.topic.read.impl.PartitionSession removed = this.partitionSessions.remove(partitionSessionId);
            if (removed == null) {
                logger.warn("[{}] Received force StopPartitionSessionRequest for partition session {}, but have no such partition session running", this.id, partitionSessionId);
                return;
            }
            logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition {})",
                    this.id, removed.getId(), removed.getPartitionId());
            this.closePartitionSession(removed);
            return;
        }

        tech.ydb.topic.read.impl.PartitionSession known = this.partitionSessions.get(partitionSessionId);
        if (known == null) {
            logger.warn("[{}] Received graceful StopPartitionSessionRequest for partition session {}, but have no such partition session running", this.id, partitionSessionId);
        } else {
            logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} (partition {})",
                    this.id, known.getId(), known.getPartitionId());
        }
        this.handleStopPartitionSession(request);
    }

    /**
     * Hook for delivering received data to the user. It is registered as the data event
     * callback of every partition session created by this reader.
     *
     * @param var1 the event carrying the received data
     * @return a future for the handling of the event
     *         (NOTE(review): presumably completes when the user code is done -- confirm in subclasses)
     */
    protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent var1);

    /**
     * Hook called from {@code processMessage} for every StartPartitionSessionRequest.
     * NOTE(review): implementations are presumably expected to answer via
     * {@code sendStartPartitionSessionResponse} -- confirm in subclasses.
     *
     * @param var1 the request received from the server
     */
    protected abstract void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest var1);

    /**
     * Hook called for graceful StopPartitionSessionRequests; it is invoked even when the
     * referenced partition session is not known to this reader.
     *
     * @param var1 the graceful stop request received from the server
     */
    protected abstract void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest var1);

    /**
     * Hook called after a partition session has been shut down by this reader
     * (forced stop, reconnect or reader shutdown).
     *
     * @param var1 the session info of the closed partition session
     */
    protected abstract void handleClosePartitionSession(PartitionSession var1);

    /**
     * Hook called during reader shutdown, after all partition sessions have been closed.
     */
    protected abstract void handleCloseReader();

    /**
     * Sends a CommitOffsetRequest with a single offsets range for one partition session.
     * This method is passed to partition sessions as their commit function.
     *
     * @param partitionId value placed into the request's partition session id field
     * @param offsets range whose start and end are committed
     */
    private void commitOffset(long partitionId, OffsetsRange offsets) {
        YdbTopic.OffsetsRange.Builder range = YdbTopic.OffsetsRange.newBuilder()
                .setStart(offsets.getStart())
                .setEnd(offsets.getEnd());
        YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder partitionCommit =
                YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder()
                        .setPartitionSessionId(partitionId)
                        .addOffsets(range);
        YdbTopic.StreamReadMessage.CommitOffsetRequest.Builder commitRequest =
                YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder()
                        .addCommitOffsets(partitionCommit);
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
                .setCommitOffsetRequest(commitRequest)
                .build());
    }

    /**
     * Replaces the current session with a new {@link ReadSession} and initializes it.
     *
     * <p>This method runs as a delayed task on the scheduler, so the reader may have been shut
     * down between scheduling and execution. In that case nothing is done; otherwise a new
     * stream would be opened for an already stopped reader and never closed.
     */
    private void reconnect() {
        if (this.isStopped.get()) {
            logger.info("[{}] Reader is stopped. Skipping scheduled reconnect #{}", this.id, this.reconnectCounter.get());
            return;
        }
        logger.info("[{}] Reconnect #{} started. Creating new ReadSession", this.id, this.reconnectCounter.get());
        this.session = new ReadSession(this.topicRpc);
        this.initImpl();
    }

    /**
     * Marks the reader as stopped and releases its resources asynchronously.
     *
     * <p>The asynchronous part closes all partition sessions, notifies the subclass through
     * {@code handleCloseReader}, shuts down the decompression pool if it is owned by this
     * reader, and finally shuts down the current session.
     *
     * @return a future that completes when the asynchronous cleanup has finished
     */
    protected CompletableFuture<Void> shutdownImpl() {
        logger.info("[{}] Shutting down Topic Reader", this.id);
        this.isStopped.set(true);
        Runnable cleanup = () -> {
            this.closePartitionSessions();
            this.handleCloseReader();
            // A user-provided executor is left untouched; only our own pool is shut down.
            ExecutorService ownedPool = this.defaultDecompressionExecutorService;
            if (ownedPool != null) {
                ownedPool.shutdown();
            }
            this.session.shutdown();
        };
        return CompletableFuture.runAsync(cleanup);
    }

    /**
     * Shuts the reader down because of an error, failing a still pending init future first.
     *
     * <p>The pending future is failed directly. Obtaining it through {@code initImpl()} would
     * restart the stream, send another InitRequest and fail a newly created future, while the
     * future already handed out to the caller would stay incomplete.
     *
     * @param reason message of the exception the init future is completed with
     */
    private void shutdownImpl(String reason) {
        CompletableFuture<Void> pendingInit = this.initResultFuture;
        if (!pendingInit.isDone()) {
            pendingInit.completeExceptionally(new RuntimeException(reason));
        }
        this.shutdownImpl();
    }

    /**
     * Closes all partition sessions and schedules {@code reconnect} after a randomized delay.
     *
     * <p>The base delay is {@code EXP_BACKOFF_BASE_MS * 2^retry} while the retry number does not
     * exceed {@code EXP_BACKOFF_MAX_POWER}, and {@code EXP_BACKOFF_CEILING_MS} afterwards. A random
     * value in {@code [0, base)} is added on top, so the final delay lies in {@code [base, 2 * base)}.
     *
     * @param currentReconnectCounter number of the retry being scheduled
     */
    private void closeSessionsAndScheduleReconnect(int currentReconnectCounter) {
        this.isReconnecting.set(true);
        this.closePartitionSessions();

        int baseDelayMs;
        if (currentReconnectCounter > EXP_BACKOFF_MAX_POWER) {
            baseDelayMs = EXP_BACKOFF_CEILING_MS;
        } else {
            baseDelayMs = EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter);
        }
        int delayMs = baseDelayMs + ThreadLocalRandom.current().nextInt(baseDelayMs);

        logger.warn("[{}] Retry #{}. Scheduling reconnect in {}ms...", this.id, currentReconnectCounter, delayMs);
        this.topicRpc.getScheduler().schedule(this::reconnect, (long) delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes every tracked partition session and then empties the session map.
     */
    private void closePartitionSessions() {
        for (tech.ydb.topic.read.impl.PartitionSession tracked : this.partitionSessions.values()) {
            this.closePartitionSession(tracked);
        }
        this.partitionSessions.clear();
    }

    /**
     * Shuts the given partition session down and reports its closing to the subclass.
     *
     * @param partitionSession the session to close; it is not removed from the session map here
     */
    private void closePartitionSession(tech.ydb.topic.read.impl.PartitionSession partitionSession) {
        partitionSession.shutdown();
        PartitionSession closedSessionInfo = partitionSession.getSessionInfo();
        this.handleClosePartitionSession(closedSessionInfo);
    }

    /**
     * Entry point for every message received from the server stream.
     *
     * <p>A message with a non-success status completes the session with an error status built
     * from that message. Any successful message resets the reconnect counter and is then
     * dispatched by its payload type. After an InitResponse the request budget is recalculated
     * from the configured memory limit and a ReadRequest is sent.
     *
     * @param message the message received from the server
     */
    private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
        logger.trace("[{}] processMessage called", this.id);

        if (message.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
            logger.error("[{}] Got non-success status in processMessage method: {}", this.id, message);
            Issue issue = Issue.of("Got a message with non-success status: " + message, Issue.Severity.ERROR);
            Status failure = Status.of(StatusCode.fromProto(message.getStatus())).withIssues(issue);
            this.completeSession(failure, null);
            return;
        }

        this.reconnectCounter.set(0);

        if (message.hasInitResponse()) {
            this.currentSessionId = message.getInitResponse().getSessionId();
            this.initResultFuture.complete(null);
            // Same monitor as the ReadResponse completion handler: the budget is computed and
            // the reconnecting flag is cleared as one step relative to that handler.
            synchronized (this.sizeBytesAcquired) {
                long bytesAvailable = this.settings.getMaxMemoryUsageBytes() - this.sizeBytesAcquired.get();
                this.sizeBytesToRequest.addAndGet(bytesAvailable);
                this.isReconnecting.set(false);
                logger.info("[{}] Session {} initialized. Requesting available {} bytes...",
                        this.id, this.currentSessionId, bytesAvailable);
            }
            this.sendReadRequest();
            return;
        }

        if (message.hasStartPartitionSessionRequest()) {
            YdbTopic.StreamReadMessage.StartPartitionSessionRequest startRequest = message.getStartPartitionSessionRequest();
            logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {})",
                    this.id,
                    startRequest.getPartitionSession().getPartitionSessionId(),
                    startRequest.getPartitionSession().getPartitionId());
            this.handleStartPartitionSessionRequest(startRequest);
            return;
        }

        if (message.hasStopPartitionSessionRequest()) {
            this.handleStopPartitionSessionRequest(message.getStopPartitionSessionRequest());
            return;
        }

        if (message.hasReadResponse()) {
            this.handleReadResponse(message.getReadResponse());
            return;
        }

        if (message.hasCommitOffsetResponse()) {
            this.handleCommitOffsetResponse(message.getCommitOffsetResponse());
            return;
        }

        if (message.hasPartitionSessionStatusResponse()) {
            YdbTopic.StreamReadMessage.PartitionSessionStatusResponse statusResponse = message.getPartitionSessionStatusResponse();
            tech.ydb.topic.read.impl.PartitionSession partitionSession = this.partitionSessions.get(statusResponse.getPartitionSessionId());
            // The status is only logged; an untracked session is reported as "unknown".
            Object partitionIdForLog;
            if (partitionSession == null) {
                partitionIdForLog = "unknown";
            } else {
                partitionIdForLog = Long.valueOf(partitionSession.getPartitionId());
            }
            logger.info("[{}] Received PartitionSessionStatusResponse: partition session {} (partition {}). Partition offsets: [{}, {}). Committed offset: {}",
                    this.id,
                    statusResponse.getPartitionSessionId(),
                    partitionIdForLog,
                    statusResponse.getPartitionOffsets().getStart(),
                    statusResponse.getPartitionOffsets().getEnd(),
                    statusResponse.getCommittedOffset());
            return;
        }

        if (message.hasUpdateTokenResponse()) {
            logger.debug("[{}] Received UpdateTokenResponse", this.id);
            return;
        }

        logger.error("[{}] Unhandled message from server: {}", this.id, message);
    }

    /**
     * Handles the end of the current stream session.
     *
     * <p>The session is stopped first. A successful completion either confirms a requested
     * shutdown or, if the reader was not stopped, shuts the whole reader down. A failure
     * (exception or non-success status) leads to a scheduled reconnect unless the reader is
     * already stopped.
     *
     * @param status final status of the stream; consulted only when {@code th} is {@code null}
     * @param th exception the stream completed with, or {@code null}
     */
    private void completeSession(Status status, Throwable th) {
        logger.info("[{}] CompleteSession called", this.id);
        this.session.stop();

        if (th != null) {
            logger.error("[{}] Exception in reading stream session {}: ", this.id, this.currentSessionId, th);
        } else if (status.isSuccess()) {
            if (!this.isStopped.get()) {
                logger.error("[{}] Reading stream session {} was closed unexpectedly. Shutting down the whole reader.", this.id, this.currentSessionId);
                this.shutdownImpl("Reading stream session " + this.currentSessionId + " was closed unexpectedly. Shutting down reader.");
            } else {
                logger.info("[{}] Reading stream session {} closed successfully", this.id, this.currentSessionId);
            }
            // A successful completion never triggers a reconnect.
            return;
        } else {
            String sessionIdForLog = this.currentSessionId != null ? this.currentSessionId : "";
            logger.error("[{}] Error in reading stream session {}: {}", this.id, sessionIdForLog, status);
        }

        if (!this.isStopped.get()) {
            int retryNumber = this.reconnectCounter.incrementAndGet();
            this.closeSessionsAndScheduleReconnect(retryNumber);
        }
    }
}

