/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Status;
import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.TransactionMessageAccumulator;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.DeferredCommitterImpl;
import tech.ydb.topic.read.impl.DisjointOffsetRangeSet;
import tech.ydb.topic.read.impl.MessageImpl;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;

public class TransactionMessageAccumulatorImpl
implements TransactionMessageAccumulator {
    private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);
    private final AsyncReader reader;
    private final Map<String, Map<PartitionSession, PartitionRanges>> rangesByTopic = new ConcurrentHashMap<String, Map<PartitionSession, PartitionRanges>>();

    TransactionMessageAccumulatorImpl(AsyncReader reader) {
        this.reader = reader;
    }

    @Override
    public void add(Message message) {
        MessageImpl messageImpl = (MessageImpl)message;
        PartitionRanges partitionRanges = this.rangesByTopic.computeIfAbsent(message.getPartitionSession().getPath(), path -> new ConcurrentHashMap()).computeIfAbsent(message.getPartitionSession(), x$0 -> new PartitionRanges((PartitionSession)x$0));
        partitionRanges.add(messageImpl.getOffsetsToCommit());
    }

    @Override
    public void add(DataReceivedEvent event) {
        DataReceivedEventImpl eventImpl = (DataReceivedEventImpl)event;
        PartitionRanges partitionRanges = this.rangesByTopic.computeIfAbsent(event.getPartitionSession().getPath(), path -> new ConcurrentHashMap()).computeIfAbsent(event.getPartitionSession(), x$0 -> new PartitionRanges((PartitionSession)x$0));
        partitionRanges.add(eventImpl.getOffsetsToCommit());
    }

    @Override
    public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction transaction, UpdateOffsetsInTransactionSettings settings) {
        HashMap<String, List<PartitionOffsets>> offsets = new HashMap<String, List<PartitionOffsets>>();
        this.rangesByTopic.forEach((path, topicRanges) -> offsets.put((String)path, topicRanges.entrySet().stream().map(partitionRange -> new PartitionOffsets((PartitionSession)partitionRange.getKey(), ((PartitionRanges)partitionRange.getValue()).getOffsetsRanges())).collect(Collectors.toList())));
        return this.reader.updateOffsetsInTransaction(transaction, offsets, settings);
    }

    private static class PartitionRanges {
        private final PartitionSession partitionSession;
        private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();

        private PartitionRanges(PartitionSession partitionSession) {
            this.partitionSession = partitionSession;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void add(OffsetsRange offsetRange) {
            try {
                DisjointOffsetRangeSet disjointOffsetRangeSet = this.ranges;
                synchronized (disjointOffsetRangeSet) {
                    this.ranges.add(offsetRange);
                }
            }
            catch (RuntimeException exception) {
                String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + this.partitionSession.getId() + " (partition " + this.partitionSession.getPartitionId() + "): " + exception.getMessage();
                logger.error(errorMessage);
                throw new RuntimeException(errorMessage, exception);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<OffsetsRange> getOffsetsRanges() {
            DisjointOffsetRangeSet disjointOffsetRangeSet = this.ranges;
            synchronized (disjointOffsetRangeSet) {
                return this.ranges.getRangesAndClear();
            }
        }
    }
}

