/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.write.impl;

import com.google.protobuf.ByteString;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.ProtoUtils;
import tech.ydb.topic.write.impl.EnqueuedMessage;
import tech.ydb.topic.write.impl.WriteSession;

public class MessageSender {
    private static final Logger logger = LoggerFactory.getLogger(MessageSender.class);
    private static final int MAX_GRPC_MESSAGE_SIZE = 64000000;
    private final WriterSettings settings;
    private final int requestOverheadBytes;
    private final int messageOverheadBytes;
    private WriteSession session;
    private long seqNo = 0L;
    private long totalMessageDataProtoSize;
    private YdbTopic.StreamWriteMessage.WriteRequest.Builder writeRequestBuilder;
    private int messageCount;
    private YdbTransaction currentTransaction;

    public MessageSender(WriterSettings settings) {
        this.settings = settings;
        List<Integer> overheads = this.calculateOverheads();
        this.messageOverheadBytes = overheads.get(0);
        this.requestOverheadBytes = overheads.get(1);
    }

    private List<Integer> calculateOverheads() {
        this.reset();
        YdbTopic.StreamWriteMessage.FromClient requestWithoutMessage = YdbTopic.StreamWriteMessage.FromClient.newBuilder().setWriteRequest(this.writeRequestBuilder.build()).build();
        int requestOverhead = requestWithoutMessage.getSerializedSize();
        YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData = YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder().setSeqNo(Long.MAX_VALUE).setData(ByteString.EMPTY).setCreatedAt(ProtobufUtils.instantToProto((Instant)Instant.now())).setUncompressedSize(1000000L).build();
        YdbTopic.StreamWriteMessage.FromClient requestWithMessage = YdbTopic.StreamWriteMessage.FromClient.newBuilder().setWriteRequest(this.writeRequestBuilder.addMessages(messageData)).build();
        int messageDataSize = messageData.getSerializedSize();
        int sizeWithMessage = requestWithMessage.getSerializedSize();
        int messageOverhead = sizeWithMessage - requestOverhead - messageDataSize;
        logger.debug("Calculated per-message bytes overhead: {}, request overhead: {}", (Object)messageOverhead, (Object)requestOverhead);
        return Arrays.asList(messageOverhead, requestOverhead);
    }

    public void setSeqNo(long seqNo) {
        this.seqNo = seqNo;
    }

    public void setSession(WriteSession session) {
        this.session = session;
    }

    private void reset() {
        this.writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder().setCodec(ProtoUtils.toProto(this.settings.getCodec()));
        this.messageCount = 0;
        this.totalMessageDataProtoSize = 0L;
    }

    public long getCurrentRequestSize() {
        return (long)this.requestOverheadBytes + this.totalMessageDataProtoSize + (long)this.messageCount * (long)this.messageOverheadBytes;
    }

    public void addMessage(YdbTopic.StreamWriteMessage.WriteRequest.MessageData message) {
        ++this.messageCount;
        this.totalMessageDataProtoSize += (long)message.getSerializedSize();
        this.writeRequestBuilder.addMessages(message);
    }

    public void sendWriteRequest() {
        List messages;
        if (this.currentTransaction != null) {
            this.writeRequestBuilder.setTx(YdbTopic.TransactionIdentity.newBuilder().setId(this.currentTransaction.getId()).setSession(this.currentTransaction.getSessionId()));
        }
        YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder().setWriteRequest(this.writeRequestBuilder).build();
        if (logger.isDebugEnabled()) {
            logger.debug("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) + {}(message overheads)\nActual request size: {} bytes", new Object[]{this.getCurrentRequestSize(), this.requestOverheadBytes, this.totalMessageDataProtoSize, this.messageOverheadBytes * this.messageCount, fromClient.getSerializedSize()});
        }
        if (fromClient.getSerializedSize() > 64000000 && (messages = this.writeRequestBuilder.getMessagesList()).size() > 1) {
            int firstHalfMessagesCount = messages.size() / 2;
            logger.debug("Failed to predict request total size. Total size is {} which exceeds the limit of {}. Splitting {} messages into two requests of {} and {} messages", new Object[]{fromClient.getSerializedSize(), 64000000, messages.size(), firstHalfMessagesCount, messages.size() - firstHalfMessagesCount});
            for (List sublist : Arrays.asList(messages.subList(0, firstHalfMessagesCount), messages.subList(firstHalfMessagesCount, messages.size()))) {
                this.writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder().setCodec(ProtoUtils.toProto(this.settings.getCodec()));
                this.writeRequestBuilder.addAllMessages((Iterable)sublist);
                YdbTopic.StreamWriteMessage.FromClient subRequest = YdbTopic.StreamWriteMessage.FromClient.newBuilder().setWriteRequest(this.writeRequestBuilder).build();
                logger.debug("Total sub-request size: {} bytes", (Object)subRequest.getSerializedSize());
                this.session.send(subRequest);
            }
            return;
        }
        this.session.send(fromClient);
    }

    public void tryAddMessageToRequest(EnqueuedMessage message) {
        long messageSeqNo;
        if (message.getTransaction() != this.currentTransaction) {
            if (this.messageCount > 0) {
                this.sendWriteRequest();
            }
            this.currentTransaction = message.getTransaction();
        }
        long l = message.getSeqNo() == null ? (message.getMessage().getSeqNo() == null ? (this.seqNo = this.seqNo + 1L) : message.getMessage().getSeqNo()) : (messageSeqNo = message.getSeqNo().longValue());
        if (message.getSeqNo() == null) {
            message.setSeqNo(messageSeqNo);
        }
        YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder = YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder().setSeqNo(messageSeqNo).setData(ByteString.copyFrom((byte[])message.getMessage().getData())).setCreatedAt(ProtobufUtils.instantToProto((Instant)message.getMessage().getCreateTimestamp())).setUncompressedSize(message.getUncompressedSizeBytes());
        List<MetadataItem> metadataItems = message.getMessage().getMetadataItems();
        if (metadataItems != null && !metadataItems.isEmpty()) {
            messageDataBuilder.addAllMetadataItems((Iterable)metadataItems.stream().map(metadataItem -> YdbTopic.MetadataItem.newBuilder().setKey(metadataItem.getKey()).setValue(ByteString.copyFrom((byte[])metadataItem.getValue())).build()).collect(Collectors.toList()));
        }
        YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData = messageDataBuilder.build();
        long sizeWithCurrentMessage = this.getCurrentRequestSize() + (long)messageData.getSerializedSize() + (long)this.messageOverheadBytes;
        if (sizeWithCurrentMessage <= 64000000L) {
            this.addMessage(messageData);
        } else if (this.messageCount > 0) {
            logger.debug("Adding next message to the same request would lead to grpc request size overflow. Sending previous {} messages...", (Object)this.messageCount);
            this.sendWriteRequest();
            this.reset();
            this.addMessage(messageData);
        } else {
            logger.error("A single message is larger than grpc size limit. Sending it anyway...");
            this.addMessage(messageData);
            this.sendWriteRequest();
            this.reset();
        }
    }

    public void sendMessages(Queue<EnqueuedMessage> messages) {
        if (logger.isDebugEnabled()) {
            logger.debug("Trying to send {} message(s)...", (Object)messages.size());
        }
        this.reset();
        messages.forEach(this::tryAddMessageToRequest);
        if (this.messageCount > 0) {
            this.sendWriteRequest();
        }
    }
}

