package org.apache.storm.kafka.spout.internal;

import com.google.common.annotations.VisibleForTesting;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the emitted, acked and committed offsets of a single topic-partition.
 *
 * <p>Emitted offsets and acked messages are kept in ascending offset order so that
 * {@link #findNextCommitOffset(String)} can scan them linearly to find the longest run of acked
 * offsets that starts at the current committed offset.
 *
 * <p>This class performs no synchronization of its own.
 */
public class OffsetManager {
    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
    private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);

    /** The topic-partition whose offsets this instance tracks. */
    private final TopicPartition tp;
    /** Offsets that have been emitted and not yet removed by {@link #commit}, sorted ascending. */
    private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
    /** Messages that have been acked and not yet removed by {@link #commit}, sorted by offset. */
    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
    /** The first offset that has not been committed yet; commit scans start here. */
    private long committedOffset;
    /** Becomes {@code true} once {@link #commit} has been called at least once. */
    private boolean committed;
    /** The largest offset passed to {@link #addToEmitMsgs(long)} so far (0 if none). */
    private long latestEmittedOffset;

    /** Orders message ids by their offset only. */
    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
        private OffsetComparator() {
        }

        @Override
        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
            return Long.compare(m1.offset(), m2.offset());
        }
    }

    /**
     * Creates a new OffsetManager.
     *
     * @param tp the topic-partition to track
     * @param initialFetchOffset the offset used as the initial committed offset
     */
    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
        this.tp = tp;
        this.committedOffset = initialFetchOffset;
        // Pass the instance rather than toString() so formatting only happens when debug is enabled.
        LOG.debug("Instantiated {}", this);
    }

    /**
     * Records a message as acked.
     *
     * @param msgId the acked message; ignored if a message with the same offset is already recorded
     */
    public void addToAckMsgs(KafkaSpoutMessageId msgId) {
        ackedMsgs.add(msgId);
    }

    /**
     * Records an offset as emitted and updates the latest emitted offset.
     *
     * @param offset the emitted offset
     */
    public void addToEmitMsgs(long offset) {
        emittedOffsets.add(offset);
        latestEmittedOffset = Math.max(latestEmittedOffset, offset);
    }

    /**
     * Returns the number of emitted offsets that have not been removed by a commit.
     *
     * @return the size of the emitted-offset set
     */
    public int getNumUncommittedOffsets() {
        return emittedOffsets.size();
    }

    /**
     * Returns the {@code index}-th smallest tracked emitted offset (1-based).
     *
     * @param index the 1-based position; values below 1 behave like 1
     * @return the emitted offset at that position in ascending order
     * @throws java.util.NoSuchElementException if fewer than {@code index} offsets are tracked
     */
    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
        Iterator<Long> offsetIter = emittedOffsets.iterator();
        for (int i = 0; i < index - 1; i++) {
            offsetIter.next();
        }
        return offsetIter.next();
    }

    /**
     * Finds the next offset that can be committed by scanning acked messages in ascending order,
     * starting from the current committed offset.
     *
     * <p>An acked offset extends the committable range when it equals the next expected offset, or
     * when the expected offset was never emitted and the acked offset is the smallest emitted
     * offset at or above it (a gap in the emitted offsets). The scan stops at the first acked
     * offset that does neither.
     *
     * @param commitMetadata the metadata string to attach to the returned commit
     * @return the offset and metadata to commit, or {@code null} if nothing is ready to be committed
     * @throws IllegalStateException if an acked offset lies below the next offset to commit
     */
    public OffsetAndMetadata findNextCommitOffset(String commitMetadata) {
        boolean found = false;
        long nextCommitOffset = committedOffset;

        for (KafkaSpoutMessageId ackedMsg : ackedMsgs) {
            final long currOffset = ackedMsg.offset();
            if (currOffset == nextCommitOffset) {
                // The acked offset continues the contiguous run.
                found = true;
                nextCommitOffset = currOffset + 1;
            } else if (currOffset < nextCommitOffset) {
                throw new IllegalStateException("The offset [" + currOffset
                    + "] is below the current nextCommitOffset [" + nextCommitOffset
                    + "] for [" + tp + "]."
                    + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
            } else if (emittedOffsets.contains(nextCommitOffset)) {
                // The expected offset was emitted but is not acked yet, so the run ends here.
                LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
                    + " It will be processed in a subsequent batch.", tp, currOffset);
                break;
            } else {
                // The expected offset was never emitted. Skip ahead only if the acked offset is
                // the very next emitted offset; otherwise an emitted offset in between is unacked.
                LOG.debug("Processed non-sequential offset."
                    + " The earliest uncommitted offset is no longer part of the topic."
                    + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset);
                final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
                if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
                    LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
                        currOffset, nextCommitOffset);
                    found = true;
                    nextCommitOffset = currOffset + 1;
                } else {
                    LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
                        + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset);
                    break;
                }
            }
        }

        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
        if (found) {
            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata);
            LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed."
                + " Processing will resume at offset [{}] upon spout restart",
                tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset());
        } else {
            LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp);
        }
        LOG.trace("{}", this);
        return nextCommitOffsetAndMetadata;
    }

    /**
     * Marks the given offset as committed and drops all acked messages and emitted offsets that
     * lie strictly below it.
     *
     * @param committedOffsetAndMeta the offset that has been committed
     * @return the number of acked messages removed by this commit
     */
    public long commit(OffsetAndMetadata committedOffsetAndMeta) {
        committed = true;
        final long preCommitCommittedOffset = committedOffset;
        committedOffset = committedOffsetAndMeta.offset();

        // ackedMsgs is sorted by offset, so removal can stop at the first offset not below the commit.
        long numCommittedOffsets = 0;
        for (Iterator<KafkaSpoutMessageId> ackedIter = ackedMsgs.iterator(); ackedIter.hasNext();) {
            if (ackedIter.next().offset() >= committedOffset) {
                break;
            }
            ackedIter.remove();
            numCommittedOffsets++;
        }

        // headSet is a live view of the elements strictly below the bound; clearing it removes them.
        emittedOffsets.headSet(committedOffset).clear();

        LOG.trace("{}", this);
        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
            + " Processing will resume at [{}] upon spout restart",
            numCommittedOffsets, preCommitCommittedOffset, committedOffset - 1, tp, committedOffset);
        return numCommittedOffsets;
    }

    /**
     * Checks whether this OffsetManager has committed to Kafka.
     *
     * @return {@code true} if {@link #commit} has been called at least once
     */
    public boolean hasCommitted() {
        return committed;
    }

    /**
     * Checks whether a message with the same offset as {@code msgId} is currently recorded as acked.
     *
     * @param msgId the message to look up
     * @return {@code true} if an acked message with that offset is tracked
     */
    public boolean contains(KafkaSpoutMessageId msgId) {
        return ackedMsgs.contains(msgId);
    }

    /**
     * Checks whether the given offset is currently tracked as emitted.
     *
     * @param offset the offset to look up
     * @return {@code true} if the offset is in the emitted-offset set
     */
    @VisibleForTesting
    boolean containsEmitted(long offset) {
        return emittedOffsets.contains(offset);
    }

    /**
     * Returns the largest offset recorded via {@link #addToEmitMsgs(long)}.
     *
     * @return the latest emitted offset, or 0 if none has been recorded
     */
    public long getLatestEmittedOffset() {
        return latestEmittedOffset;
    }

    /**
     * Returns the current committed offset.
     *
     * @return the initial offset until the first commit, then the most recently committed offset
     */
    public long getCommittedOffset() {
        return committedOffset;
    }

    @Override
    public final String toString() {
        return "OffsetManager{"
            + "topic-partition=" + tp
            + ", committedOffset=" + committedOffset
            + ", emittedOffsets=" + emittedOffsets
            + ", ackedMsgs=" + ackedMsgs
            + ", latestEmittedOffset=" + latestEmittedOffset
            + "}";
    }
}
