/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

/**
 * Tracks negatively acknowledged messages for a single consumer and periodically asks
 * that consumer to redeliver the ones whose redelivery delay has elapsed.
 *
 * <p>Each nacked message is stored with a deadline derived from {@link System#nanoTime()}.
 * A timer task, scheduled on the client's {@link Timer}, fires every third of the
 * redelivery delay, collects the expired entries and hands them to the consumer.
 * The timer is only armed while there are pending entries.
 *
 * <p>{@link #add(MessageId)} and the timer callback are both {@code synchronized} on this
 * instance, so the pending map and the timer handle are never touched concurrently.
 */
class NegativeAcksTracker {
    /** Lower bound (100 ms, in nanoseconds) applied to the configured redelivery delay. */
    private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);

    /** Pending nacks: message id mapped to its {@code System.nanoTime()} deadline. Created lazily by {@link #add}. */
    private HashMap<MessageId, Long> nackedMessages = null;
    private final ConsumerBase<?> consumer;
    private final Timer timer;
    /** Delay between a nack and its redelivery, in nanoseconds (never below {@link #MIN_NACK_DELAY_NANOS}). */
    private final long nackDelayNanos;
    /** Period of the redelivery check, in nanoseconds: one third of {@link #nackDelayNanos}. */
    private final long timerIntervalNanos;
    /** Handle of the currently scheduled check, or {@code null} when no check is scheduled. */
    private Timeout timeout;

    /**
     * Creates a tracker bound to the given consumer.
     *
     * @param consumer consumer that receives the redelivery requests; its client supplies the timer
     * @param conf     configuration providing the negative-ack redelivery delay in microseconds;
     *                 values below 100 ms are raised to 100 ms
     */
    public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
        this.consumer = consumer;
        this.timer = consumer.getClient().timer();
        this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()), MIN_NACK_DELAY_NANOS);
        this.timerIntervalNanos = this.nackDelayNanos / 3L;
    }

    /**
     * Timer callback: redelivers every pending message whose deadline has passed and
     * re-arms the timer, or disarms it when nothing is pending.
     *
     * @param t the timeout that fired (unused)
     */
    private synchronized void triggerRedelivery(Timeout t) {
        if (this.nackedMessages.isEmpty()) {
            // Nothing left to watch: stop ticking. The next add() re-arms the timer.
            this.timeout = null;
            return;
        }
        HashSet<MessageId> messagesToRedeliver = new HashSet<>();
        long now = System.nanoTime();
        this.nackedMessages.forEach((msgId, deadline) -> {
            // nanoTime values may be negative or wrap around, so they must be compared
            // by subtraction rather than with a direct '<'.
            if (deadline - now < 0L) {
                UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
                messagesToRedeliver.add(msgId);
            }
        });
        // Only involve the consumer when something is actually due; otherwise every tick
        // would notify it with an empty set.
        if (!messagesToRedeliver.isEmpty()) {
            messagesToRedeliver.forEach(this.nackedMessages::remove);
            this.consumer.onNegativeAcksSend(messagesToRedeliver);
            this.consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
        }
        this.timeout = this.timer.newTimeout(this::triggerRedelivery, this.timerIntervalNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Registers a negatively acknowledged message for redelivery after the configured delay.
     *
     * <p>A {@link BatchMessageIdImpl} is replaced by a plain {@link MessageIdImpl} built from its
     * ledger id, entry id and partition index, so all messages of one batch entry share a
     * single pending entry. Adding an id that is already pending resets its deadline.
     *
     * @param messageId id of the nacked message
     */
    public synchronized void add(MessageId messageId) {
        if (messageId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
            messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
        }
        if (this.nackedMessages == null) {
            this.nackedMessages = new HashMap<>();
        }
        this.nackedMessages.put(messageId, System.nanoTime() + this.nackDelayNanos);
        if (this.timeout == null) {
            // First pending entry since the timer was last disarmed: start the periodic check.
            this.timeout = this.timer.newTimeout(this::triggerRedelivery, this.timerIntervalNanos, TimeUnit.NANOSECONDS);
        }
    }
}

