/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.v3_0_8.client.impl;

import java.io.Closeable;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.v3_0_8.client.impl.ConsumerBase;
import org.apache.pulsar.v3_0_8.client.impl.MessageIdAdvUtils;
import org.apache.pulsar.v3_0_8.client.impl.MessageIdImpl;
import org.apache.pulsar.v3_0_8.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.v3_0_8.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.v3_0_8.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.v3_0_8.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.v3_0_8.shade.io.netty.util.Timeout;
import org.apache.pulsar.v3_0_8.shade.io.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
    private ConcurrentLongLongPairHashMap nackedMessages = null;
    private final ConsumerBase<?> consumer;
    private final Timer timer;
    private final long nackDelayNanos;
    private final long timerIntervalNanos;
    private final RedeliveryBackoff negativeAckRedeliveryBackoff;
    private Timeout timeout;
    private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
    private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;

    public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
        this.consumer = consumer;
        this.timer = consumer.getClient().timer();
        this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()), MIN_NACK_DELAY_NANOS);
        this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
        this.timerIntervalNanos = this.negativeAckRedeliveryBackoff != null ? Math.max(TimeUnit.MILLISECONDS.toNanos(this.negativeAckRedeliveryBackoff.next(0)), MIN_NACK_DELAY_NANOS) / 3L : this.nackDelayNanos / 3L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerRedelivery(Timeout t2) {
        HashSet<MessageId> messagesToRedeliver = new HashSet<MessageId>();
        NegativeAcksTracker negativeAcksTracker = this;
        synchronized (negativeAcksTracker) {
            if (this.nackedMessages.isEmpty()) {
                this.timeout = null;
                return;
            }
            long now = System.nanoTime();
            this.nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
                if (timestamp < now) {
                    MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, (int)(partitionIndex == Long.MAX_VALUE ? -1L : partitionIndex));
                    UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
                    messagesToRedeliver.add(msgId);
                }
            });
            for (MessageId messageId : messagesToRedeliver) {
                this.nackedMessages.remove(((MessageIdImpl)messageId).getLedgerId(), ((MessageIdImpl)messageId).getEntryId());
            }
            this.timeout = this.timer.newTimeout(this::triggerRedelivery, this.timerIntervalNanos, TimeUnit.NANOSECONDS);
        }
        if (!messagesToRedeliver.isEmpty()) {
            this.consumer.onNegativeAcksSend(messagesToRedeliver);
            log.info("[{}] {} messages will be re-delivered", this.consumer, (Object)messagesToRedeliver.size());
            this.consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
        }
    }

    public synchronized void add(MessageId messageId) {
        this.add(messageId, 0);
    }

    public synchronized void add(Message<?> message) {
        this.add(message.getMessageId(), message.getRedeliveryCount());
    }

    private synchronized void add(MessageId messageId, int redeliveryCount) {
        if (this.nackedMessages == null) {
            this.nackedMessages = ConcurrentLongLongPairHashMap.newBuilder().autoShrink(true).concurrencyLevel(1).build();
        }
        long backoffNs = this.negativeAckRedeliveryBackoff != null ? TimeUnit.MILLISECONDS.toNanos(this.negativeAckRedeliveryBackoff.next(redeliveryCount)) : this.nackDelayNanos;
        MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
        this.nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(), messageIdAdv.getPartitionIndex() >= 0 ? (long)messageIdAdv.getPartitionIndex() : Long.MAX_VALUE, System.nanoTime() + backoffNs);
        if (this.timeout == null) {
            this.timeout = this.timer.newTimeout(this::triggerRedelivery, this.timerIntervalNanos, TimeUnit.NANOSECONDS);
        }
    }

    @VisibleForTesting
    Optional<Long> getNackedMessagesCount() {
        return Optional.ofNullable(this.nackedMessages).map(ConcurrentLongLongPairHashMap::size);
    }

    @Override
    public synchronized void close() {
        if (this.timeout != null && !this.timeout.isCancelled()) {
            this.timeout.cancel();
            this.timeout = null;
        }
        if (this.nackedMessages != null) {
            this.nackedMessages.clear();
            this.nackedMessages = null;
        }
    }
}

