/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.topic.read.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.DisjointOffsetRangeSet;
import tech.ydb.topic.read.impl.MessageImpl;
import tech.ydb.topic.read.impl.PartitionSessionImpl;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;

/**
 * {@link DeferredCommitter} implementation that collects offset ranges per partition session and
 * passes them to the owning session when {@link #commit()} is invoked.
 *
 * <p>Accumulators live in a {@link ConcurrentHashMap} keyed by partition session; every accumulator
 * guards its own {@link DisjointOffsetRangeSet} with that set's monitor.
 */
public class DeferredCommitterImpl implements DeferredCommitter {
    private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);

    /** Pending offset ranges, one accumulator per partition session seen so far. */
    private final Map<PartitionSessionImpl, PartitionRanges> rangesByPartition = new ConcurrentHashMap<>();

    /**
     * Remembers the offsets of a single message so that they are sent with the next commit.
     *
     * @param message message whose offsets should be recorded; it is cast to {@link MessageImpl},
     *                so any other implementation fails with a {@link ClassCastException}
     */
    @Override
    public void add(Message message) {
        MessageImpl impl = (MessageImpl) message;
        pendingFor(impl.getPartitionSessionImpl()).add(impl.getOffsetsToCommit());
    }

    /**
     * Remembers the offsets covered by a whole data event so that they are sent with the next commit.
     *
     * @param event event whose offsets should be recorded; it is cast to
     *              {@link DataReceivedEventImpl}, so any other implementation fails with a
     *              {@link ClassCastException}
     */
    @Override
    public void add(DataReceivedEvent event) {
        DataReceivedEventImpl impl = (DataReceivedEventImpl) event;
        pendingFor(impl.getPartitionSessionImpl()).add(impl.getOffsetsToCommit());
    }

    /**
     * Flushes the ranges accumulated so far: every known partition session receives the ranges
     * collected for it, and the corresponding accumulator is emptied.
     */
    @Override
    public void commit() {
        for (PartitionRanges pending : rangesByPartition.values()) {
            pending.commit();
        }
    }

    /** Returns the accumulator for the given session, creating and registering it on first use. */
    private PartitionRanges pendingFor(PartitionSessionImpl session) {
        return rangesByPartition.computeIfAbsent(session, PartitionRanges::new);
    }

    /** Offset ranges waiting to be committed for one partition session. */
    private static class PartitionRanges {
        private final PartitionSessionImpl partitionSession;
        private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();

        private PartitionRanges(PartitionSessionImpl partitionSession) {
            this.partitionSession = partitionSession;
        }

        /**
         * Adds a range to the pending set while holding the set's monitor.
         *
         * @param offsetRange range to record
         * @throws RuntimeException if the underlying set rejects the range; the failure is logged
         *                          and rethrown with session details and the original cause attached
         */
        private void add(OffsetsRange offsetRange) {
            try {
                synchronized (ranges) {
                    ranges.add(offsetRange);
                }
            } catch (RuntimeException failure) {
                String details = describeAddFailure(failure);
                logger.error(details);
                throw new RuntimeException(details, failure);
            }
        }

        /**
         * Takes a snapshot of the pending ranges (clearing the set) under the monitor, then passes
         * the snapshot to the partition session after the monitor has been released.
         */
        private void commit() {
            List<OffsetsRange> snapshot;
            synchronized (ranges) {
                snapshot = ranges.getRangesAndClear();
            }
            partitionSession.commitOffsetRanges(snapshot);
        }

        /** Builds the message used both for the log entry and for the rethrown exception. */
        private String describeAddFailure(RuntimeException failure) {
            return "Error adding new offset range to DeferredCommitter for partition session "
                    + partitionSession.getId()
                    + " (partition " + partitionSession.getPartitionId() + "): "
                    + failure.getMessage();
        }
    }
}

