/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import com.google.protobuf.Timestamp;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.Batch;
import tech.ydb.topic.read.impl.BatchMeta;
import tech.ydb.topic.read.impl.MessageImpl;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
import tech.ydb.topic.utils.Encoder;

/**
 * Reader-side state of one partition session.
 *
 * <p>Incoming protobuf batches are converted to {@link Batch} objects, decoded on the
 * decompression executor, moved to a reading queue in arrival order and handed to the data
 * event callback one batch at a time. Commit requests are forwarded to the commit function
 * and their futures are kept until {@link #handleCommitResponse(long)} reports an offset
 * that covers them, or until {@link #shutdown()} fails them.
 *
 * <p>NOTE(review): {@code lastReadOffset} and {@code lastCommittedOffset} are plain fields;
 * this class does not itself synchronize access to them, so callers are presumably expected
 * to invoke {@link #addBatches(List)} and {@link #handleCommitResponse(long)} from a single
 * thread - confirm against the caller.
 */
public class PartitionSession {
    private static final Logger logger = LoggerFactory.getLogger(PartitionSession.class);
    private final long id;
    private final String path;
    private final long partitionId;
    private final tech.ydb.topic.read.PartitionSession sessionInfo;
    private final OffsetsRange partitionOffsets;
    private final Executor decompressionExecutor;
    private final AtomicBoolean isWorking = new AtomicBoolean(true);
    // Batches in arrival order that are still waiting for decoding; guarded by its own monitor.
    private final Queue<Batch> decodingBatches = new LinkedList<>();
    // Decoded batches waiting to be handed to the data event callback.
    private final Queue<Batch> readingQueue = new ConcurrentLinkedQueue<>();
    private final Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
    // True while a batch is being processed by the data event callback.
    private final AtomicBoolean isReadingNow = new AtomicBoolean(false);
    private final BiConsumer<Long, OffsetsRange> commitFunction;
    // Pending commit futures keyed by the (exclusive) end offset of the committed range.
    private final NavigableMap<Long, CompletableFuture<Void>> commitFutures = new ConcurrentSkipListMap<>();
    private long lastReadOffset;
    private long lastCommittedOffset;

    private PartitionSession(Builder builder) {
        this.id = builder.id;
        this.path = builder.path;
        this.partitionId = builder.partitionId;
        this.sessionInfo = new tech.ydb.topic.read.PartitionSession(this.id, this.partitionId, this.path);
        this.lastReadOffset = builder.committedOffset;
        this.lastCommittedOffset = builder.committedOffset;
        this.partitionOffsets = builder.partitionOffsets;
        this.decompressionExecutor = builder.decompressionExecutor;
        this.dataEventCallback = builder.dataEventCallback;
        this.commitFunction = builder.commitFunction;
    }

    /**
     * Creates an empty builder.
     *
     * @return a new {@link Builder}
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /** Logs that the session has started; performs no other work. */
    public void init() {
        logger.info("Partition session {} (partition {}) is started", this.id, this.partitionId);
    }

    /**
     * @return the partition session id
     */
    public long getId() {
        return this.id;
    }

    /**
     * @return the partition id
     */
    public long getPartitionId() {
        return this.partitionId;
    }

    /**
     * @return the topic path this session was built with
     */
    public String getPath() {
        return this.path;
    }

    /**
     * @return the public descriptor (id, partition id, path) of this session
     */
    public tech.ydb.topic.read.PartitionSession getSessionInfo() {
        return this.sessionInfo;
    }

