/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.write.impl;

import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.impl.GrpcStreamRetrier;
import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.QueueOverflowException;
import tech.ydb.topic.write.WriteAck;
import tech.ydb.topic.write.impl.EnqueuedMessage;
import tech.ydb.topic.write.impl.MessageSender;
import tech.ydb.topic.write.impl.WriteSession;

public abstract class WriterImpl
extends GrpcStreamRetrier {
    private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);

    /**
     * Current write session. It is replaced with a fresh instance in {@code onStreamReconnect} and is
     * read from tasks submitted to {@code compressionExecutor}, so it is volatile to make the
     * replacement visible across threads.
     */
    private volatile WriteSessionImpl session;
    private final WriterSettings settings;
    private final TopicRpc topicRpc;
    /** Holds the init result future; contains {@code null} until {@code initImpl} is called. */
    private final AtomicReference<CompletableFuture<InitResult>> initResultFutureRef = new AtomicReference<>(null);
    /** Messages waiting for free space in the send buffer. Its monitor is also used as the lock for buffer accounting. */
    private final Queue<IncomingMessage> incomingQueue = new LinkedList<IncomingMessage>();
    /** Accepted messages that are being encoded; accessed while holding the {@code incomingQueue} monitor. */
    private final Queue<EnqueuedMessage> encodingMessages = new LinkedList<EnqueuedMessage>();
    /** Encoded messages that are ready to be sent to the server. */
    private final Queue<EnqueuedMessage> sendingQueue = new ConcurrentLinkedQueue<EnqueuedMessage>();
    /** Messages that were sent but not acknowledged yet; they are resent when a new session is initialized. */
    private final Deque<EnqueuedMessage> sentMessages = new ConcurrentLinkedDeque<EnqueuedMessage>();
    /** Flag acquired with compare-and-set around moving messages from {@code sendingQueue} to the server. */
    private final AtomicBoolean writeRequestInProgress = new AtomicBoolean();
    private final Executor compressionExecutor;
    private final long maxSendBufferMemorySize;
    /** Source of per-writer session numbers used in session log ids. */
    private final AtomicLong sessionSeqNumberCounter = new AtomicLong(0L);
    /** {@code null} until the first message is sent; afterwards tells whether messages carry their own seqNo. */
    private Boolean isSeqNoProvided = null;
    /** Number of messages currently occupying the send buffer; updated under the {@code incomingQueue} monitor. */
    private int currentInFlightCount = 0;
    /** Bytes still available in the send buffer; updated under the {@code incomingQueue} monitor. */
    private long availableSizeBytes;
    /** Ack future of the most recently accepted message; written under the {@code incomingQueue} monitor. */
    private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

    /**
     * Creates a writer bound to the given transport and settings and prepares its first write session.
     *
     * @param topicRpc            transport used by write sessions; its scheduler is passed to the base class
     * @param settings            writer settings (topic path, producer id, buffer limits, codec, ...)
     * @param compressionExecutor executor on which messages are encoded
     */
    public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
        super(topicRpc.getScheduler());
        this.topicRpc = topicRpc;
        this.settings = settings;
        this.compressionExecutor = compressionExecutor;
        // The whole send buffer is available at the start.
        this.maxSendBufferMemorySize = settings.getMaxSendBufferMemorySize();
        this.availableSizeBytes = settings.getMaxSendBufferMemorySize();
        // The session reads topicRpc and settings of this writer, so it is created after they are assigned.
        this.session = new WriteSessionImpl();
        logger.info("Writer (generated id " + this.id
                + ") created for topic \"" + settings.getTopicPath()
                + "\" with producerId \"" + settings.getProducerId()
                + "\" and messageGroupId \"" + settings.getMessageGroupId() + "\"");
    }

    /**
     * Returns the logger of this class.
     *
     * @return the static {@code WriterImpl} logger
     */
    @Override
    protected Logger getLogger() {
        return logger;
    }

    /**
     * Returns the name of this stream kind.
     *
     * @return the constant string {@code "Writer"}
     */
    @Override
    protected String getStreamName() {
        return "Writer";
    }

    /**
     * Tries to put a message into the send buffer.
     *
     * <p>The message is accepted immediately when neither the in-flight message limit nor the buffer
     * size limit is hit and no other message is already waiting. Otherwise it is either rejected
     * ({@code instant == true} and a limit is hit) or placed into the incoming waiting queue.
     *
     * @param message message to enqueue
     * @param instant when {@code true}, a message that does not fit is rejected instead of waiting
     * @return a future that is already completed when the message was accepted, completed exceptionally
     *         with {@link QueueOverflowException} when it was rejected, or the waiting-queue future
     *         of the message otherwise
     */
    public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean instant) {
        synchronized (this.incomingQueue) {
            // Limit on the number of in-flight messages.
            if (this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount()) {
                if (instant) {
                    logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit of {}", this.id, this.settings.getMaxSendBufferMessagesCount());
                    CompletableFuture<Void> rejection = new CompletableFuture<>();
                    rejection.completeExceptionally(new QueueOverflowException("Message queue in-flight limit of " + this.settings.getMaxSendBufferMessagesCount() + " reached"));
                    return rejection;
                }
                logger.info("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming waiting queue", this.id, this.settings.getMaxSendBufferMessagesCount());
                return this.waitInIncomingQueue(message);
            }

            // Limit on the buffer size in bytes.
            final int payloadLength = message.getMessage().getData().length;
            if (this.availableSizeBytes < payloadLength) {
                if (instant) {
                    String errorMessage = "[" + this.id + "] Rejecting a message of " + payloadLength + " bytes: not enough space in message queue. Buffer currently has " + this.currentInFlightCount + " messages with " + this.availableSizeBytes + " / " + this.settings.getMaxSendBufferMemorySize() + " bytes available";
                    logger.info(errorMessage);
                    CompletableFuture<Void> rejection = new CompletableFuture<>();
                    rejection.completeExceptionally(new QueueOverflowException(errorMessage));
                    return rejection;
                }
                logger.info("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has {} messages with {} / {} bytes available. Putting the message into incoming waiting queue.", this.id, payloadLength, this.currentInFlightCount, this.availableSizeBytes, this.settings.getMaxSendBufferMemorySize());
                return this.waitInIncomingQueue(message);
            }

            // There is room, but messages that are already waiting must go first.
            if (!this.incomingQueue.isEmpty()) {
                return this.waitInIncomingQueue(message);
            }

            this.acceptMessageIntoSendingQueue(message);
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Appends a message to the incoming waiting queue. Must be called while holding the
     * {@code incomingQueue} monitor.
     *
     * @return the future that is completed once the message is moved into the send buffer
     */
    private CompletableFuture<Void> waitInIncomingQueue(EnqueuedMessage message) {
        IncomingMessage waiting = new IncomingMessage(message);
        this.incomingQueue.add(waiting);
        return waiting.future;
    }

    /**
     * Reserves send buffer space for a message and schedules its encoding on the compression executor.
     *
     * <p>Both call sites in this class invoke it while holding the {@code incomingQueue} monitor.
     *
     * @param message message that has been admitted into the send buffer
     */
    private void acceptMessageIntoSendingQueue(EnqueuedMessage message) {
        this.lastAcceptedMessageFuture = message.getFuture();
        this.currentInFlightCount++;
        this.availableSizeBytes -= message.getUncompressedSizeBytes();
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, AvailableSizeBytes: {} ({} / {} acquired)", this.id, message.getUncompressedSizeBytes(), this.currentInFlightCount, this.availableSizeBytes, this.maxSendBufferMemorySize - this.availableSizeBytes, this.maxSendBufferMemorySize);
        }
        this.encodingMessages.add(message);
        CompletableFuture
                .runAsync(() -> this.encodeAndPublish(message), this.compressionExecutor)
                .exceptionally(failure -> this.handleEncodingFailure(message, failure));
    }

    /** Encodes the message and then moves every message that is ready into the sending queue. */
    private void encodeAndPublish(EnqueuedMessage message) {
        this.encode(message);
        this.moveEncodedMessagesToSendingQueue();
    }

    /**
     * Releases the buffer space of a message whose asynchronous processing failed, fails its future,
     * marks it as failed and lets the messages queued behind it proceed.
     *
     * @return always {@code null}; the return type only satisfies the {@code exceptionally} callback
     */
    private Void handleEncodingFailure(EnqueuedMessage message, Throwable failure) {
        logger.error("[{}] Exception while encoding message: ", this.id, failure);
        this.free(1, message.getSizeBytes());
        message.getFuture().completeExceptionally(failure);
        message.setProcessingFailed(true);
        this.moveEncodedMessagesToSendingQueue();
        return null;
    }

    /**
     * Encodes the message payload with the codec from the writer settings.
     *
     * <p>With {@code Codec.RAW} the message is left untouched. Otherwise the payload is replaced with
     * its encoded form, the compressed size is recorded and the message is marked as compressed.
     *
     * @param message message whose payload is encoded in place
     */
    private void encode(EnqueuedMessage message) {
        logger.trace("[{}] Started encoding message", this.id);
        if (this.settings.getCodec() != Codec.RAW) {
            message.getMessage().setData(
                    Encoder.encode(this.settings.getCodec(), message.getMessage().getData()));
            message.setCompressedSizeBytes(message.getMessage().getData().length);
            message.setCompressed(true);
            logger.trace("[{}] Successfully finished encoding message", this.id);
        }
    }

    /**
     * Moves messages from the head of {@code encodingMessages} into {@code sendingQueue}, keeping
     * their order.
     *
     * <p>Messages marked as failed are dropped. The loop stops at the first message that is not yet
     * compressed while a non-RAW codec is configured, so later messages never overtake it. For every
     * compressed message the difference between uncompressed and compressed size is returned to the
     * send buffer. If anything was moved, the current session is asked to send data.
     */
    private void moveEncodedMessagesToSendingQueue() {
        boolean queuedSomething = false;
        synchronized (this.incomingQueue) {
            for (EnqueuedMessage head = this.encodingMessages.peek();
                    head != null;
                    head = this.encodingMessages.peek()) {
                if (head.isProcessingFailed()) {
                    this.encodingMessages.remove();
                    continue;
                }
                final boolean compressed = head.isCompressed();
                if (!compressed && this.settings.getCodec() != Codec.RAW) {
                    // The head is still being encoded; wait for it to preserve ordering.
                    break;
                }
                this.encodingMessages.remove();
                if (compressed) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("[{}] Message compressed from {} to {} bytes", this.id, head.getUncompressedSizeBytes(), head.getCompressedSizeBytes());
                    }
                    long bytesFreed = head.getUncompressedSizeBytes() - head.getCompressedSizeBytes();
                    this.free(0, bytesFreed);
                }
                logger.debug("[{}] Adding message to sending queue", this.id);
                this.sendingQueue.add(head);
                queuedSomething = true;
            }
        }
        // Sending is triggered outside of the monitor.
        if (queuedSomething) {
            this.session.sendDataRequestIfNeeded();
        }
    }

    /**
     * Starts the writer on the first call; later calls only log a warning.
     *
     * <p>The first caller installs the init result future with a compare-and-set and starts the
     * current session. Every caller receives the same future.
     *
     * @return the future stored in {@code initResultFutureRef}
     */
    protected CompletableFuture<InitResult> initImpl() {
        logger.info("[{}] initImpl called", this.id);
        // Diamond instead of the raw type avoids an unchecked conversion.
        if (this.initResultFutureRef.compareAndSet(null, new CompletableFuture<>())) {
            this.session.startAndInitialize();
        } else {
            logger.warn("[{}] Init is called on this writer more than once. Nothing is done", this.id);
        }
        return this.initResultFutureRef.get();
    }

    /**
     * Validates a message and tries to enqueue it for sending.
     *
     * <p>The first message decides whether this writer works with user-provided seqNo values; every
     * later message must make the same choice.
     *
     * @param message      message to send
     * @param sendSettings per-message send settings
     * @param instant      passed to {@link #tryToEnqueue(EnqueuedMessage, boolean)}
     * @return a future that yields the ack future of the message once the message is enqueued
     * @throws RuntimeException if the writer is stopped or seqNo usage is inconsistent with earlier messages
     */
    protected CompletableFuture<CompletableFuture<WriteAck>> sendImpl(Message message, SendSettings sendSettings, boolean instant) {
        if (this.isStopped.get()) {
            throw new RuntimeException("Writer is already stopped");
        }
        final boolean seqNoPresent = message.getSeqNo() != null;
        if (this.isSeqNoProvided == null) {
            // First message: remember the mode.
            this.isSeqNoProvided = seqNoPresent;
        } else if (seqNoPresent && !this.isSeqNoProvided.booleanValue()) {
            throw new RuntimeException("SeqNo was provided for a message after it had not been provided for another message. SeqNo should either be provided for all messages or none of them.");
        } else if (!seqNoPresent && this.isSeqNoProvided.booleanValue()) {
            throw new RuntimeException("SeqNo was not provided for a message after it had been provided for another message. SeqNo should either be provided for all messages or none of them.");
        }
        final EnqueuedMessage enqueued = new EnqueuedMessage(message, sendSettings);
        CompletableFuture<Void> enqueueResult = this.tryToEnqueue(enqueued, instant);
        return enqueueResult.thenApply(ignored -> enqueued.getFuture());
    }

    /**
     * Returns a future that completes when the most recently accepted message gets its result.
     *
     * <p>{@code lastAcceptedMessageFuture} is written while holding the {@code incomingQueue} monitor,
     * so it is read here under the same monitor, once, into a local variable. Checking it for
     * {@code null} outside of the monitor could observe a stale value.
     *
     * @return an already completed future when no message has been accepted yet or the last accepted
     *         message is already done; otherwise a future derived from the last accepted message's
     *         future (it completes exceptionally if that future does)
     */
    protected CompletableFuture<Void> flushImpl() {
        synchronized (this.incomingQueue) {
            final CompletableFuture<WriteAck> lastAccepted = this.lastAcceptedMessageFuture;
            if (lastAccepted == null || lastAccepted.isDone()) {
                return CompletableFuture.completedFuture(null);
            }
            return lastAccepted.thenApply(v -> null);
        }
    }

    /**
     * Returns capacity to the send buffer and admits waiting messages that now fit.
     *
     * <p>Waiting messages are re-examined whenever any capacity was released, i.e. when either the
     * in-flight count or the byte count went down. Checking only the byte count would leave messages
     * blocked on the in-flight limit stuck after zero-byte messages are acknowledged.
     *
     * @param messageCount number of in-flight slots to release
     * @param sizeBytes    number of bytes to return to the buffer (may be zero or negative)
     */
    private void free(int messageCount, long sizeBytes) {
        synchronized (this.incomingQueue) {
            this.currentInFlightCount -= messageCount;
            this.availableSizeBytes += sizeBytes;
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Freed {} bytes in {} messages. Current In-flight: {}, current availableSize: {} ({} / {} acquired)", this.id, sizeBytes, messageCount, this.currentInFlightCount, this.availableSizeBytes, this.maxSendBufferMemorySize - this.availableSizeBytes, this.maxSendBufferMemorySize);
            }
            final boolean capacityReleased = messageCount > 0 || sizeBytes > 0L;
            if (!capacityReleased || this.incomingQueue.isEmpty()) {
                return;
            }
            IncomingMessage incomingMessage;
            while ((incomingMessage = this.incomingQueue.peek()) != null) {
                if (incomingMessage.message.getUncompressedSizeBytes() > this.availableSizeBytes || this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount()) {
                    logger.trace("[{}] There are messages in incomingQueue still, but no space in send buffer", this.id);
                    return;
                }
                logger.trace("[{}] Putting a message into send buffer after freeing some space", this.id);
                // Remove the head BEFORE accepting it: accepting may run the encoding inline (when the
                // compression executor runs tasks on the calling thread) and re-enter this method, and
                // a removal done afterwards would then drop a different element.
                this.incomingQueue.remove();
                if (incomingMessage.future.complete(null)) {
                    this.acceptMessageIntoSendingQueue(incomingMessage.message);
                }
            }
            logger.trace("[{}] All messages from incomingQueue are accepted into send buffer", this.id);
        }
    }

    /**
     * Replaces the current session with a new one and starts it.
     */
    @Override
    protected void onStreamReconnect() {
        final WriteSessionImpl replacement = new WriteSessionImpl();
        this.session = replacement;
        replacement.startAndInitialize();
    }

    /**
     * Shuts down the current session and fails the init future if it is still pending.
     *
     * @param reason text used as the message of the exception that fails the pending init future
     */
    @Override
    protected void onShutdown(String reason) {
        this.session.shutdown();
        final CompletableFuture<InitResult> initFuture = this.initResultFutureRef.get();
        if (initFuture == null || initFuture.isDone()) {
            return;
        }
        initFuture.completeExceptionally(new RuntimeException(reason));
    }

    private class WriteSessionImpl
    extends WriteSession {
        /** Session id; empty string until it is set from the server's init response. */
        protected String sessionId;
        /** Log id of this session: the writer id, a dot and the per-writer session number. */
        private final String fullId;
        /** Helper that sends messages for this session; created from the writer settings. */
        private final MessageSender messageSender;
        /** Set to {@code true} at the end of init response handling; data is not sent before that. */
        private final AtomicBoolean isInitialized;

        private WriteSessionImpl() {
            super(WriterImpl.this.topicRpc);
            this.sessionId = "";
            this.isInitialized = new AtomicBoolean(false);
            this.fullId = WriterImpl.this.id + '.' + WriterImpl.this.sessionSeqNumberCounter.incrementAndGet();
            this.messageSender = new MessageSender(WriterImpl.this.settings);
        }

        /**
         * Starts the stream and sends the init request built from the writer settings.
         *
         * <p>The init request is built and validated before the stream is started, so that invalid
         * settings do not leave behind a started stream that never receives an init request.
         *
         * @throws IllegalArgumentException if both message group id and partition id are set
         */
        @Override
        public void startAndInitialize() {
            logger.debug("[{}] Session {} startAndInitialize called", this.fullId, this.sessionId);

            String messageGroupId = WriterImpl.this.settings.getMessageGroupId();
            Long partitionId = WriterImpl.this.settings.getPartitionId();
            if (messageGroupId != null && partitionId != null) {
                throw new IllegalArgumentException("Both MessageGroupId and PartitionId are set in WriterSettings");
            }

            YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamWriteMessage.InitRequest.newBuilder().setPath(WriterImpl.this.settings.getTopicPath());
            String producerId = WriterImpl.this.settings.getProducerId();
            if (producerId != null) {
                initRequestBuilder.setProducerId(producerId);
            }
            if (messageGroupId != null) {
                initRequestBuilder.setMessageGroupId(messageGroupId);
            } else if (partitionId != null) {
                initRequestBuilder.setPartitionId(partitionId.longValue());
            }

            // Start the stream only after the request has been built successfully.
            this.start(this::processMessage).whenComplete(this::closeDueToError);
            this.send(YdbTopic.StreamWriteMessage.FromClient.newBuilder().setInitRequest(initRequestBuilder).build());
        }

        /**
         * Sends everything that is currently in {@code sendingQueue}, if this session is able to.
         *
         * <p>Nothing is sent when the session is not initialized, is no longer working, the queue is
         * empty, or another send is already in progress. The {@code writeRequestInProgress} flag is
         * acquired with compare-and-set and is always released in a {@code finally} block, so a
         * failure while sending cannot leave it set and block all later sends. After a batch is
         * sent the checks are repeated to pick up messages that were queued in the meantime.
         */
        private void sendDataRequestIfNeeded() {
            while (true) {
                if (!this.isInitialized.get()) {
                    logger.debug("[{}] Can't send data: current session is not yet initialized", this.fullId);
                    return;
                }
                if (!this.isWorking.get()) {
                    logger.debug("[{}] Can't send data: current session has been already stopped", this.fullId);
                    return;
                }
                if (WriterImpl.this.sendingQueue.isEmpty()) {
                    logger.trace("[{}] Nothing to send -- sendingQueue is empty", this.fullId);
                    return;
                }
                if (!WriterImpl.this.writeRequestInProgress.compareAndSet(false, true)) {
                    logger.debug("[{}] Send request is already in progress", this.fullId);
                    return;
                }
                try {
                    // Drain the queue with poll() instead of snapshot + removeAll(): removeAll() scans
                    // the snapshot for every queue element, which is quadratic in the batch size.
                    LinkedList<EnqueuedMessage> messages = new LinkedList<EnqueuedMessage>();
                    EnqueuedMessage next;
                    while ((next = WriterImpl.this.sendingQueue.poll()) != null) {
                        messages.add(next);
                    }
                    if (messages.isEmpty()) {
                        logger.debug("[{}] Nothing to send -- sendingQueue is empty #2", this.fullId);
                    } else {
                        // Track the messages as sent before sending, so they can be matched with acks.
                        WriterImpl.this.sentMessages.addAll(messages);
                        this.messageSender.sendMessages(messages);
                        logger.debug("[{}] Sent {} messages to server", this.fullId, messages.size());
                    }
                } finally {
                    if (!WriterImpl.this.writeRequestInProgress.compareAndSet(true, false)) {
                        logger.error("[{}] Couldn't turn off writeRequestInProgress. Should not happen", this.fullId);
                    }
                }
            }
        }

        /**
         * Handles the server's init response: stores the session id, positions the message sender,
         * resends messages that have not been acknowledged yet, completes the writer's init future
         * and finally allows regular sending.
         *
         * @param response init response received from the server
         */
        private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
            this.sessionId = response.getSessionId();
            logger.info("[{}] Session {} initialized", this.fullId, this.sessionId);

            final long lastSeqNo = response.getLastSeqNo();
            // Unacknowledged messages may already carry seqNo values above the one the server reports.
            long actualLastSeqNo = lastSeqNo;
            if (!WriterImpl.this.sentMessages.isEmpty()) {
                actualLastSeqNo = Math.max(lastSeqNo, WriterImpl.this.sentMessages.getLast().getSeqNo());
            }
            this.messageSender.setSession(this);
            this.messageSender.setSeqNo(actualLastSeqNo);

            if (!WriterImpl.this.sentMessages.isEmpty()) {
                logger.info("Resending {} messages that haven't received ack's yet into new session...", WriterImpl.this.sentMessages.size());
                this.messageSender.sendMessages(WriterImpl.this.sentMessages);
            }

            final CompletableFuture<InitResult> initFuture = WriterImpl.this.initResultFutureRef.get();
            if (initFuture != null) {
                initFuture.complete(new InitResult(lastSeqNo));
            }

            this.isInitialized.set(true);
            this.sendDataRequestIfNeeded();
        }

        /**
         * Matches received acks against the head of {@code sentMessages} and releases buffer space.
         *
         * <p>For every ack the oldest sent messages are examined in order:
         * <ul>
         *   <li>same seqNo: the message is removed and completed from the ack;</li>
         *   <li>smaller seqNo than the ack: the message is removed and failed, since no ack was
         *       received for it, and the next message is examined;</li>
         *   <li>greater seqNo than the ack: the ack is stale and is only logged.</li>
         * </ul>
         * The space of all removed messages is returned to the send buffer in a single call.
         *
         * @param response write response received from the server
         */
        private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
            // Typed list: iterating a raw List with a typed loop variable does not compile.
            List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
            logger.debug("[{}] Received WriteResponse with {} WriteAcks", this.fullId, acks.size());
            int inFlightFreed = 0;
            long bytesFreed = 0L;
            for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
                final long ackSeqNo = ack.getSeqNo();
                EnqueuedMessage sentMessage;
                while ((sentMessage = WriterImpl.this.sentMessages.peek()) != null) {
                    final long sentSeqNo = sentMessage.getSeqNo();
                    if (sentSeqNo == ackSeqNo) {
                        ++inFlightFreed;
                        bytesFreed += sentMessage.getSizeBytes();
                        WriterImpl.this.sentMessages.remove();
                        this.processWriteAck(sentMessage, ack);
                        break;
                    }
                    if (sentSeqNo > ackSeqNo) {
                        logger.warn("[{}] Received an ack with seqNo {} which is older than the oldest message with seqNo {} waiting for ack", this.fullId, ackSeqNo, sentSeqNo);
                        break;
                    }
                    // sentSeqNo < ackSeqNo: the server skipped this message, keep looking for a match.
                    logger.warn("[{}] Received an ack for seqNo {}, but the oldest seqNo waiting for ack is {}", this.fullId, ackSeqNo, sentSeqNo);
                    sentMessage.getFuture().completeExceptionally(new RuntimeException("Didn't get ack from server for this message"));
                    ++inFlightFreed;
                    bytesFreed += sentMessage.getSizeBytes();
                    WriterImpl.this.sentMessages.remove();
                }
            }
            WriterImpl.this.free(inFlightFreed, bytesFreed);
        }

        /**
         * Dispatches a message received from the server.
         *
         * <p>A message with a non-success status closes the session with a status built from it.
         * A successful message resets the reconnect counter and is routed to the init or write
         * response handler; other message kinds are ignored.
         *
         * @param message message received from the server
         */
        private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
            logger.debug("[{}] processMessage called", this.fullId);
            if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
                WriterImpl.this.reconnectCounter.set(0);
                if (message.hasInitResponse()) {
                    this.onInitResponse(message.getInitResponse());
                } else if (message.hasWriteResponse()) {
                    this.onWriteResponse(message.getWriteResponse());
                }
                return;
            }
            logger.warn("[{}] Got non-success status in processMessage method: {}", this.fullId, message);
            Status failure = Status.of(StatusCode.fromProto(message.getStatus()))
                    .withIssues(new Issue[]{Issue.of("Got a message with non-success status: " + message, Issue.Severity.ERROR)});
            WriterImpl.this.onSessionClosed(failure, null);
        }

        /**
         * Completes the future of a sent message according to the ack received for it.
         *
         * <p>A written ack produces a {@code WRITTEN} result with the offset. A skipped ack with the
         * "already written" reason produces an {@code ALREADY_WRITTEN} result. Any other skipped
         * reason or status case fails the future.
         *
         * @param message sent message the ack belongs to
         * @param ack     ack received from the server
         */
        private void processWriteAck(EnqueuedMessage message, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
            logger.debug("[{}] Received WriteAck with seqNo {} and status {}", this.fullId, ack.getSeqNo(), ack.getMessageWriteStatusCase());
            switch (ack.getMessageWriteStatusCase()) {
                case WRITTEN: {
                    WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset());
                    WriteAck written = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details);
                    message.getFuture().complete(written);
                    return;
                }
                case SKIPPED: {
                    switch (ack.getSkipped().getReason()) {
                        case REASON_ALREADY_WRITTEN: {
                            WriteAck alreadyWritten = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
                            message.getFuture().complete(alreadyWritten);
                            return;
                        }
                        default: {
                            message.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck skipped reason"));
                            return;
                        }
                    }
                }
                default: {
                    message.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck state"));
                    return;
                }
            }
        }

        /**
         * Completion callback of the stream: shuts this session down and, only if that call reports
         * {@code true}, notifies the enclosing writer that the session was closed.
         *
         * @param status final status of the stream
         * @param th     error the stream completed with, if any
         */
        private void closeDueToError(Status status, Throwable th) {
            logger.info("[{}] Session {} closeDueToError called", this.fullId, this.sessionId);
            final boolean shutDownByThisCall = this.shutdown();
            if (!shutDownByThisCall) {
                return;
            }
            WriterImpl.this.onSessionClosed(status, th);
        }

        /**
         * Stop hook of the session; it only writes a debug log entry.
         */
        @Override
        protected void onStop() {
            logger.debug("[{}] Session {} onStop called", (Object)this.fullId, (Object)this.sessionId);
        }
    }

    /**
     * A message waiting in the incoming queue for free space in the send buffer, together with the
     * future that is completed when the message is moved into the send buffer.
     */
    private static class IncomingMessage {
        private final EnqueuedMessage message;
        // Diamond instead of the raw type avoids an unchecked conversion.
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        private IncomingMessage(EnqueuedMessage message) {
            this.message = message;
        }
    }
}

