package org.apache.storm.kinesis.spout;

import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.class */
/**
 * {@link FailedMessageRetryHandler} that schedules failed Kinesis messages for retry with an
 * exponentially growing delay.
 *
 * <p>The first failure of a message is retried after {@code initialDelayMillis} milliseconds. The
 * n-th failure (n &gt; 1) is retried after {@code baseSeconds ^ (n - 1)} seconds. Once a message has
 * failed more than {@code maxRetries} times it is dropped. All retry times are expressed on the
 * {@link System#nanoTime()} clock and saturate at {@link Long#MAX_VALUE} instead of overflowing.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
    /** Number of nanoseconds in one millisecond. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Delay, in milliseconds, before the retry that follows a message's first failure. */
    private final Long initialDelayMillis;
    /** Base, in seconds, of the exponential delay used from the second failure onwards. */
    private final Long baseSeconds;
    /** Maximum number of failures tolerated per message before it is dropped. */
    private final Long maxRetries;
    /** Number of failures seen so far for each message that has not been acked or dropped. */
    private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
    /** Retry time ({@link System#nanoTime()} based) of each message currently queued for retry. */
    private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
    /** Queueing order of each queued message; breaks ties between equal retry times. */
    private Map<KinesisMessageId, Long> retrySequences = new HashMap<>();
    /** Messages queued for retry, earliest retry time first. */
    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
    /** Next value handed out as a queueing sequence number. */
    private long nextSequence;

    /**
     * Orders queued messages by retry time and then by queueing order.
     *
     * <p>The secondary key is essential: a {@link TreeSet} treats elements that compare as equal
     * as duplicates, so ordering by retry time alone would silently discard any message whose
     * retry time collides with that of an already queued message.
     */
    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
        @Override
        public int compare(KinesisMessageId first, KinesisMessageId second) {
            int byRetryTime = retryTimes.get(first).compareTo(retryTimes.get(second));
            if (byRetryTime != 0) {
                return byRetryTime;
            }
            return retrySequences.get(first).compareTo(retrySequences.get(second));
        }
    }

    /**
     * Creates a retrier with an initial delay of 100 ms, an exponential base of 2 seconds and
     * {@link Long#MAX_VALUE} retries.
     */
    public ExponentialBackoffRetrier() {
        this(100L, 2L, Long.MAX_VALUE);
    }

    /**
     * Creates a retrier with the given back-off parameters.
     *
     * @param l  delay in milliseconds before retrying after the first failure; must not be negative
     * @param l2 base in seconds of the exponential delay for later failures; must not be negative
     * @param l3 maximum number of retries per message; must not be negative, 0 disables retrying
     * @throws IllegalArgumentException if any argument is negative
     */
    public ExponentialBackoffRetrier(Long l, Long l2, Long l3) {
        this.initialDelayMillis = l;
        this.baseSeconds = l2;
        this.maxRetries = l3;
        validate();
    }

    /** Rejects negative configuration values. */
    private void validate() {
        if (initialDelayMillis.longValue() < 0) {
            throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
        }
        if (baseSeconds.longValue() < 0) {
            throw new IllegalArgumentException("baseSeconds cannot be negative.");
        }
        if (maxRetries.longValue() < 0) {
            throw new IllegalArgumentException("maxRetries cannot be negative.");
        }
    }

    /**
     * Records a failure of the given message and, unless its retries are exhausted, queues it for
     * a later retry.
     *
     * @param kinesisMessageId the message that failed
     * @return {@code true} if the message was queued for retry; {@code false} if retrying is
     *         disabled ({@code maxRetries} is 0) or the message exceeded {@code maxRetries}
     */
    @Override // org.apache.storm.kinesis.spout.FailedMessageRetryHandler
    public boolean failed(KinesisMessageId kinesisMessageId) {
        LOG.debug("Handling failed message {}", kinesisMessageId);
        if (maxRetries.longValue() == 0) {
            LOG.warn("maxRetries set to 0. Hence not queueing {}", kinesisMessageId);
            return false;
        }
        Long previousFailures = failCounts.get(kinesisMessageId);
        long failures = previousFailures == null ? 1L : previousFailures.longValue() + 1;

        // The sort key of an element must never change while it sits in the TreeSet, so take any
        // existing queue entry out before its retry time is replaced (or the message is dropped).
        dequeue(kinesisMessageId);

        if (failures > maxRetries.longValue()) {
            LOG.warn("maxRetries reached so dropping {}", kinesisMessageId);
            failCounts.remove(kinesisMessageId);
            return false;
        }
        failCounts.put(kinesisMessageId, failures);

        Long retryTime = getRetryTime(failures);
        // Both maps must be populated before add(), because the comparator reads them.
        retryTimes.put(kinesisMessageId, retryTime);
        retrySequences.put(kinesisMessageId, nextSequence++);
        retryMessageSet.add(kinesisMessageId);
        LOG.debug("Scheduled {} for retry at {} and retry attempt {}", new Object[]{kinesisMessageId, retryTime, failures});
        return true;
    }

    /**
     * Forgets the failure count of a message that has been acked.
     *
     * @param kinesisMessageId the message that was acked
     */
    @Override // org.apache.storm.kinesis.spout.FailedMessageRetryHandler
    public void acked(KinesisMessageId kinesisMessageId) {
        LOG.debug("Ack received for {}. Hence cleaning state.", kinesisMessageId);
        failCounts.remove(kinesisMessageId);
    }

    /**
     * Returns the queued message with the earliest retry time if that time has been reached.
     * The message stays queued until {@link #failedMessageEmitted(KinesisMessageId)} is called.
     *
     * @return the next message due for retry, or {@code null} if nothing is queued or the
     *         earliest retry time lies in the future
     */
    @Override // org.apache.storm.kinesis.spout.FailedMessageRetryHandler
    public KinesisMessageId getNextFailedMessageToRetry() {
        KinesisMessageId due = null;
        if (!retryMessageSet.isEmpty()) {
            KinesisMessageId earliest = retryMessageSet.first();
            // NOTE(review): absolute comparison of nanoTime values, matching how retry times are
            // saturated at Long.MAX_VALUE; it assumes the nanoTime clock does not wrap.
            if (retryTimes.get(earliest).longValue() <= System.nanoTime()) {
                due = earliest;
            }
        }
        LOG.debug("Returning {} to spout for retrying.", due);
        return due;
    }

    /**
     * Removes a message from the retry queue after it has been re-emitted. Its failure count is
     * kept until the message is acked or dropped. Does nothing if the message is not queued.
     *
     * @param kinesisMessageId the message that was re-emitted
     */
    @Override // org.apache.storm.kinesis.spout.FailedMessageRetryHandler
    public void failedMessageEmitted(KinesisMessageId kinesisMessageId) {
        LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", kinesisMessageId);
        dequeue(kinesisMessageId);
    }

    /** Removes the message from the retry queue and its bookkeeping maps, if it is queued. */
    private void dequeue(KinesisMessageId kinesisMessageId) {
        if (!retryTimes.containsKey(kinesisMessageId)) {
            return;
        }
        // Remove from the set first: the comparator still needs the map entries to locate it.
        retryMessageSet.remove(kinesisMessageId);
        retryTimes.remove(kinesisMessageId);
        retrySequences.remove(kinesisMessageId);
    }

    /**
     * Computes the {@link System#nanoTime()} based time at which a message may be retried.
     *
     * @param failureCount number of times the message has failed so far (1 for the first failure)
     * @return the retry time in nanoseconds, saturated at {@link Long#MAX_VALUE}
     */
    private long getRetryTime(long failureCount) {
        final long now = System.nanoTime();
        final long delayNanos;
        if (failureCount == 1) {
            final long delayMillis = initialDelayMillis.longValue();
            // Guard the millisecond-to-nanosecond conversion against overflow.
            delayNanos = delayMillis > Long.MAX_VALUE / NANOS_PER_MILLI
                    ? Long.MAX_VALUE
                    : delayMillis * NANOS_PER_MILLI;
        } else {
            // baseSeconds ^ (failureCount - 1) seconds, converted to nanoseconds.
            final double delta = Math.pow(baseSeconds.longValue(), failureCount - 1) * 1000.0d * NANOS_PER_MILLI;
            delayNanos = delta >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) delta;
        }
        final long retryTime = now + delayNanos;
        // A saturated delay, or a sum that wrapped around, means "as late as possible".
        return (delayNanos == Long.MAX_VALUE || retryTime < now) ? Long.MAX_VALUE : retryTime;
    }
}
