/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.BatchMessageAckerDisabled;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentAcknowledgmentsGroupingTracker
implements AcknowledgmentsGroupingTracker {
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    private static final int MAX_ACK_GROUP_SIZE = 1000;
    private final ConsumerImpl<?> consumer;
    private final long acknowledgementGroupTimeMicros;
    private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl)MessageId.earliest;
    private volatile BitSetRecyclable lastCumulativeAckSet = null;
    private volatile boolean cumulativeAckFlushRequired = false;
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, BitSetRecyclable> LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PersistentAcknowledgmentsGroupingTracker.class, BitSetRecyclable.class, "lastCumulativeAckSet");
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
    private final ConcurrentHashMap<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>> pendingIndividualTransactionBatchIndexAcks;
    private final ConcurrentSkipListSet<Triple<Long, Long, MessageIdImpl>> pendingIndividualTransactionAcks;
    private final ScheduledFuture<?> scheduledTask;

    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, EventLoopGroup eventLoopGroup) {
        this.consumer = consumer;
        this.pendingIndividualAcks = new ConcurrentSkipListSet();
        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap();
        this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
        this.pendingIndividualTransactionBatchIndexAcks = new ConcurrentHashMap();
        this.pendingIndividualTransactionAcks = new ConcurrentSkipListSet();
        this.scheduledTask = this.acknowledgementGroupTimeMicros > 0L ? eventLoopGroup.next().scheduleWithFixedDelay(this::flush, this.acknowledgementGroupTimeMicros, this.acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS) : null;
    }

    @Override
    public boolean isDuplicate(MessageId messageId) {
        if (messageId.compareTo((Object)this.lastCumulativeAck) <= 0) {
            return true;
        }
        return this.pendingIndividualAcks.contains(messageId);
    }

    @Override
    public void addListAcknowledgment(List<MessageIdImpl> messageIds, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties) {
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            messageIds.forEach(messageId -> this.doCumulativeAck((MessageIdImpl)messageId, null));
            return;
        }
        messageIds.forEach(messageId -> {
            if (messageId instanceof BatchMessageIdImpl) {
                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)messageId;
                this.pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
            } else {
                this.pendingIndividualAcks.add((MessageIdImpl)messageId);
            }
            this.pendingIndividualBatchIndexAcks.remove(messageId);
            if (this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
        });
        if (this.acknowledgementGroupTimeMicros == 0L) {
            this.flush();
        }
    }

    @Override
    public void addAcknowledgment(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        if (this.acknowledgementGroupTimeMicros == 0L || !properties.isEmpty() || txn != null && ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            if (msgId instanceof BatchMessageIdImpl && txn != null) {
                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)msgId;
                this.doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), batchMessageId.getBatchIndex(), ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
                return;
            }
            this.doImmediateAck(msgId, ackType, properties, txn);
        } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            this.doCumulativeAck(msgId, null);
        } else {
            if (msgId instanceof BatchMessageIdImpl) {
                this.pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()));
            } else if (txn != null) {
                this.pendingIndividualTransactionAcks.add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
            } else {
                this.pendingIndividualAcks.add(msgId);
            }
            this.pendingIndividualBatchIndexAcks.remove(msgId);
            if (this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        if (this.acknowledgementGroupTimeMicros == 0L || !properties.isEmpty()) {
            this.doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, txn == null ? -1L : txn.getTxnIdMostBits(), txn == null ? -1L : txn.getTxnIdLeastBits());
        } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            BitSetRecyclable bitSet = BitSetRecyclable.create();
            bitSet.set(0, batchSize);
            bitSet.clear(0, batchIndex + 1);
            this.doCumulativeAck(msgId, bitSet);
        } else if (ackType == PulsarApi.CommandAck.AckType.Individual) {
            if (txn != null) {
                TransactionImpl transactionImpl = txn;
                synchronized (transactionImpl) {
                    ConcurrentHashMap transactionIndividualBatchIndexAcks = this.pendingIndividualTransactionBatchIndexAcks.computeIfAbsent(txn, v -> new ConcurrentHashMap());
                    ConcurrentBitSetRecyclable bitSet = transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, v -> {
                        ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create();
                        value.set(0, msgId.getAcker().getBatchSize());
                        return value;
                    });
                    bitSet.clear(batchIndex);
                }
            } else {
                ConcurrentBitSetRecyclable bitSet = this.pendingIndividualBatchIndexAcks.computeIfAbsent(new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), v -> {
                    ConcurrentBitSetRecyclable value;
                    if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
                        value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
                    } else {
                        value = ConcurrentBitSetRecyclable.create();
                        value.set(0, batchSize);
                    }
                    return value;
                });
                bitSet.clear(batchIndex);
            }
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                this.flush();
            }
        }
    }

    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
        block4: {
            BitSetRecyclable lastBitSet;
            MessageIdImpl lastCumlativeAck;
            do {
                lastCumlativeAck = this.lastCumulativeAck;
                lastBitSet = this.lastCumulativeAckSet;
                if (msgId.compareTo(lastCumlativeAck) <= 0) break block4;
            } while (!LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) || !LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet));
            if (lastBitSet != null) {
                try {
                    lastBitSet.recycle();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.cumulativeAckFlushRequired = true;
            return;
        }
    }

    private void doTransactionCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
        block4: {
            BitSetRecyclable lastBitSet;
            MessageIdImpl lastCumlativeAck;
            do {
                lastCumlativeAck = this.lastCumulativeAck;
                lastBitSet = this.lastCumulativeAckSet;
                if (msgId.compareTo(lastCumlativeAck) <= 0) break block4;
            } while (!LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) || !LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet));
            if (lastBitSet != null) {
                try {
                    lastBitSet.recycle();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.cumulativeAckFlushRequired = true;
            return;
        }
    }

    private boolean doImmediateAck(MessageIdImpl msgId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl transaction) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return false;
        }
        if (transaction != null) {
            this.newAckCommand(this.consumer.consumerId, msgId, null, ackType, null, properties, cnx, true, transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits());
        } else {
            this.newAckCommand(this.consumer.consumerId, msgId, null, ackType, null, properties, cnx, true, -1L, -1L);
        }
        return true;
    }

    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, PulsarApi.CommandAck.AckType ackType, Map<String, Long> properties, long txnidMostBits, long txnidLeastBits) {
        BitSetRecyclable bitSet;
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return false;
        }
        if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
            bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
        } else {
            bitSet = BitSetRecyclable.create();
            bitSet.set(0, batchSize);
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            bitSet.clear(0, batchIndex + 1);
        } else {
            bitSet.clear(batchIndex);
        }
        ByteBuf cmd = Commands.newAck(this.consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties, txnidLeastBits, txnidMostBits, -1L);
        bitSet.recycle();
        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush() {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
            return;
        }
        boolean shouldFlush = false;
        if (this.cumulativeAckFlushRequired) {
            this.newAckCommand(this.consumer.consumerId, this.lastCumulativeAck, this.lastCumulativeAckSet, PulsarApi.CommandAck.AckType.Cumulative, null, Collections.emptyMap(), cnx, false, -1L, -1L);
            shouldFlush = true;
            this.cumulativeAckFlushRequired = false;
        }
        ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(this.pendingIndividualAcks.size() + this.pendingIndividualBatchIndexAcks.size());
        HashMap transactionEntriesToAck = new HashMap();
        if (!this.pendingIndividualAcks.isEmpty()) {
            MessageIdImpl msgId;
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    MessageIdImpl[] messageIdImplArray = (MessageIdImpl[])this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
                    if (messageIdImplArray != null && messageIdImplArray.length > 1) {
                        for (MessageIdImpl cMsgId : messageIdImplArray) {
                            if (cMsgId == null) continue;
                            entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                        }
                        this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
                        continue;
                    }
                    entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
                }
            } else {
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    this.newAckCommand(this.consumer.consumerId, msgId, null, PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap(), cnx, false, -1L, -1L);
                    shouldFlush = true;
                }
            }
        }
        if (!this.pendingIndividualBatchIndexAcks.isEmpty()) {
            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> iterator = this.pendingIndividualBatchIndexAcks.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = iterator.next();
                entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue()));
                iterator.remove();
            }
        }
        if (!this.pendingIndividualTransactionAcks.isEmpty()) {
            Triple<Long, Long, MessageIdImpl> entry;
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                while ((entry = this.pendingIndividualTransactionAcks.pollFirst()) != null) {
                    MessageIdImpl[] messageIdImplArray = (MessageIdImpl[])this.consumer.unAckedChunckedMessageIdSequenceMap.get(entry.getRight());
                    long mostSigBits = entry.getLeft();
                    long leastSigBits = entry.getMiddle();
                    MessageIdImpl messageId = entry.getRight();
                    if (messageIdImplArray != null && messageIdImplArray.length > 1) {
                        for (MessageIdImpl cMsgId : messageIdImplArray) {
                            if (cMsgId == null) continue;
                            this.newAckCommand(this.consumer.consumerId, cMsgId, null, PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits);
                        }
                        this.consumer.unAckedChunckedMessageIdSequenceMap.remove(messageId);
                        continue;
                    }
                    this.newAckCommand(this.consumer.consumerId, messageId, null, PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits);
                }
            } else {
                while ((entry = this.pendingIndividualTransactionAcks.pollFirst()) != null) {
                    this.newAckCommand(this.consumer.consumerId, entry.getRight(), null, PulsarApi.CommandAck.AckType.Individual, null, Collections.emptyMap(), cnx, false, entry.getLeft(), entry.getMiddle());
                    shouldFlush = true;
                }
            }
        }
        if (!this.pendingIndividualTransactionBatchIndexAcks.isEmpty()) {
            Iterator<Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>> transactionIterator = this.pendingIndividualTransactionBatchIndexAcks.entrySet().iterator();
            while (transactionIterator.hasNext()) {
                TransactionImpl txn;
                Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>> entry = transactionIterator.next();
                TransactionImpl transactionImpl = txn = entry.getKey();
                synchronized (transactionImpl) {
                    if (this.pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) {
                        ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> messageIdBitSetList = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>();
                        transactionEntriesToAck.put(txn, messageIdBitSetList);
                        Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> messageIdIterator = entry.getValue().entrySet().iterator();
                        while (messageIdIterator.hasNext()) {
                            Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next();
                            ConcurrentBitSetRecyclable concurrentBitSetRecyclable = ConcurrentBitSetRecyclable.create(messageIdEntry.getValue());
                            MessageIdImpl messageId = messageIdEntry.getKey();
                            messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId, concurrentBitSetRecyclable));
                            messageIdEntry.getValue().set(0, messageIdEntry.getValue().size());
                            messageIdIterator.remove();
                        }
                        transactionIterator.remove();
                    }
                }
            }
        }
        if (transactionEntriesToAck.size() > 0) {
            for (Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry : transactionEntriesToAck.entrySet()) {
                cnx.ctx().write(Commands.newMultiTransactionMessageAck(this.consumer.consumerId, new TxnID(((TransactionImpl)((Object)entry.getKey())).getTxnIdMostBits(), ((TransactionImpl)((Object)entry.getKey())).getTxnIdLeastBits()), (List)((Object)entry.getValue())), cnx.ctx().voidPromise());
                shouldFlush = true;
            }
        }
        if (entriesToAck.size() > 0) {
            cnx.ctx().write(Commands.newMultiMessageAck(this.consumer.consumerId, entriesToAck), cnx.ctx().voidPromise());
            shouldFlush = true;
        }
        if (shouldFlush) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", new Object[]{this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks, this.pendingIndividualBatchIndexAcks});
            }
            cnx.ctx().flush();
        }
    }

    @Override
    public void flushAndClean() {
        this.flush();
        this.lastCumulativeAck = (MessageIdImpl)MessageId.earliest;
        this.pendingIndividualAcks.clear();
    }

    @Override
    public void close() {
        this.flush();
        if (this.scheduledTask != null && !this.scheduledTask.isCancelled()) {
            this.scheduledTask.cancel(true);
        }
    }

    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet, PulsarApi.CommandAck.AckType ackType, PulsarApi.CommandAck.ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush, long txnidMostBits, long txnidLeastBits) {
        MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
        if (chunkMsgIds != null && txnidLeastBits < 0L && txnidMostBits < 0L) {
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()) && ackType != PulsarApi.CommandAck.AckType.Cumulative) {
                ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(chunkMsgIds.length);
                for (MessageIdImpl cMsgId : chunkMsgIds) {
                    if (cMsgId == null || chunkMsgIds.length <= 1) continue;
                    entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                }
                ByteBuf cmd = Commands.newMultiMessageAck(this.consumer.consumerId, entriesToAck);
                if (flush) {
                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                } else {
                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
                }
            } else {
                for (MessageIdImpl cMsgId : chunkMsgIds) {
                    ByteBuf cmd = Commands.newAck(consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), lastCumulativeAckSet, ackType, validationError, map);
                    if (flush) {
                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                        continue;
                    }
                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
                }
            }
            this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
        } else {
            ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet, ackType, validationError, map, txnidLeastBits, txnidMostBits, -1L);
            if (flush) {
                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
            } else {
                cnx.ctx().write(cmd, cnx.ctx().voidPromise());
            }
        }
    }
}

