/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.UnackMessageIdWrapper;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnAckedMessageRedeliveryTracker
extends UnAckedMessageTracker {
    private static final Logger log = LoggerFactory.getLogger(UnAckedMessageRedeliveryTracker.class);
    protected final HashMap<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>> redeliveryMessageIdPartitionMap;
    protected final ArrayDeque<HashSet<UnackMessageIdWrapper>> redeliveryTimePartitions;
    protected final HashMap<MessageId, Long> ackTimeoutMessages;
    private final RedeliveryBackoff ackTimeoutRedeliveryBackoff;

    public UnAckedMessageRedeliveryTracker(final PulsarClientImpl client, final ConsumerBase<?> consumerBase, ConsumerConfigurationData<?> conf) {
        super(client, consumerBase, conf);
        this.ackTimeoutRedeliveryBackoff = conf.getAckTimeoutRedeliveryBackoff();
        this.ackTimeoutMessages = new HashMap();
        this.redeliveryMessageIdPartitionMap = new HashMap();
        this.redeliveryTimePartitions = new ArrayDeque();
        int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / (double)this.tickDurationInMs);
        for (int i = 0; i < blankPartitions + 1; ++i) {
            this.redeliveryTimePartitions.add(new HashSet(16, 1.0f));
        }
        this.timeout = client.timer().newTimeout(new TimerTask(){

            @Override
            public void run(Timeout t) throws Exception {
                UnAckedMessageRedeliveryTracker.this.writeLock.lock();
                try {
                    HashSet<UnackMessageIdWrapper> headPartition = UnAckedMessageRedeliveryTracker.this.redeliveryTimePartitions.removeFirst();
                    if (!headPartition.isEmpty()) {
                        headPartition.forEach(unackMessageIdWrapper -> {
                            UnAckedMessageRedeliveryTracker.this.addAckTimeoutMessages(unackMessageIdWrapper);
                            UnAckedMessageRedeliveryTracker.this.redeliveryMessageIdPartitionMap.remove(unackMessageIdWrapper);
                            unackMessageIdWrapper.recycle();
                        });
                    }
                    headPartition.clear();
                    UnAckedMessageRedeliveryTracker.this.redeliveryTimePartitions.addLast(headPartition);
                    UnAckedMessageRedeliveryTracker.this.triggerRedelivery(consumerBase);
                }
                finally {
                    UnAckedMessageRedeliveryTracker.this.writeLock.unlock();
                    UnAckedMessageRedeliveryTracker.this.timeout = client.timer().newTimeout(this, UnAckedMessageRedeliveryTracker.this.tickDurationInMs, TimeUnit.MILLISECONDS);
                }
            }
        }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addAckTimeoutMessages(UnackMessageIdWrapper messageIdWrapper) {
        this.writeLock.lock();
        try {
            MessageId messageId = messageIdWrapper.getMessageId();
            int redeliveryCount = messageIdWrapper.getRedeliveryCount();
            long backoffNs = this.ackTimeoutRedeliveryBackoff.next(redeliveryCount);
            this.ackTimeoutMessages.put(messageId, System.currentTimeMillis() + backoffNs);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerRedelivery(ConsumerBase<?> consumerBase) {
        if (this.ackTimeoutMessages.isEmpty()) {
            return;
        }
        Set messageIds = (Set)TL_MESSAGE_IDS_SET.get();
        messageIds.clear();
        try {
            long now = System.currentTimeMillis();
            this.ackTimeoutMessages.forEach((messageId, timestamp) -> {
                if (timestamp <= now) {
                    UnAckedMessageRedeliveryTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
                    messageIds.add(messageId);
                }
            });
            if (!messageIds.isEmpty()) {
                log.info("[{}] {} messages will be re-delivered", consumerBase, (Object)messageIds.size());
                for (MessageId messageId2 : messageIds) {
                    this.ackTimeoutMessages.remove(messageId2);
                }
            }
        }
        finally {
            if (messageIds.size() > 0) {
                consumerBase.onAckTimeoutSend(messageIds);
                consumerBase.redeliverUnacknowledgedMessages(messageIds);
            }
        }
    }

    @Override
    boolean isEmpty() {
        this.readLock.lock();
        try {
            boolean bl = this.redeliveryMessageIdPartitionMap.isEmpty() && this.ackTimeoutMessages.isEmpty();
            return bl;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public void clear() {
        this.writeLock.lock();
        try {
            this.redeliveryMessageIdPartitionMap.clear();
            this.redeliveryTimePartitions.forEach((Consumer<HashSet<UnackMessageIdWrapper>>)((Consumer<HashSet>)tp -> {
                tp.forEach(unackMessageIdWrapper -> unackMessageIdWrapper.recycle());
                tp.clear();
            }));
            this.ackTimeoutMessages.clear();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public boolean add(MessageId messageId) {
        return this.add(messageId, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean add(MessageId messageId, int redeliveryCount) {
        this.writeLock.lock();
        try {
            UnackMessageIdWrapper messageIdWrapper = UnackMessageIdWrapper.valueOf(messageId, redeliveryCount);
            HashSet<UnackMessageIdWrapper> partition = this.redeliveryTimePartitions.peekLast();
            HashSet<UnackMessageIdWrapper> previousPartition = this.redeliveryMessageIdPartitionMap.putIfAbsent(messageIdWrapper, partition);
            if (previousPartition == null) {
                boolean bl = partition.add(messageIdWrapper);
                return bl;
            }
            messageIdWrapper.recycle();
            boolean bl = false;
            return bl;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean remove(MessageId messageId) {
        this.writeLock.lock();
        UnackMessageIdWrapper messageIdWrapper = UnackMessageIdWrapper.valueOf(messageId);
        try {
            boolean removed = false;
            HashSet<UnackMessageIdWrapper> exist = this.redeliveryMessageIdPartitionMap.remove(messageIdWrapper);
            if (exist != null) {
                removed = exist.remove(messageIdWrapper);
            }
            boolean bl = removed || this.ackTimeoutMessages.remove(messageId) != null;
            return bl;
        }
        finally {
            messageIdWrapper.recycle();
            this.writeLock.unlock();
        }
    }

    @Override
    long size() {
        this.readLock.lock();
        try {
            long l = this.redeliveryMessageIdPartitionMap.size() + this.ackTimeoutMessages.size();
            return l;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int removeMessagesTill(MessageId msgId) {
        this.writeLock.lock();
        try {
            int removed = 0;
            Iterator<Map.Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>>> iterator = this.redeliveryMessageIdPartitionMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>> entry = iterator.next();
                UnackMessageIdWrapper messageIdWrapper = entry.getKey();
                if (messageIdWrapper.getMessageId().compareTo((Object)msgId) > 0) continue;
                entry.getValue().remove(messageIdWrapper);
                iterator.remove();
                messageIdWrapper.recycle();
                ++removed;
            }
            Iterator<MessageId> iteratorAckTimeOut = this.ackTimeoutMessages.keySet().iterator();
            while (iteratorAckTimeOut.hasNext()) {
                MessageId messageId = iteratorAckTimeOut.next();
                if (messageId.compareTo((Object)msgId) > 0) continue;
                iteratorAckTimeOut.remove();
                ++removed;
            }
            int n = removed;
            return n;
        }
        finally {
            this.writeLock.unlock();
        }
    }
}

