/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.Batch;
import tech.ydb.topic.read.impl.BatchMeta;
import tech.ydb.topic.read.impl.MessageImpl;
import tech.ydb.topic.read.impl.OffsetsRangeImpl;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
import tech.ydb.topic.utils.Encoder;

public class PartitionSessionImpl {
    private static final Logger logger = LoggerFactory.getLogger(PartitionSessionImpl.class);
    private final long id;
    private final String path;
    private final long partitionId;
    private final PartitionSession sessionInfo;
    private final Executor decompressionExecutor;
    private final AtomicBoolean isWorking = new AtomicBoolean(true);
    private final Queue<Batch> decodingBatches = new LinkedList<Batch>();
    private final ReentrantLock decodingBatchesLock = new ReentrantLock();
    private final Queue<Batch> readingQueue = new ConcurrentLinkedQueue<Batch>();
    private final Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
    private final AtomicBoolean isReadingNow = new AtomicBoolean();
    private final Consumer<List<OffsetsRange>> commitFunction;
    private final NavigableMap<Long, CompletableFuture<Void>> commitFutures = new ConcurrentSkipListMap<Long, CompletableFuture<Void>>();
    private final ReentrantLock commitFuturesLock = new ReentrantLock();
    private long lastReadOffset;
    private long lastCommittedOffset;

