/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.write.impl;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.QueueOverflowException;
import tech.ydb.topic.write.WriteAck;
import tech.ydb.topic.write.impl.EnqueuedMessage;
import tech.ydb.topic.write.impl.MessageSender;
import tech.ydb.topic.write.impl.WriteSession;

public abstract class WriterImpl {
    private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);
    // Not referenced anywhere in the visible part of this class.
    private static final int MAX_RECONNECT_COUNT = 0;
    // Reconnect backoff parameters. completeSession() uses the same numbers as literals
    // (256, 40000, 7) -- presumably these constants inlined by the compiler; TODO confirm.
    private static final int EXP_BACKOFF_BASE_MS = 256;
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    private static final int EXP_BACKOFF_MAX_POWER = 7;
    private final WriterSettings settings;
    private final TopicRpc topicRpc;
    // Future of the current init attempt: replaced by initImpl() and completed in
    // processMessage() when an init response arrives.
    private CompletableFuture<InitResult> initResultFuture = new CompletableFuture();
    // Messages waiting for room in the send buffer. This queue object is also used as the
    // monitor for the buffer accounting below (tryToEnqueue, free, flushImpl and the
    // encoded-message drain all synchronize on it).
    private final Queue<IncomingMessage> incomingQueue = new LinkedList<IncomingMessage>();
    // Accepted messages whose encoding task has been submitted; drained in order from the head.
    private final Queue<EnqueuedMessage> encodingMessages = new LinkedList<EnqueuedMessage>();
    // Encoded messages that have not yet been handed to messageSender.
    private final Queue<EnqueuedMessage> sendingQueue = new ConcurrentLinkedQueue<EnqueuedMessage>();
    // Messages handed to messageSender and awaiting an ack; passed to messageSender again
    // when an init response is processed.
    private final Queue<EnqueuedMessage> sentMessages = new ConcurrentLinkedQueue<EnqueuedMessage>();
    // Set while one thread is inside the sending section of sendDataRequestIfNeeded().
    private final AtomicBoolean writeRequestInProgress = new AtomicBoolean(false);
    // Set by shutdownImpl(); checked by sendImpl() and completeSession().
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    // Set by completeSession() when a reconnect is scheduled; cleared at the end of reconnect().
    private final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    // Incremented by completeSession() on each failure; reset to 0 by processMessage() on a
    // message with SUCCESS status.
    private final AtomicInteger reconnectCounter = new AtomicInteger(0);
    // Executor on which encode() is run.
    private final Executor compressionExecutor;
    private final MessageSender messageSender;
    // Current session; replaced with a new WriteSession in reconnect().
    private WriteSession session;
    // Session id taken from the latest init response.
    private String currentSessionId;
    // null until the first sendImpl() call, then records whether that message carried a seqNo.
    private Boolean isSeqNoProvided = null;
    // Number of messages currently accounted for in the send buffer.
    private int currentInFlightCount = 0;
    // Remaining send-buffer budget in bytes; initialised from settings.getMaxSendBufferMemorySize().
    private long availableSizeBytes;
    // Ack future of the most recently accepted message; flushImpl() waits on it.
    private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

    public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
        this.topicRpc = topicRpc;
        this.settings = settings;
        this.session = new WriteSession(topicRpc);
        this.availableSizeBytes = settings.getMaxSendBufferMemorySize();
        this.compressionExecutor = compressionExecutor;
        this.messageSender = new MessageSender(this.session, settings);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tries to place a message into the send buffer.
     *
     * <p>If the buffer has room and nobody is waiting, the message is accepted immediately and a
     * completed future is returned. If a buffer limit is hit and {@code instant} is set, the
     * returned future fails with {@link QueueOverflowException}. In every other case the message
     * is parked in the incoming queue and the returned future completes once it is accepted.
     *
     * @param message message to enqueue
     * @param instant whether to fail right away instead of waiting when a limit is reached
     * @return future that completes when the message has been accepted into the send buffer
     */
    public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean instant) {
        synchronized (this.incomingQueue) {
            // Which limit, if any, is currently exceeded. The size check is only evaluated when
            // the in-flight check passes.
            String overflowReason = null;
            if (this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount()) {
                overflowReason = "Message queue in-flight limit reached";
            } else if (this.availableSizeBytes < (long) message.getMessage().getData().length) {
                overflowReason = "Message queue size limit reached";
            }
            boolean bufferHasRoom = overflowReason == null;

            if (bufferHasRoom && this.incomingQueue.isEmpty()) {
                logger.trace("Putting a message into the queue right now, enough space in send buffer");
                this.acceptMessageIntoSendingQueue(message);
                return CompletableFuture.completedFuture(null);
            }

            if (!bufferHasRoom && instant) {
                CompletableFuture<Void> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new QueueOverflowException(overflowReason));
                return rejected;
            }

            // Either a limit was hit, or earlier messages are still waiting and must go first.
            logger.debug("Message queue send buffer is overflown. Putting the message into incoming waiting queue");
            IncomingMessage waiting = new IncomingMessage(message);
            this.incomingQueue.add(waiting);
            return waiting.future;
        }
    }

    /**
     * Accounts for a message in the send buffer and schedules its encoding.
     *
     * <p>Callers in this class invoke it while holding the {@code incomingQueue} monitor. When
     * encoding finishes, all ready messages at the head of {@code encodingMessages} are moved to
     * the sending queue. When encoding fails, the message's buffer share is released, its future
     * is failed, and the message is removed from {@code encodingMessages} so that it cannot block
     * the messages queued behind it.
     *
     * @param message message to accept
     */
    private void acceptMessageIntoSendingQueue(EnqueuedMessage message) {
        this.lastAcceptedMessageFuture = message.getFuture();
        this.currentInFlightCount++;
        this.availableSizeBytes -= message.getUncompressedSizeBytes();
        if (logger.isTraceEnabled()) {
            logger.trace("Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, AvailableSizeBytes: {}",
                    message.getUncompressedSizeBytes(), this.currentInFlightCount, this.availableSizeBytes);
        }
        this.encodingMessages.add(message);

        CompletableFuture
                .runAsync(() -> this.encode(message), this.compressionExecutor)
                .thenRunAsync(this::moveEncodedMessagesToSendingQueue)
                .exceptionally(throwable -> {
                    logger.error("Exception while encoding message: ", throwable);
                    // A failed message never becomes "compressed"; left at the head of
                    // encodingMessages it would block every later message forever.
                    boolean wasStillEncoding;
                    synchronized (this.incomingQueue) {
                        wasStillEncoding = this.encodingMessages.remove(message);
                    }
                    this.free(1, message.getSizeBytes());
                    message.getFuture().completeExceptionally(throwable);
                    if (wasStillEncoding) {
                        // Messages behind the failed one may already be encoded; their own
                        // completion stages have run and will not retry, so drain here.
                        this.moveEncodedMessagesToSendingQueue();
                    }
                    return null;
                });
    }

    /**
     * Moves every ready message from the head of {@code encodingMessages} to the sending queue,
     * preserving order, and triggers a send if anything was moved. A message is ready when it is
     * compressed or when the configured codec is RAW.
     */
    private void moveEncodedMessagesToSendingQueue() {
        boolean haveNewMessagesToSend = false;
        synchronized (this.incomingQueue) {
            EnqueuedMessage head;
            while ((head = this.encodingMessages.peek()) != null
                    && (head.isCompressed() || this.settings.getCodec() == Codec.RAW)) {
                this.encodingMessages.remove();
                if (head.isCompressed()) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Message compressed from {} to {} bytes",
                                head.getUncompressedSizeBytes(), head.getCompressedSizeBytes());
                    }
                    // Return the bytes saved by compression to the buffer budget.
                    long bytesFreed = head.getUncompressedSizeBytes() - head.getCompressedSizeBytes();
                    this.free(0, bytesFreed);
                }
                logger.debug("Adding message to sending queue");
                this.sendingQueue.add(head);
                haveNewMessagesToSend = true;
            }
        }
        if (haveNewMessagesToSend) {
            this.sendDataRequestIfNeeded();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends everything currently in {@code sendingQueue}, unless a reconnect is in progress, the
     * session is not initialized yet, or another thread is already sending.
     *
     * <p>Loops until one of the early-exit conditions holds, so messages queued while a batch was
     * being sent are picked up by the same caller. The in-progress flag is always released, even
     * if sending throws, so a failed send cannot stall the writer permanently.
     */
    private void sendDataRequestIfNeeded() {
        while (true) {
            if (this.isReconnecting.get()) {
                logger.debug("Can't send data -- reconnect in progress");
                return;
            }
            if (!this.initResultFuture.isDone()) {
                logger.debug("Can't send data -- init was not yet received");
                return;
            }
            if (this.sendingQueue.isEmpty()) {
                logger.trace("Nothing to send -- sendingQueue is empty");
                return;
            }
            if (!this.writeRequestInProgress.compareAndSet(false, true)) {
                logger.debug("Send request is already in progress");
                return;
            }
            try {
                // Drain with poll() instead of snapshot + removeAll(), which is quadratic.
                Queue<EnqueuedMessage> messages = new LinkedList<>();
                EnqueuedMessage next;
                while ((next = this.sendingQueue.poll()) != null) {
                    messages.add(next);
                }
                if (messages.isEmpty()) {
                    logger.debug("Nothing to send -- sendingQueue is empty #2");
                } else {
                    // Register as sent before sending so acks can always be matched.
                    this.sentMessages.addAll(messages);
                    synchronized (this.messageSender) {
                        this.messageSender.sendMessages(messages);
                    }
                }
            } finally {
                if (!this.writeRequestInProgress.compareAndSet(true, false)) {
                    logger.error("Couldn't turn off writeRequestInProgress. Should not happen");
                }
            }
        }
    }

    /**
     * Encodes the message payload in place with the configured codec and records the resulting
     * size. Does nothing beyond logging when the codec is RAW.
     *
     * @param message message whose payload is encoded
     */
    private void encode(EnqueuedMessage message) {
        logger.trace("Started encoding message");
        Codec codec = this.settings.getCodec();
        if (codec != Codec.RAW) {
            message.getMessage().setData(Encoder.encode(codec, message.getMessage().getData()));
            int encodedLength = message.getMessage().getData().length;
            message.setCompressedSizeBytes(encodedLength);
            message.setCompressed(true);
            logger.trace("Successfully finished encoding message");
        }
    }

    /**
     * Starts the current session and sends the init request built from the writer settings.
     *
     * <p>Settings are validated and the request is built before the session is started, so
     * invalid settings do not leave a started stream behind. The init future is installed before
     * the session starts, so callbacks fired by the session observe the future of this attempt.
     *
     * @return future completed with the init result once the server's init response is processed
     * @throws RuntimeException if both a message group id and a partition id are configured
     */
    protected CompletableFuture<InitResult> initImpl() {
        logger.debug("initImpl started");

        String messageGroupId = this.settings.getMessageGroupId();
        Long partitionId = this.settings.getPartitionId();
        if (messageGroupId != null && partitionId != null) {
            throw new RuntimeException("Both MessageGroupId and PartitionId are set in WriterSettings");
        }

        YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder =
                YdbTopic.StreamWriteMessage.InitRequest.newBuilder().setPath(this.settings.getTopicPath());
        String producerId = this.settings.getProducerId();
        if (producerId != null) {
            initRequestBuilder.setProducerId(producerId);
        }
        if (messageGroupId != null) {
            initRequestBuilder.setMessageGroupId(messageGroupId);
        } else if (partitionId != null) {
            initRequestBuilder.setPartitionId(partitionId.longValue());
        }

        CompletableFuture<InitResult> initFuture = new CompletableFuture<>();
        this.initResultFuture = initFuture;
        this.session
                .start((GrpcReadStream.Observer<YdbTopic.StreamWriteMessage.FromServer>) this::processMessage)
                .whenComplete(this::completeSession);
        this.session.send(
                YdbTopic.StreamWriteMessage.FromClient.newBuilder().setInitRequest(initRequestBuilder).build());
        return initFuture;
    }

    /**
     * Validates a message and enqueues it for sending.
     *
     * <p>The first message decides whether sequence numbers are supplied by the caller; every
     * later message must follow the same choice.
     *
     * @param message message to send
     * @param instant whether to fail immediately instead of waiting when the send buffer is full
     * @return future completed, once the message is accepted into the send buffer, with the
     *         future of the message's write ack
     * @throws RuntimeException if the writer is stopped, or if the presence of a seqNo differs
     *                          from that of earlier messages
     */
    protected CompletableFuture<CompletableFuture<WriteAck>> sendImpl(Message message, boolean instant) {
        if (this.isStopped.get()) {
            throw new RuntimeException("Writer is already stopped");
        }

        boolean hasSeqNo = message.getSeqNo() != null;
        if (this.isSeqNoProvided == null) {
            this.isSeqNoProvided = hasSeqNo;
        } else if (hasSeqNo != this.isSeqNoProvided.booleanValue()) {
            if (hasSeqNo) {
                throw new RuntimeException("SeqNo was provided for a message after it had not been provided for "
                        + "another message. SeqNo should either be provided for all messages or none of them.");
            }
            throw new RuntimeException("SeqNo was not provided for a message after it had been provided for "
                    + "another message. SeqNo should either be provided for all messages or none of them.");
        }

        EnqueuedMessage enqueuedMessage = new EnqueuedMessage(message);
        return this.tryToEnqueue(enqueuedMessage, instant).thenApply(v -> enqueuedMessage.getFuture());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a future that completes when the most recently accepted message has been resolved.
     *
     * <p>The last-accepted future is read under the same monitor that guards its writes, so a
     * message accepted by another thread just before this call is not missed.
     *
     * @return an already completed future if nothing was accepted yet or the last accepted
     *         message is already resolved; otherwise a future that follows that message's ack
     *         future (and fails if it fails)
     */
    protected CompletableFuture<Void> flushImpl() {
        synchronized (this.incomingQueue) {
            CompletableFuture<WriteAck> lastAccepted = this.lastAcceptedMessageFuture;
            if (lastAccepted == null || lastAccepted.isDone()) {
                return CompletableFuture.completedFuture(null);
            }
            return lastAccepted.thenApply(v -> null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns capacity to the send buffer and admits waiting messages that now fit.
     *
     * <p>Waiting messages are re-examined whenever either in-flight slots or bytes were released.
     * Checking only the byte count would leave writers blocked on the in-flight limit when the
     * released messages had empty payloads.
     *
     * @param messageCount number of in-flight slots released
     * @param sizeBytes    number of bytes released (may be zero or negative)
     */
    private void free(int messageCount, long sizeBytes) {
        synchronized (this.incomingQueue) {
            this.currentInFlightCount -= messageCount;
            this.availableSizeBytes += sizeBytes;
            if (logger.isTraceEnabled()) {
                logger.trace("Freed {} bytes in {} messages. Current In-flight: {}, current availableSize: {}",
                        sizeBytes, messageCount, this.currentInFlightCount, this.availableSizeBytes);
            }

            boolean capacityReleased = messageCount > 0 || sizeBytes > 0L;
            if (!capacityReleased || this.incomingQueue.isEmpty()) {
                return;
            }

            IncomingMessage waiting;
            while ((waiting = this.incomingQueue.peek()) != null) {
                boolean tooLarge = waiting.message.getUncompressedSizeBytes() > this.availableSizeBytes;
                boolean noFreeSlot = this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount();
                if (tooLarge || noFreeSlot) {
                    logger.trace("There are messages in incomingQueue still, but no space in send buffer");
                    return;
                }
                logger.trace("Putting a message into send buffer after freeing some space");
                // complete() returns false if the waiter's future was already resolved elsewhere
                // (e.g. cancelled); such a message is dropped from the queue without being sent.
                if (waiting.future.complete(null)) {
                    this.acceptMessageIntoSendingQueue(waiting.message);
                }
                this.incomingQueue.remove();
            }
            logger.trace("All messages from incomingQueue are accepted into send buffer");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the current session with a new one, points the message sender at it, re-runs
     * initialization and then clears the reconnect-in-progress flag.
     */
    private void reconnect() {
        logger.info("Reconnect #{} started. Creating new WriteSession", (Object) this.reconnectCounter.get());

        WriteSession freshSession = new WriteSession(this.topicRpc);
        this.session = freshSession;
        synchronized (this.messageSender) {
            this.messageSender.setSession(freshSession);
        }

        this.initImpl();

        boolean flagCleared = this.isReconnecting.compareAndSet(true, false);
        if (!flagCleared) {
            logger.warn("Couldn't reset reconnect flag. Shouldn't happen");
        }
    }

    /**
     * Marks the writer as stopped, waits for the flush to finish and then shuts the session down.
     *
     * @return future that completes after the session shutdown has been invoked
     */
    protected CompletableFuture<Void> shutdownImpl() {
        logger.info("Shutting down Topic Writer");
        this.isStopped.set(true);
        CompletableFuture<Void> flushed = this.flushImpl();
        // The session field is read when the flush completes, not now.
        return flushed.thenRun(() -> {
            this.session.shutdown();
        });
    }

    /**
     * Shuts the writer down because of an error, failing a still-pending init future first.
     *
     * <p>The pending future is failed directly. Going through {@code initImpl()} would start the
     * session again and replace the future, leaving the one callers hold unresolved.
     *
     * @param reason human-readable cause, used as the exception message for the init future
     */
    private void shutdownImpl(String reason) {
        CompletableFuture<InitResult> pendingInit = this.initResultFuture;
        if (!pendingInit.isDone()) {
            pendingInit.completeExceptionally(new RuntimeException(reason));
        }
        this.shutdownImpl();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles one message received from the server.
     *
     * <ul>
     *   <li>Non-success status: the session is completed with an error status and the message is
     *       not processed any further.</li>
     *   <li>Init response: stores the session id, seeds the sender with the last seqNo, re-sends
     *       messages that are still awaiting acks, completes the init future and triggers a
     *       send.</li>
     *   <li>Write response: matches acks against the head of {@code sentMessages} in order and
     *       releases the corresponding send-buffer capacity.</li>
     * </ul>
     *
     * @param message message received from the write stream
     */
    private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
        logger.debug("processMessage called");
        if (message.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
            logger.error("Got non-success status in processMessage method: {}", (Object) message);
            Status failure = Status.of(StatusCode.fromProto(message.getStatus()))
                    .withIssues(new Issue[]{
                            Issue.of("Got a message with non-success status: " + message, Issue.Severity.ERROR)});
            this.completeSession(failure, null);
            // The session has been torn down; do not act on this message's payload.
            return;
        }
        this.reconnectCounter.set(0);

        if (message.hasInitResponse()) {
            this.currentSessionId = message.getInitResponse().getSessionId();
            logger.info("Session {} initialized", (Object) this.currentSessionId);
            long lastSeqNo = message.getInitResponse().getLastSeqNo();
            synchronized (this.messageSender) {
                this.messageSender.setSeqNo(lastSeqNo);
                // Anything still awaiting an ack is sent again through the new session.
                if (!this.sentMessages.isEmpty()) {
                    this.messageSender.sendMessages(this.sentMessages);
                }
            }
            this.initResultFuture.complete(new InitResult(lastSeqNo));
            this.sendDataRequestIfNeeded();
            return;
        }

        if (!message.hasWriteResponse()) {
            return;
        }

        List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = message.getWriteResponse().getAcksList();
        int inFlightFreed = 0;
        long bytesFreed = 0L;
        nextAck:
        for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
            EnqueuedMessage sentMessage;
            while ((sentMessage = this.sentMessages.peek()) != null) {
                if (sentMessage.getSeqNo().longValue() == ack.getSeqNo()) {
                    this.processWriteAck(sentMessage, ack);
                    ++inFlightFreed;
                    bytesFreed += sentMessage.getSizeBytes();
                    this.sentMessages.remove();
                    continue nextAck;
                }
                if (sentMessage.getSeqNo() < ack.getSeqNo()) {
                    // The server skipped over this message: fail it and keep looking for the
                    // message this ack belongs to.
                    logger.warn("Received an ack for seqNo {}, but the oldest seqNo waiting for ack is {}",
                            (Object) ack.getSeqNo(), (Object) sentMessage.getSeqNo());
                    sentMessage.getFuture().completeExceptionally(
                            new RuntimeException("Didn't get ack from server for this message"));
                    ++inFlightFreed;
                    bytesFreed += sentMessage.getSizeBytes();
                    this.sentMessages.remove();
                    continue;
                }
                // Ack for a message that is no longer tracked: ignore it.
                logger.info("Received an ack with seqNo {} which is older than the oldest message with seqNo {} waiting for ack",
                        (Object) ack.getSeqNo(), (Object) sentMessage.getSeqNo());
                continue nextAck;
            }
        }
        this.free(inFlightFreed, bytesFreed);
    }

    /**
     * Resolves a message's ack future from the server's write ack.
     *
     * <p>A WRITTEN ack completes the future with the written offset; a SKIPPED ack with reason
     * REASON_ALREADY_WRITTEN completes it as already written. Any other skip reason or ack state
     * fails the future.
     *
     * @param message message the ack belongs to
     * @param ack     ack received from the server
     */
    private void processWriteAck(EnqueuedMessage message, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
        switch (ack.getMessageWriteStatusCase()) {
            case WRITTEN: {
                WriteAck.Details writtenDetails = new WriteAck.Details(ack.getWritten().getOffset());
                WriteAck written = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, writtenDetails);
                message.getFuture().complete(written);
                return;
            }
            case SKIPPED: {
                switch (ack.getSkipped().getReason()) {
                    case REASON_ALREADY_WRITTEN: {
                        WriteAck alreadyWritten = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
                        message.getFuture().complete(alreadyWritten);
                        return;
                    }
                    default: {
                        message.getFuture().completeExceptionally(
                                new RuntimeException("Unknown WriteAck skipped reason"));
                        return;
                    }
                }
            }
            default: {
                message.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck state"));
                return;
            }
        }
    }

    /**
     * Reacts to the end of a write stream session.
     *
     * <p>The current session is stopped first. A successful close is only logged, unless the
     * writer is neither stopped nor reconnecting, in which case the writer is shut down. A failure
     * (exception or non-success status) schedules a reconnect with exponential backoff and random
     * jitter, unless the writer is stopped or a reconnect is already pending.
     *
     * @param status final status of the session; not read when {@code th} is non-null
     * @param th     exception that ended the session, or {@code null}
     */
    private void completeSession(Status status, Throwable th) {
        logger.info("CompleteSession called");
        this.session.stop();

        if (th == null && status.isSuccess()) {
            if (this.isStopped.get()) {
                logger.info("Writing stream session {} closed successfully", (Object) this.currentSessionId);
            } else if (this.isReconnecting.get()) {
                logger.info("Current session is closing, but reconnect is already scheduled");
            } else {
                logger.error("Writing stream session {} was closed unexpectedly. Shutting down the writer.", (Object) this.currentSessionId);
                this.shutdownImpl("Writing stream session " + this.currentSessionId + " was closed unexpectedly. Shutting down writer.");
            }
            return;
        }

        if (th != null) {
            logger.error("Exception in writing stream session {}: {}", (Object) this.currentSessionId, (Object) th);
        } else {
            String sessionIdForLog = this.currentSessionId != null ? this.currentSessionId : "";
            logger.error("Error in writing stream session {}: {}", (Object) sessionIdForLog, (Object) status);
        }

        if (this.isStopped.get()) {
            return;
        }

        int attempt = this.reconnectCounter.incrementAndGet();
        if (!this.isReconnecting.compareAndSet(false, true)) {
            logger.debug("Should reconnect, but reconnect is already in progress");
            return;
        }

        // Exponential backoff up to the maximum power, then a fixed ceiling; jitter in [0, delay).
        int delayMs = attempt <= EXP_BACKOFF_MAX_POWER
                ? EXP_BACKOFF_BASE_MS * (1 << attempt)
                : EXP_BACKOFF_CEILING_MS;
        delayMs += ThreadLocalRandom.current().nextInt(delayMs);
        logger.warn("Retry #" + attempt + ". Scheduling reconnect in {}ms...", (Object) delayMs);
        this.topicRpc.getScheduler().schedule(this::reconnect, (long) delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * A message waiting for room in the send buffer, paired with the future that is completed
     * when the message is accepted.
     */
    private static class IncomingMessage {
        private final EnqueuedMessage message;
        private final CompletableFuture<Void> future;

        private IncomingMessage(EnqueuedMessage message) {
            this.future = new CompletableFuture<>();
            this.message = message;
        }
    }
}