    /**
     * Accepts batches received from the server, schedules their decoding and queues them for
     * delivery to the data event callback.
     *
     * @param batches protobuf batches in the order they were received
     * @return a future that completes when the read futures of all accepted batches have
     *         completed; an already completed future if the session is shut down
     */
    public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadResponse.Batch> batches) {
        if (!this.isWorking.get()) {
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<?>> batchFutures = new ArrayList<>(batches.size());
        for (YdbTopic.StreamReadMessage.ReadResponse.Batch batch : batches) {
            batchFutures.add(this.enqueueBatch(batch));
        }
        return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Converts one protobuf batch into a {@link Batch}, registers it for ordered delivery and
     * starts its decoding on the decompression executor.
     *
     * @return the read future of the newly created batch
     */
    private CompletableFuture<?> enqueueBatch(YdbTopic.StreamReadMessage.ReadResponse.Batch batch) {
        BatchMeta batchMeta = new BatchMeta(batch);
        Batch newBatch = new Batch(batchMeta);
        List<YdbTopic.StreamReadMessage.ReadResponse.MessageData> batchMessages = batch.getMessageDataList();
        if (batchMessages.isEmpty()) {
            logger.error("Received empty batch for partition session {} (partition {}). This shouldn't happen",
                    this.id, this.partitionId);
        } else if (logger.isDebugEnabled()) {
            logger.debug("Received a batch of {} messages (offsets {} - {}) for partition session {} (partition {})",
                    batchMessages.size(), batchMessages.get(0).getOffset(),
                    batchMessages.get(batchMessages.size() - 1).getOffset(), this.id, this.partitionId);
        }
        for (YdbTopic.StreamReadMessage.ReadResponse.MessageData messageData : batchMessages) {
            // A commit of this message covers everything from the previous read position
            // up to and including the message itself.
            long commitOffsetFrom = this.lastReadOffset;
            long messageOffset = messageData.getOffset();
            long newReadOffset = messageOffset + 1L;
            if (newReadOffset > this.lastReadOffset) {
                this.lastReadOffset = newReadOffset;
            } else {
                logger.error("Received a message with offset {} which is less than last read offset {} for partition session {} (partition {})",
                        messageOffset, this.lastReadOffset, this.id, this.partitionId);
            }
            newBatch.addMessage(new MessageImpl.Builder()
                    .setBatchMeta(batchMeta)
                    .setPartitionSession(this.sessionInfo)
                    .setData(messageData.getData().toByteArray())
                    .setOffset(messageOffset)
                    .setSeqNo(messageData.getSeqNo())
                    .setCommitOffsetFrom(commitOffsetFrom)
                    .setCreatedAt(ProtobufUtils.protoToInstant(messageData.getCreatedAt()))
                    .setMessageGroupId(messageData.getMessageGroupId())
                    .setCommitFunction(this::commitOffset)
                    .build());
        }
        CompletableFuture<?> readFuture = newBatch.getReadFuture();
        synchronized (this.decodingBatches) {
            this.decodingBatches.add(newBatch);
        }
        CompletableFuture.runAsync(() -> this.decode(newBatch), this.decompressionExecutor)
                .thenRun(this::moveDecodedBatchesToReadingQueue)
                .exceptionally(th -> {
                    // Without this handler a failure would vanish inside the discarded future.
                    // NOTE(review): a batch whose decoding failed is never marked decompressed,
                    // so it stays at the head of decodingBatches and holds back later batches;
                    // decide whether such a batch should be dropped or surfaced to the reader.
                    logger.error("Failed to decode a batch or pass it to readers for partition session {} (partition {}): ",
                            this.id, this.partitionId, th);
                    return null;
                });
        return readFuture;
    }

    /**
     * Moves every batch from the head of the decoding queue that is already decoded (or is
     * RAW and needs no decoding) to the reading queue, preserving arrival order, and then
     * triggers delivery if anything was moved.
     */
    private void moveDecodedBatchesToReadingQueue() {
        boolean haveNewBatchesReady = false;
        synchronized (this.decodingBatches) {
            Batch decodingBatch;
            while ((decodingBatch = this.decodingBatches.peek()) != null
                    && (decodingBatch.isDecompressed() || decodingBatch.getCodec() == Codec.RAW)) {
                this.decodingBatches.remove();
                if (logger.isTraceEnabled()) {
                    List<MessageImpl> messages = decodingBatch.getMessages();
                    // Empty batches are tolerated on input, so do not index into them here.
                    if (!messages.isEmpty()) {
                        logger.trace("Adding batch with offsets {}-{} to reading queue of partition session {} (partition {})",
                                messages.get(0).getOffset(), messages.get(messages.size() - 1).getOffset(),
                                this.id, this.partitionId);
                    }
                }
                this.readingQueue.add(decodingBatch);
                haveNewBatchesReady = true;
            }
        }
        if (haveNewBatchesReady) {
            this.sendDataToReadersIfNeeded();
        }
    }

    /**
     * Registers a commit future for the given range and forwards the request to the commit
     * function.
     *
     * @param offsets offset range to commit
     * @return a future completed by {@link #handleCommitResponse(long)} once the committed
     *         offset reaches the end of the range, or failed if the session is closed
     */
    private CompletableFuture<Void> commitOffset(OffsetsRange offsets) {
        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        if (!this.isWorking.get()) {
            logger.info("Offset range [{}, {}) is requested to be committed, but partition session {} (partition {}) is already closed",
                    offsets.getStart(), offsets.getEnd(), this.id, this.partitionId);
            this.failAsClosed(resultFuture);
            return resultFuture;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Offset range [{}, {}) is requested to be committed for partition session {} (partition {}). Last committed offset is {} (read lag is {})",
                    offsets.getStart(), offsets.getEnd(), this.id, this.partitionId, this.lastCommittedOffset,
                    offsets.getStart() - this.lastCommittedOffset);
        }
        final Long commitKey = offsets.getEnd();
        CompletableFuture<Void> replaced = this.commitFutures.put(commitKey, resultFuture);
        if (replaced != null) {
            // Another commit with the same end offset is still pending: tie its outcome to
            // the new future so the earlier caller is not left waiting forever.
            resultFuture.whenComplete((res, th) -> {
                if (th != null) {
                    replaced.completeExceptionally(th);
                } else {
                    replaced.complete(null);
                }
            });
        }
        if (!this.isWorking.get()) {
            // shutdown() ran between the first check and the registration; make sure the
            // future is failed even if shutdown() had already drained the map.
            this.commitFutures.remove(commitKey, resultFuture);
            this.failAsClosed(resultFuture);
            return resultFuture;
        }
        this.commitFunction.accept(this.getId(), offsets);
        return resultFuture;
    }

    /** Fails the given commit future with the "already closed" error. */
    private void failAsClosed(CompletableFuture<Void> future) {
        future.completeExceptionally(new RuntimeException("Partition session " + this.id + " is already closed"));
    }

    /**
     * Handles a commit acknowledgement: records the new committed offset and completes every
     * pending commit future whose range end is not greater than it. Responses that do not
     * advance the committed offset are logged and ignored.
     *
     * @param committedOffset committed offset reported in the response
     */
    public void handleCommitResponse(long committedOffset) {
        if (committedOffset <= this.lastCommittedOffset) {
            logger.error("Commit response received for partition session {} (partition {}). Committed offset: {} which is not greater than previous committed offset: {}.",
                    this.id, this.partitionId, committedOffset, this.lastCommittedOffset);
            return;
        }
        NavigableMap<Long, CompletableFuture<Void>> futuresToComplete =
                this.commitFutures.headMap(committedOffset, true);
        if (logger.isDebugEnabled()) {
            logger.debug("Commit response received for partition session {} (partition {}). Committed offset: {}. Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures",
                    this.id, this.partitionId, committedOffset, this.lastCommittedOffset,
                    committedOffset - this.lastCommittedOffset, futuresToComplete.size());
        }
        this.lastCommittedOffset = committedOffset;
        // Remove and complete entries one at a time: iterating and then clearing the live
        // view could drop a future that was registered in between without completing it.
        Map.Entry<Long, CompletableFuture<Void>> entry;
        while ((entry = futuresToComplete.pollFirstEntry()) != null) {
            entry.getValue().complete(null);
        }
    }

    /**
     * Decodes the data of every message in the batch with the batch codec and marks the
     * messages and the batch as decompressed. RAW batches are left untouched.
     */
    private void decode(Batch batch) {
        if (logger.isTraceEnabled()) {
            logger.trace("Started decoding batch for partition session {} (partition {})", this.id, this.partitionId);
        }
        if (batch.getCodec() == Codec.RAW) {
            return;
        }
        for (MessageImpl message : batch.getMessages()) {
            message.setData(Encoder.decode(batch.getCodec(), message.getData()));
            message.setDecompressed(true);
        }
        batch.setDecompressed(true);
        if (logger.isTraceEnabled()) {
            logger.trace("Finished decoding batch for partition session {} (partition {})", this.id, this.partitionId);
        }
    }

    /**
     * Hands the next queued batch to the data event callback unless a batch is already being
     * processed. At most one batch is in flight at a time; the next one is sent when the
     * callback future of the current one completes.
     */
    private void sendDataToReadersIfNeeded() {
        do {
            if (!this.isReadingNow.compareAndSet(false, true)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Partition session {} (partition {}) - no need to send data to readers: reading is already being performed",
                            this.id, this.partitionId);
                }
                return;
            }
            Batch batchToRead = this.readingQueue.poll();
            if (batchToRead != null) {
                this.deliverToReader(batchToRead);
                return;
            }
            this.isReadingNow.set(false);
            // A producer may have queued a batch after poll() returned null and lost the
            // compareAndSet above while the flag was still held here, so look again.
        } while (!this.readingQueue.isEmpty());
    }

    /**
     * Wraps the batch into a data event, invokes the data event callback and, when the
     * returned future completes, releases the reading flag, completes the batch and
     * continues with the next queued batch.
     */
    private void deliverToReader(Batch batchToRead) {
        List<MessageImpl> messageImplList = batchToRead.getMessages();
        List<Message> messagesToRead = new ArrayList<Message>(messageImplList);
        DataReceivedEventImpl event = new DataReceivedEventImpl(messagesToRead, this.sessionInfo,
                () -> this.commitOffset(new OffsetsRange(
                        messageImplList.get(0).getCommitOffsetFrom(),
                        messageImplList.get(messageImplList.size() - 1).getOffset() + 1L)));
        if (logger.isDebugEnabled()) {
            logger.debug("DataReceivedEvent callback for partition session {} (partition {}) is about to be called...",
                    this.id, this.partitionId);
        }
        this.invokeDataEventCallback(event).whenComplete((res, th) -> {
            if (th != null) {
                // The throwable is passed without a placeholder so the logger prints its stack trace.
                logger.error("DataReceivedEvent callback for partition session {} (partition {}) finished with error: ",
                        this.id, this.partitionId, th);
            } else if (logger.isDebugEnabled()) {
                logger.debug("DataReceivedEvent callback for partition session {} (partition {}) successfully finished",
                        this.id, this.partitionId);
            }
            this.isReadingNow.set(false);
            batchToRead.complete();
            this.sendDataToReadersIfNeeded();
        });
    }

    /**
     * Calls the data event callback and converts a synchronous exception or a {@code null}
     * result into a failed future, so the reading flag is always released by the regular
     * completion path instead of staying set forever.
     */
    private CompletableFuture<Void> invokeDataEventCallback(DataReceivedEvent event) {
        try {
            CompletableFuture<Void> callbackFuture = this.dataEventCallback.apply(event);
            if (callbackFuture == null) {
                throw new IllegalStateException("DataReceivedEvent callback returned null instead of a future");
            }
            return callbackFuture;
        } catch (RuntimeException ex) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(ex);
            return failed;
        }
    }

    /**
     * Marks the session as closed and fails every pending commit future. After this call
     * {@link #addBatches(List)} ignores new batches and new commit requests fail immediately.
     */
    public void shutdown() {
        this.isWorking.set(false);
        logger.info("Partition session {} (partition {}) is shutting down. Failing {} commit futures...",
                this.id, this.partitionId, this.commitFutures.size());
        // Drain the map so every pending future is removed and failed exactly once.
        Map.Entry<Long, CompletableFuture<Void>> entry;
        while ((entry = this.commitFutures.pollFirstEntry()) != null) {
            entry.getValue().completeExceptionally(new RuntimeException("Partition session closed"));
        }
    }

    /**
     * Builder for {@link PartitionSession}. Values are copied as-is by the
     * {@code PartitionSession} constructor; no validation is performed here.
     */
    public static class Builder {
        private long id;
        private String path;
        private long partitionId;
        private long committedOffset;
        private OffsetsRange partitionOffsets;
        private Executor decompressionExecutor;
        private Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
        private BiConsumer<Long, OffsetsRange> commitFunction;

        /** Sets the partition session id. */
        public Builder setId(long id) {
            this.id = id;
            return this;
        }

        /** Sets the topic path. */
        public Builder setPath(String path) {
            this.path = path;
            return this;
        }

        /** Sets the partition id. */
        public Builder setPartitionId(long partitionId) {
            this.partitionId = partitionId;
            return this;
        }

        /** Sets the offset used as both the initial last-read and last-committed offset. */
        public Builder setCommittedOffset(long committedOffset) {
            this.committedOffset = committedOffset;
            return this;
        }

        /** Sets the partition offsets range. */
        public Builder setPartitionOffsets(OffsetsRange partitionOffsets) {
            this.partitionOffsets = partitionOffsets;
            return this;
        }

        /** Sets the executor on which batches are decoded. */
        public Builder setDecompressionExecutor(Executor decompressionExecutor) {
            this.decompressionExecutor = decompressionExecutor;
            return this;
        }

        /** Sets the callback that receives data events; its future signals the end of processing. */
        public Builder setDataEventCallback(Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback) {
            this.dataEventCallback = dataEventCallback;
            return this;
        }

        /** Sets the function invoked with the session id and offset range for each commit request. */
        public Builder setCommitFunction(BiConsumer<Long, OffsetsRange> commitFunction) {
            this.commitFunction = commitFunction;
            return this;
        }

        /**
         * @return a new {@link PartitionSession} configured from this builder
         */
        public PartitionSession build() {
            return new PartitionSession(this);
        }
    }
}