    private PartitionSessionImpl(Builder builder) {
        this.id = builder.id;
        this.path = builder.path;
        this.partitionId = builder.partitionId;
        this.sessionInfo = new PartitionSession(this.id, this.partitionId, this.path);
        this.lastReadOffset = builder.committedOffset;
        this.lastCommittedOffset = builder.committedOffset;
        this.decompressionExecutor = builder.decompressionExecutor;
        this.dataEventCallback = builder.dataEventCallback;
        this.commitFunction = builder.commitFunction;
        logger.info("[{}] Partition session {} (partition {}) is started. CommittedOffset: {}. Partition offsets: {}-{}", new Object[]{this.path, this.id, this.partitionId, this.lastReadOffset, builder.partitionOffsets.getStart(), builder.partitionOffsets.getEnd()});
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public long getId() {
        return this.id;
    }

    public long getPartitionId() {
        return this.partitionId;
    }

    public String getPath() {
        return this.path;
    }

    public PartitionSession getSessionInfo() {
        return this.sessionInfo;
    }

    public void setLastReadOffset(long lastReadOffset) {
        this.lastReadOffset = lastReadOffset;
    }

    public void setLastCommittedOffset(long lastCommittedOffset) {
        this.lastCommittedOffset = lastCommittedOffset;
    }

    public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadResponse.Batch> batches) {
        if (!this.isWorking.get()) {
            return CompletableFuture.completedFuture(null);
        }
        LinkedList batchFutures = new LinkedList();
        batches.forEach(batch -> {
            BatchMeta batchMeta = new BatchMeta((YdbTopic.StreamReadMessage.ReadResponse.Batch)batch);
            Batch newBatch = new Batch(batchMeta);
            List batchMessages = batch.getMessageDataList();
            if (!batchMessages.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[{}] Received a batch of {} messages (offsets {} - {}) for partition session {} (partition {})", new Object[]{this.path, batchMessages.size(), ((YdbTopic.StreamReadMessage.ReadResponse.MessageData)batchMessages.get(0)).getOffset(), ((YdbTopic.StreamReadMessage.ReadResponse.MessageData)batchMessages.get(batchMessages.size() - 1)).getOffset(), this.id, this.partitionId});
                }
            } else {
                logger.error("[{}] Received empty batch for partition session {} (partition {}). This shouldn't happen", new Object[]{this.path, this.id, this.partitionId});
            }
            batchMessages.forEach(messageData -> {
                long commitOffsetFrom = this.lastReadOffset;
                long messageOffset = messageData.getOffset();
                long newReadOffset = messageOffset + 1L;
                if (newReadOffset > this.lastReadOffset) {
                    this.lastReadOffset = newReadOffset;
                    if (logger.isTraceEnabled()) {
                        logger.trace("[{}] Received a message with offset {} for partition session {} (partition {}). lastReadOffset is now {}", new Object[]{this.path, messageOffset, this.id, this.partitionId, this.lastReadOffset});
                    }
                } else {
                    logger.error("[{}] Received a message with offset {} which is less than last read offset {} for partition session {} (partition {})", new Object[]{this.path, messageOffset, this.lastReadOffset, this.id, this.partitionId});
                }
                newBatch.addMessage(new MessageImpl.Builder().setBatchMeta(batchMeta).setPartitionSession(this).setData(messageData.getData().toByteArray()).setOffset(messageOffset).setSeqNo(messageData.getSeqNo()).setCommitOffsetFrom(commitOffsetFrom).setCreatedAt(ProtobufUtils.protoToInstant((Timestamp)messageData.getCreatedAt())).setMessageGroupId(messageData.getMessageGroupId()).setMetadataItems(messageData.getMetadataItemsList().stream().map(metadataItem -> new MetadataItem(metadataItem.getKey(), metadataItem.getValue().toByteArray())).collect(Collectors.toList())).build());
            });
            batchFutures.add(newBatch.getReadFuture());
            this.decodingBatchesLock.lock();
            try {
                this.decodingBatches.add(newBatch);
            }
            finally {
                this.decodingBatchesLock.unlock();
            }
            CompletableFuture.runAsync(() -> this.decode(newBatch), this.decompressionExecutor).thenRun(() -> {
                boolean haveNewBatchesReady = false;
                this.decodingBatchesLock.lock();
                try {
                    Batch decodingBatch;
                    while ((decodingBatch = this.decodingBatches.peek()) != null && (decodingBatch.isDecompressed() || decodingBatch.getCodec() == Codec.RAW)) {
                        this.decodingBatches.remove();
                        if (logger.isTraceEnabled()) {
                            List<MessageImpl> messages = decodingBatch.getMessages();
                            logger.trace("[{}] Adding batch with offsets {}-{} to reading queue of partition session {} (partition {})", new Object[]{this.path, messages.get(0).getOffset(), messages.get(messages.size() - 1).getOffset(), this.id, this.partitionId});
                        }
                        this.readingQueue.add(decodingBatch);
                        haveNewBatchesReady = true;
                    }
                }
                finally {
                    this.decodingBatchesLock.unlock();
                }
                if (haveNewBatchesReady) {
                    this.sendDataToReadersIfNeeded();
                }
            });
        });
        return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0]));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
        CompletableFuture<Void> resultFuture;
        block5: {
            resultFuture = new CompletableFuture<Void>();
            this.commitFuturesLock.lock();
            try {
                if (this.isWorking.get()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} (partition {}). Last committed offset is {} (commit lag is {})", new Object[]{this.path, rangeToCommit.getStart(), rangeToCommit.getEnd(), this.id, this.partitionId, this.lastCommittedOffset, rangeToCommit.getStart() - this.lastCommittedOffset});
                    }
                    this.commitFutures.put(rangeToCommit.getEnd(), resultFuture);
                    break block5;
                }
                logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} (partition {}) is already closed", new Object[]{this.path, rangeToCommit.getStart(), rangeToCommit.getEnd(), this.id, this.partitionId});
                resultFuture.completeExceptionally(new RuntimeException("Partition session " + this.id + " (partition " + this.partitionId + ") for " + this.path + " is already closed"));
                CompletableFuture<Void> completableFuture = resultFuture;
                return completableFuture;
            }
            finally {
                this.commitFuturesLock.unlock();
            }
        }
        ArrayList<OffsetsRange> rangeWrapper = new ArrayList<OffsetsRange>(1);
        rangeWrapper.add(rangeToCommit);
        this.commitFunction.accept(rangeWrapper);
        return resultFuture;
    }

    public void commitOffsetRanges(List<OffsetsRange> rangesToCommit) {
        if (this.isWorking.get()) {
            if (logger.isInfoEnabled()) {
                StringBuilder message = new StringBuilder("[").append(this.path).append("] Sending CommitRequest for partition session ").append(this.id).append(" (partition ").append(this.partitionId).append(") with offset ranges ");
                PartitionSessionImpl.addRangesToString(message, rangesToCommit);
                logger.debug(message.toString());
            }
            this.commitFunction.accept(rangesToCommit);
        } else if (logger.isInfoEnabled()) {
            StringBuilder message = new StringBuilder("[").append(this.path).append("] Offset ranges ");
            PartitionSessionImpl.addRangesToString(message, rangesToCommit);
            message.append(" are requested to be committed, but partition session ").append(this.id).append(" (partition ").append(this.partitionId).append(") is already closed");
            logger.info(message.toString());
        }
    }

    private static void addRangesToString(StringBuilder stringBuilder, List<OffsetsRange> ranges) {
        for (int i = 0; i < ranges.size(); ++i) {
            if (i > 0) {
                stringBuilder.append(", ");
            }
            OffsetsRange range = ranges.get(i);
            stringBuilder.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")");
        }
    }

    public void handleCommitResponse(long committedOffset) {
        if (committedOffset <= this.lastCommittedOffset) {
            logger.error("[{}] Commit response received for partition session {} (partition {}). Committed offset: {} which is not greater than previous committed offset: {}.", new Object[]{this.path, this.id, this.partitionId, committedOffset, this.lastCommittedOffset});
            return;
        }
        NavigableMap<Long, CompletableFuture<Void>> futuresToComplete = this.commitFutures.headMap(committedOffset, true);
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}. Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures", new Object[]{this.path, this.id, this.partitionId, committedOffset, this.lastCommittedOffset, committedOffset - this.lastCommittedOffset, futuresToComplete.size()});
        }
        this.lastCommittedOffset = committedOffset;
        futuresToComplete.values().forEach(future -> future.complete(null));
        futuresToComplete.clear();
    }

    private void decode(Batch batch) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Started decoding batch for partition session {} (partition {})", new Object[]{this.path, this.id, this.partitionId});
        }
        if (batch.getCodec() == Codec.RAW) {
            return;
        }
        batch.getMessages().forEach(message -> {
            try {
                message.setData(Encoder.decode(batch.getCodec(), message.getData()));
                message.setDecompressed(true);
            }
            catch (IOException exception) {
                message.setException(exception);
                logger.info("[{}] Exception was thrown while decoding a message in partition session {} (partition {})", new Object[]{this.path, this.id, this.partitionId});
            }
        });
        batch.setDecompressed(true);
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Finished decoding batch for partition session {} (partition {})", new Object[]{this.path, this.id, this.partitionId});
        }
    }

    private void sendDataToReadersIfNeeded() {
        if (!this.isWorking.get()) {
            return;
        }
        if (this.isReadingNow.compareAndSet(false, true)) {
            Batch batchToRead = this.readingQueue.poll();
            if (batchToRead == null) {
                this.isReadingNow.set(false);
                return;
            }
            List<MessageImpl> messageImplList = batchToRead.getMessages();
            ArrayList<Message> messagesToRead = new ArrayList<Message>(messageImplList);
            OffsetsRangeImpl offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(), messageImplList.get(messageImplList.size() - 1).getOffset() + 1L);
            DataReceivedEventImpl event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit);
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) is about to be called...", new Object[]{this.path, messagesToRead.size(), ((Message)messagesToRead.get(0)).getOffset(), ((Message)messagesToRead.get(messagesToRead.size() - 1)).getOffset(), this.id, this.partitionId});
            }
            this.dataEventCallback.apply(event).whenComplete((res, th) -> {
                if (th != null) {
                    logger.error("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) finished with error: ", new Object[]{this.path, messagesToRead.size(), ((Message)messagesToRead.get(0)).getOffset(), ((Message)messagesToRead.get(messagesToRead.size() - 1)).getOffset(), this.id, this.partitionId, th});
                } else if (logger.isDebugEnabled()) {
                    logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) successfully finished", new Object[]{this.path, messagesToRead.size(), ((Message)messagesToRead.get(0)).getOffset(), ((Message)messagesToRead.get(messagesToRead.size() - 1)).getOffset(), this.id, this.partitionId});
                }
                this.isReadingNow.set(false);
                batchToRead.complete();
                this.sendDataToReadersIfNeeded();
            });
        } else if (logger.isTraceEnabled()) {
            logger.trace("[{}] Partition session {} (partition {}) - no need to send data to readers: reading is already being performed", new Object[]{this.path, this.id, this.partitionId});
        }
    }

    public void shutdown() {
        this.commitFuturesLock.lock();
        try {
            this.isWorking.set(false);
            logger.info("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures...", new Object[]{this.path, this.id, this.partitionId, this.commitFutures.size()});
            this.commitFutures.values().forEach(f -> f.completeExceptionally(new RuntimeException("Partition session " + this.id + " (partition " + this.partitionId + ") for " + this.path + " is closed")));
        }
        finally {
            this.commitFuturesLock.unlock();
        }
        this.decodingBatchesLock.lock();
        try {
            this.decodingBatches.forEach(Batch::complete);
            this.readingQueue.forEach(Batch::complete);
        }
        finally {
            this.decodingBatchesLock.unlock();
        }
    }

    public static class Builder {
        private long id;
        private String path;
        private long partitionId;
        private long committedOffset;
        private OffsetsRange partitionOffsets;
        private Executor decompressionExecutor;
        private Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
        private Consumer<List<OffsetsRange>> commitFunction;

        public Builder setId(long id) {
            this.id = id;
            return this;
        }

        public Builder setPath(String path) {
            this.path = path;
            return this;
        }

        public Builder setPartitionId(long partitionId) {
            this.partitionId = partitionId;
            return this;
        }

        public Builder setCommittedOffset(long committedOffset) {
            this.committedOffset = committedOffset;
            return this;
        }

        public Builder setPartitionOffsets(OffsetsRange partitionOffsets) {
            this.partitionOffsets = partitionOffsets;
            return this;
        }

        public Builder setDecompressionExecutor(Executor decompressionExecutor) {
            this.decompressionExecutor = decompressionExecutor;
            return this;
        }

        public Builder setDataEventCallback(Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback) {
            this.dataEventCallback = dataEventCallback;
            return this;
        }

        public Builder setCommitFunction(Consumer<List<OffsetsRange>> commitFunction) {
            this.commitFunction = commitFunction;
            return this;
        }

        public PartitionSessionImpl build() {
            return new PartitionSessionImpl(this);
        }
    }
}

