package org.apache.storm.kafka.spout;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpout.class */
public class KafkaSpout<K, V> extends BaseRichSpout {
    private static final long serialVersionUID = 4151921085047987154L;
    public static final long TIMER_DELAY_MS = 500;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    protected SpoutOutputCollector collector;
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
    private transient KafkaConsumer<K, V> kafkaConsumer;
    private transient KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private transient KafkaSpoutRetryService retryService;
    private transient KafkaTupleListener tupleListener;
    private transient Timer commitTimer;
    private transient Map<TopicPartition, OffsetManager> offsetManagers;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
    private transient Timer refreshSubscriptionTimer;
    private transient TopologyContext context;
    private transient CommitMetadataManager commitMetadataManager;
    private transient KafkaOffsetMetric kafkaOffsetMetric;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpout$KafkaSpoutConsumerRebalanceListener.class */
    /**
     * Rebalance callback that keeps the enclosing spout's per-partition state
     * (offset managers, retry schedule, emitted ids, waiting records) in step
     * with the partitions currently assigned to its consumer.
     */
    public class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
        /** Partitions passed to the most recent {@link #onPartitionsRevoked} call; empty until then. */
        private Collection<TopicPartition> previousAssignment;

        private KafkaSpoutConsumerRebalanceListener() {
            this.previousAssignment = new HashSet<TopicPartition>();
        }

        /**
         * Remembers the revoked partitions and, for at-least-once processing,
         * commits the offsets of acked tuples on those partitions.
         *
         * @param collection the partitions being revoked
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.previousAssignment = collection;
            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                    new Object[]{kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, collection});
            if (isAtLeastOnceProcessing()) {
                commitOffsetsForAckedTuples(new HashSet<TopicPartition>(collection));
            }
        }

        /**
         * Re-initialises spout state for the new assignment and then notifies the tuple listener.
         *
         * @param collection the partitions now assigned
         */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
                    new Object[]{context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, collection});
            initialize(collection);
            tupleListener.onPartitionsReassigned(collection);
        }

        /**
         * Drops state belonging to partitions that are no longer assigned, then seeks
         * every partition that was not part of the previous assignment.
         */
        private void initialize(Collection<TopicPartition> collection) {
            if (isAtLeastOnceProcessing()) {
                offsetManagers.keySet().retainAll(collection);
                retryService.retainAll(collection);
                // Forget in-flight message ids whose partition is no longer assigned.
                for (Iterator<KafkaSpoutMessageId> ids = emitted.iterator(); ids.hasNext();) {
                    final TopicPartition owner = ids.next().getTopicPartition();
                    if (!collection.contains(owner)) {
                        ids.remove();
                    }
                }
            }
            waitingToEmit.keySet().retainAll(collection);

            // Only partitions absent from the previous assignment need a fresh seek.
            final Set<TopicPartition> newlyAssigned = new HashSet<TopicPartition>(collection);
            newlyAssigned.removeAll(this.previousAssignment);
            for (TopicPartition partition : newlyAssigned) {
                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(partition);
                final long fetchOffset = doSeek(partition, committedOffset);
                LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
                        new Object[]{fetchOffset, partition, firstPollOffsetStrategy, committedOffset});
                if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(partition)) {
                    offsetManagers.put(partition, new OffsetManager(partition, fetchOffset));
                }
            }
            LOG.info("Initialization complete");
        }

        /**
         * Positions the consumer on {@code topicPartition} according to the committed
         * offset (if any) and the configured first-poll offset strategy.
         *
         * @param topicPartition    the partition to seek
         * @param offsetAndMetadata the committed offset for the partition, or {@code null} if none
         * @return the consumer position for the partition after seeking
         */
        private long doSeek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
            LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
                    new Object[]{topicPartition, firstPollOffsetStrategy, offsetAndMetadata});
            if (offsetAndMetadata == null) {
                // Nothing committed yet: both the plain and UNCOMMITTED_* strategies apply.
                if (firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                        || firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)) {
                    kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
                } else if (firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        || firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)) {
                    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
                }
            } else if (commitMetadataManager.isOffsetCommittedByThisTopology(
                    topicPartition, offsetAndMetadata, Collections.unmodifiableMap(offsetManagers))) {
                // The commit came from this topology: resume exactly where it left off.
                kafkaConsumer.seek(topicPartition, offsetAndMetadata.offset());
            } else if (firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)) {
                kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
            } else if (firstPollOffsetStrategy.equals(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)) {
                kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            } else {
                // UNCOMMITTED_* strategies fall back to the committed offset.
                kafkaConsumer.seek(topicPartition, offsetAndMetadata.offset());
            }
            return kafkaConsumer.position(topicPartition);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpout$PollablePartitionsInfo.class */
    /**
     * Snapshot of which partitions may be polled, together with the earliest
     * retriable offset of each of those partitions that has one.
     */
    public static class PollablePartitionsInfo {
        private final Set<TopicPartition> pollablePartitions;
        private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets = new HashMap<TopicPartition, Long>();

        /**
         * @param set the partitions that may be polled (kept by reference, not copied)
         * @param map earliest retriable offset per partition; entries for partitions
         *            not contained in {@code set} are ignored
         */
        public PollablePartitionsInfo(Set<TopicPartition> set, Map<TopicPartition, Long> map) {
            this.pollablePartitions = set;
            for (Map.Entry<TopicPartition, Long> candidate : map.entrySet()) {
                final TopicPartition partition = candidate.getKey();
                if (set.contains(partition)) {
                    this.pollableEarliestRetriableOffsets.put(partition, candidate.getValue());
                }
            }
        }

        /** @return {@code true} when at least one partition is pollable */
        public boolean shouldPoll() {
            final boolean nothingToPoll = this.pollablePartitions.isEmpty();
            return !nothingToPoll;
        }
    }

    /**
     * Creates a spout for the given configuration, using a
     * {@link KafkaConsumerFactoryDefault} to create the Kafka consumer.
     *
     * @param kafkaSpoutConfig the spout configuration
     */
    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
    }

    /**
     * Creates a spout with an explicit consumer factory (package-private, for tests).
     *
     * @param kafkaSpoutConfig     the spout configuration
     * @param kafkaConsumerFactory factory used to create the Kafka consumer when the spout is activated
     */
    @VisibleForTesting
    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    /**
     * Initialises the spout's transient state from the configuration. The Kafka
     * consumer itself is not created here; see {@code activate()}.
     *
     * @param map                  the topology configuration, passed on to the tuple listener
     * @param topologyContext      the context of this spout task
     * @param spoutOutputCollector the collector tuples are emitted to
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.collector = spoutOutputCollector;

        // Cache the pieces of configuration that are consulted on the hot path.
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        this.tupleListener = this.kafkaSpoutConfig.getTupleListener();

        // No commit timer is created for at-most-once processing.
        if (KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE != this.kafkaSpoutConfig.getProcessingGuarantee()) {
            this.commitTimer = new Timer(
                    TIMER_DELAY_MS, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        this.refreshSubscriptionTimer = new Timer(
                TIMER_DELAY_MS, this.kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

        this.offsetManagers = new HashMap<TopicPartition, OffsetManager>();
        this.emitted = new HashSet<KafkaSpoutMessageId>();
        this.waitingToEmit = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        this.commitMetadataManager =
                new CommitMetadataManager(topologyContext, this.kafkaSpoutConfig.getProcessingGuarantee());

        this.tupleListener.open(map, topologyContext);
        if (canRegisterMetrics()) {
            registerMetric();
        }
        LOG.info("Kafka Spout opened with the following configuration: {}", this.kafkaSpoutConfig);
    }

    /**
     * Creates the offset metric and registers it with the topology context under
     * the name {@code "kafkaOffset"}. The metric is given suppliers rather than
     * values, so it reads the offset managers and consumer at the time it asks.
     */
    private void registerMetric() {
        LOG.info("Registering Spout Metrics");
        final Supplier offsetManagersSupplier = new Supplier() {
            public Object get() {
                // Read-only view of the offset managers.
                return Collections.unmodifiableMap(KafkaSpout.this.offsetManagers);
            }
        };
        final Supplier consumerSupplier = new Supplier() {
            public Object get() {
                return KafkaSpout.this.kafkaConsumer;
            }
        };
        this.kafkaOffsetMetric = new KafkaOffsetMetric(offsetManagersSupplier, consumerSupplier);
        this.context.registerMetric(
                "kafkaOffset", this.kafkaOffsetMetric, this.kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
    }

    /**
     * Checks by reflection whether the {@code KafkaConsumer} class on the classpath
     * declares {@code beginningOffsets(Collection)}, which the spout metrics rely on.
     *
     * @return {@code true} if the method exists; {@code false} (after logging a warning) otherwise
     */
    private boolean canRegisterMetrics() {
        boolean metricsSupported;
        try {
            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
            metricsSupported = true;
        } catch (NoSuchMethodException methodMissing) {
            // The absence of the method is the answer; the exception carries no further information.
            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
            metricsSupported = false;
        }
        return metricsSupported;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the spout is configured for at-least-once processing.
     *
     * @return {@code true} if the configured processing guarantee is {@code AT_LEAST_ONCE}
     */
    public boolean isAtLeastOnceProcessing() {
        return this.kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
    }

    /**
     * One iteration of the spout loop: refresh the partition assignment and commit
     * offsets when their timers have expired, poll Kafka if any partition is
     * pollable, then emit at most one waiting tuple.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the Kafka consumer was interrupted
     */
    public void nextTuple() {
        try {
            if (this.refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                this.kafkaSpoutConfig.getSubscription().refreshAssignment();
            }

            // commitTimer is null when the spout was opened with the at-most-once guarantee.
            final boolean commitDue = this.commitTimer != null && this.commitTimer.isExpiredResetOnTrue();
            if (commitDue) {
                if (isAtLeastOnceProcessing()) {
                    commitOffsetsForAckedTuples(this.kafkaConsumer.assignment());
                } else if (KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE == this.kafkaSpoutConfig.getProcessingGuarantee()) {
                    // Commit the current fetch positions asynchronously; no callback is registered.
                    final Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
                            createFetchedOffsetsMetadata(this.kafkaConsumer.assignment());
                    this.kafkaConsumer.commitAsync(fetchedOffsets, (OffsetCommitCallback) null);
                    LOG.debug("Committed offsets {} to Kafka", fetchedOffsets);
                }
            }

            final PollablePartitionsInfo pollInfo = getPollablePartitionsInfo();
            if (pollInfo.shouldPoll()) {
                try {
                    setWaitingToEmit(pollKafkaBroker(pollInfo));
                } catch (RetriableException pollFailure) {
                    // Retriable: log it and carry on with whatever is already waiting.
                    LOG.error("Failed to poll from kafka.", pollFailure);
                }
            }

            emitIfWaitingNotEmitted();
        } catch (InterruptException interrupted) {
            throwKafkaConsumerInterruptedException();
        }
    }

    /**
     * Always throws; never returns normally.
     *
     * @throws RuntimeException whose cause is a new {@link InterruptedException}
     */
    private void throwKafkaConsumerInterruptedException() {
        // NOTE(review): presumably wrapped so the hosting framework can recognise interruption from the cause — confirm.
        throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
    }

    /**
     * Works out which assigned partitions may be polled right now.
     *
     * <p>Nothing is pollable while records are still waiting to be emitted. Outside
     * at-least-once processing every assigned partition is pollable. Under
     * at-least-once, a partition is pollable when it is below the uncommitted-offset
     * limit, or when it has a retriable offset that lies within that limit.
     *
     * @return the pollable partitions and their earliest retriable offsets
     */
    private PollablePartitionsInfo getPollablePartitionsInfo() {
        if (isWaitingToEmit()) {
            LOG.debug("Not polling. Tuples waiting to be emitted.");
            return new PollablePartitionsInfo(
                    Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
        }

        final Set<TopicPartition> assigned = this.kafkaConsumer.assignment();
        if (!isAtLeastOnceProcessing()) {
            return new PollablePartitionsInfo(assigned, Collections.<TopicPartition, Long>emptyMap());
        }

        final Map<TopicPartition, Long> earliestRetriableOffsets = this.retryService.earliestRetriableOffsets();
        final Set<TopicPartition> pollable = new HashSet<TopicPartition>();
        final int limit = this.kafkaSpoutConfig.getMaxUncommittedOffsets();
        for (TopicPartition partition : assigned) {
            final OffsetManager manager = this.offsetManagers.get(partition);
            final int uncommitted = manager.getNumUncommittedOffsets();
            if (uncommitted < limit) {
                pollable.add(partition);
                continue;
            }
            // At or over the limit: still poll if a retry falls inside the allowed window.
            final long offsetAtLimit = manager.getNthUncommittedOffsetAfterCommittedOffset(limit);
            final Long earliestRetriable = earliestRetriableOffsets.get(partition);
            final boolean retryWithinLimit = earliestRetriable != null && earliestRetriable.longValue() <= offsetAtLimit;
            if (retryWithinLimit) {
                pollable.add(partition);
            } else {
                LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ",
                        new Object[]{partition, uncommitted, limit});
            }
        }
        return new PollablePartitionsInfo(pollable, earliestRetriableOffsets);
    }

    /**
     * @return {@code true} if any partition still has at least one record waiting to be emitted
     */
    private boolean isWaitingToEmit() {
        for (List<ConsumerRecord<K, V>> pendingRecords : this.waitingToEmit.values()) {
            if (!pendingRecords.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores a mutable copy of the polled records of each partition in
     * {@code waitingToEmit}, replacing any list already held for that partition.
     *
     * @param consumerRecords the records returned by the last poll
     */
    private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        for (TopicPartition partition : consumerRecords.partitions()) {
            final List<ConsumerRecord<K, V>> mutableCopy =
                    new ArrayList<ConsumerRecord<K, V>>(consumerRecords.records(partition));
            this.waitingToEmit.put(partition, mutableCopy);
        }
    }

    /**
     * Polls Kafka for the pollable partitions only: seeks to the retriable offsets,
     * pauses every other assigned partition for the duration of the poll, and
     * resumes them afterwards. For at-most-once processing the fetch positions are
     * committed synchronously before the records are returned.
     *
     * @param pollablePartitionsInfo the partitions to poll and their retriable offsets
     * @return the records fetched by the poll
     */
    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
        final Map<TopicPartition, Long> retriableOffsets = pollablePartitionsInfo.pollableEarliestRetriableOffsets;
        doSeekRetriableTopicPartitions(retriableOffsets);

        // Everything assigned but not pollable is paused while polling.
        final Set<TopicPartition> pausedPartitions = new HashSet<TopicPartition>(this.kafkaConsumer.assignment());
        pausedPartitions.removeAll(pollablePartitionsInfo.pollablePartitions);

        try {
            this.kafkaConsumer.pause(pausedPartitions);
            final ConsumerRecords<K, V> polled = this.kafkaConsumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
            ackRetriableOffsetsIfCompactedAway(retriableOffsets, polled);
            LOG.debug("Polled [{}] records from Kafka", polled.count());
            if (KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE == this.kafkaSpoutConfig.getProcessingGuarantee()) {
                // Commit before the records are handed out, so they are never re-read after a restart.
                final Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
                        createFetchedOffsetsMetadata(this.kafkaConsumer.assignment());
                this.kafkaConsumer.commitSync(fetchedOffsets);
                LOG.debug("Committed offsets {} to Kafka", fetchedOffsets);
            }
            return polled;
        } finally {
            this.kafkaConsumer.resume(pausedPartitions);
        }
    }

    /**
     * Seeks the consumer to the given offset on each partition in the map.
     *
     * @param map the offset to seek to, per partition
     */
    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> map) {
        for (Map.Entry<TopicPartition, Long> retriable : map.entrySet()) {
            final TopicPartition partition = retriable.getKey();
            final long retryOffset = retriable.getValue().longValue();
            this.kafkaConsumer.seek(partition, retryOffset);
        }
    }

    /**
     * Acks retriable offsets that the broker did not return even though the
     * consumer was positioned on them before polling.
     *
     * <p>For each partition that has a retriable offset and returned at least one
     * record, every offset in {@code [retriableOffset, firstReceivedOffset)} that is
     * neither already acked nor currently emitted is removed from the retry service
     * and acked, so it no longer blocks committing.
     *
     * @param map             the offset that was sought to before the poll, per partition
     * @param consumerRecords the records returned by that poll
     */
    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> map, ConsumerRecords<K, V> consumerRecords) {
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            final TopicPartition partition = entry.getKey();
            final List<ConsumerRecord<K, V>> records = consumerRecords.records(partition);
            if (records.isEmpty()) {
                continue;
            }
            final long seekOffset = entry.getValue().longValue();
            final long earliestReceivedOffset = records.get(0).offset();
            // Bounded loop: stops at the first offset that was actually received.
            // (When seekOffset >= earliestReceivedOffset the body never runs.)
            for (long offset = seekOffset; offset < earliestReceivedOffset; offset++) {
                // A placeholder record is enough to obtain the message id for this offset.
                final KafkaSpoutMessageId messageId = this.retryService.getMessageId(
                        new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, null, null));
                if (!this.offsetManagers.get(partition).contains(messageId) && !this.emitted.contains(messageId)) {
                    LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked",
                            offset, partition);
                    this.retryService.remove(messageId);
                    // ack() only records ids it finds in 'emitted', so register the id first.
                    this.emitted.add(messageId);
                    ack(messageId);
                }
            }
        }
    }

    /**
     * Emits at most one tuple from the waiting records. Records are taken from the
     * head of each partition's list until one results in an emit; lists that are
     * fully drained are removed from {@code waitingToEmit}.
     */
    private void emitIfWaitingNotEmitted() {
        for (Iterator<List<ConsumerRecord<K, V>>> pending = this.waitingToEmit.values().iterator(); pending.hasNext();) {
            final List<ConsumerRecord<K, V>> partitionRecords = pending.next();
            while (!partitionRecords.isEmpty()) {
                // The record leaves the waiting list before it is handed to emitOrRetryTuple.
                final ConsumerRecord<K, V> head = partitionRecords.remove(0);
                if (emitOrRetryTuple(head)) {
                    return;
                }
            }
            pending.remove();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Emits the tuple for a record unless the record was already acked or emitted,
     * translates to a null tuple that must not be emitted, or is scheduled for a
     * retry that is not ready yet.
     *
     * @param consumerRecord the record to emit
     * @return {@code true} if a tuple was emitted to the collector, {@code false} otherwise
     * @throws IllegalStateException under at-least-once processing, if the record's offset is below
     *                               an offset already committed by this topology
     */
    private boolean emitOrRetryTuple(ConsumerRecord<K, V> consumerRecord) {
        final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        final KafkaSpoutMessageId msgId = this.retryService.getMessageId(consumerRecord);

        if (this.offsetManagers.containsKey(partition) && this.offsetManagers.get(partition).contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", consumerRecord);
            return false;
        }
        if (this.emitted.contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", consumerRecord);
            return false;
        }

        final OffsetAndMetadata committedOffset = this.kafkaConsumer.committed(partition);
        if (isAtLeastOnceProcessing()
                && committedOffset != null
                && committedOffset.offset() > consumerRecord.offset()
                && this.commitMetadataManager.isOffsetCommittedByThisTopology(
                        partition, committedOffset, Collections.unmodifiableMap(this.offsetManagers))) {
            throw new IllegalStateException("Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.");
        }

        final List<Object> tuple = this.kafkaSpoutConfig.getTranslator().apply(consumerRecord);
        if (!isEmitTuple(tuple)) {
            LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", consumerRecord);
            if (isAtLeastOnceProcessing()) {
                // Mark the record as emitted and ack it at once so its offset can still be committed.
                msgId.setNullTuple(true);
                this.offsetManagers.get(partition).addToEmitMsgs(msgId.offset());
                ack(msgId);
            }
            return false;
        }

        final boolean scheduledForRetry = this.retryService.isScheduled(msgId);
        if (scheduledForRetry && !this.retryService.isReady(msgId)) {
            // Scheduled for retry but not yet ready.
            return false;
        }

        final String stream;
        if (tuple instanceof KafkaTuple) {
            stream = ((KafkaTuple) tuple).getStream();
        } else {
            stream = "default";
        }

        if (isAtLeastOnceProcessing()) {
            this.emitted.add(msgId);
            this.offsetManagers.get(partition).addToEmitMsgs(msgId.offset());
            if (scheduledForRetry) {
                // Being re-emitted now, so it no longer belongs in the retry schedule.
                this.retryService.remove(msgId);
            }
            this.collector.emit(stream, tuple, msgId);
            this.tupleListener.onEmit(tuple, msgId);
            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", new Object[]{tuple, consumerRecord, msgId});
        } else if (this.kafkaSpoutConfig.isTupleTrackingEnforced()) {
            this.collector.emit(stream, tuple, msgId);
            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", new Object[]{tuple, consumerRecord, msgId});
        } else {
            this.collector.emit(stream, tuple);
            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, consumerRecord);
        }
        return true;
    }

    /**
     * @param list the translated tuple, possibly {@code null}
     * @return {@code true} if the tuple is non-null, or if the configuration asks for null tuples to be emitted
     */
    private boolean isEmitTuple(List<Object> list) {
        if (list != null) {
            return true;
        }
        return this.kafkaSpoutConfig.isEmitNullTuples();
    }

    /**
     * Builds a commit map from the consumer's current position on each given partition.
     *
     * @param set the partitions to build commit data for
     * @return for each partition, its current consumer position paired with this spout's commit metadata
     */
    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> set) {
        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition partition : set) {
            final long position = this.kafkaConsumer.position(partition);
            final OffsetAndMetadata commitData =
                    new OffsetAndMetadata(position, this.commitMetadataManager.getCommitMetadata());
            offsetsToCommit.put(partition, commitData);
        }
        return offsetsToCommit;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synchronously commits, for the given partitions, the next offset that each
     * partition's offset manager reports as committable, then updates local state
     * to match.
     *
     * <p>If the consumer position on a partition is behind the offset just committed,
     * the consumer is moved forward to it and waiting records below that offset are
     * discarded.
     *
     * @param set the partitions whose acked offsets should be committed; partitions
     *            without an offset manager are ignored
     */
    public void commitOffsetsForAckedTuples(Set<TopicPartition> set) {
        // Restrict the offset managers to the requested partitions.
        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetManager> entry : this.offsetManagers.entrySet()) {
            if (set.contains(entry.getKey())) {
                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
            }
        }

        // Ask each manager for its next committable offset; null means nothing to commit.
        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetManager> entry : assignedOffsetManagers.entrySet()) {
            final OffsetAndMetadata nextCommitOffset =
                    entry.getValue().findNextCommitOffset(this.commitMetadataManager.getCommitMetadata());
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(entry.getKey(), nextCommitOffset);
            }
        }

        if (nextCommitOffsets.isEmpty()) {
            LOG.trace("No offsets to commit. {}", this);
            return;
        }

        this.kafkaConsumer.commitSync(nextCommitOffsets);
        LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);

        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : nextCommitOffsets.entrySet()) {
            final TopicPartition partition = committed.getKey();
            final long position = this.kafkaConsumer.position(partition);
            final long committedOffset = committed.getValue().offset();
            if (position < committedOffset) {
                LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                        position, committedOffset);
                this.kafkaConsumer.seek(partition, committedOffset);
                // Waiting records below the committed offset must not be emitted any more.
                final List<ConsumerRecord<K, V>> waiting = this.waitingToEmit.get(partition);
                if (waiting != null) {
                    final List<ConsumerRecord<K, V>> stillPending = new ArrayList<>();
                    for (ConsumerRecord<K, V> consumerRecord : waiting) {
                        if (consumerRecord.offset() >= committedOffset) {
                            stillPending.add(consumerRecord);
                        }
                    }
                    this.waitingToEmit.put(partition, stillPending);
                }
            }
            final OffsetManager offsetManager = assignedOffsetManagers.get(partition);
            offsetManager.commit(committed.getValue());
            LOG.debug("[{}] uncommitted offsets for partition [{}] after commit",
                    offsetManager.getNumUncommittedOffsets(), partition);
        }
    }

    /**
     * Handles an ack for a previously emitted message id. Does nothing unless the
     * spout is configured for at-least-once processing.
     *
     * <p>Null-tuple ids are recorded as acked directly. Other ids are recorded only
     * if the spout is still tracking them as emitted. In both cases the tuple
     * listener is notified.
     *
     * @param obj the message id, expected to be a {@link KafkaSpoutMessageId}
     */
    public void ack(Object obj) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) obj;

        if (msgId.isNullTuple()) {
            this.offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            this.tupleListener.onAck(msgId);
            return;
        }

        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that came from a topic-partition that this consumer group instance is no longer tracking due to rebalance/partition reassignment. No action taken.", msgId);
        } else {
            Validate.isTrue(!this.retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked. This should never occur barring errors in the RetryService implementation or the spout code.");
            this.offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            this.emitted.remove(msgId);
        }
        this.tupleListener.onAck(msgId);
    }

    /**
     * Handles a failure for a previously emitted message id. Does nothing unless the
     * spout is configured for at-least-once processing, and ignores ids the spout
     * is no longer tracking as emitted.
     *
     * <p>The failure count is incremented and the id is handed to the retry service.
     * If the retry service declines to schedule it, the id is acked instead.
     *
     * @param obj the message id, expected to be a {@link KafkaSpoutMessageId}
     */
    public void fail(Object obj) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) obj;
        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
            return;
        }

        Validate.isTrue(!this.retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed. This should never occur barring errors in the RetryService implementation or the spout code.");
        msgId.incrementNumFails();

        final boolean rescheduled = this.retryService.schedule(msgId);
        if (!rescheduled) {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
            this.tupleListener.onMaxRetryReached(msgId);
            ack(msgId);
            return;
        }
        this.tupleListener.onRetry(msgId);
        this.emitted.remove(msgId);
    }

    /**
     * Creates the Kafka consumer and subscribes it.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the Kafka consumer was interrupted
     */
    public void activate() {
        try {
            subscribeKafkaConsumer();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    /**
     * Creates a new consumer through the consumer factory and hands it, together
     * with a fresh rebalance listener and the topology context, to the configured
     * subscription.
     */
    private void subscribeKafkaConsumer() {
        this.kafkaConsumer = this.kafkaConsumerFactory.createConsumer(this.kafkaSpoutConfig);
        this.kafkaSpoutConfig.getSubscription().subscribe(this.kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), this.context);
    }

    /**
     * Shuts the consumer down; see {@code shutdown()}.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the Kafka consumer was interrupted
     */
    public void deactivate() {
        try {
            shutdown();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    /**
     * Shuts the consumer down; see {@code shutdown()}.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the Kafka consumer was interrupted
     */
    public void close() {
        try {
            shutdown();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }

    /**
     * Commits the offsets of acked tuples (at-least-once processing only) and
     * closes the Kafka consumer. The consumer is closed even if the commit throws.
     *
     * <p>Does nothing if no consumer has been created yet, i.e. when the spout is
     * closed or deactivated without ever having been activated.
     */
    private void shutdown() {
        if (this.kafkaConsumer == null) {
            // The consumer is only created in activate(); with none there is nothing to commit or close.
            LOG.debug("Kafka consumer was never created. Nothing to shut down.");
            return;
        }
        try {
            if (isAtLeastOnceProcessing()) {
                commitOffsetsForAckedTuples(this.kafkaConsumer.assignment());
            }
        } finally {
            this.kafkaConsumer.close();
        }
    }

    /**
     * Declares one output stream per stream reported by the configured record
     * translator, using the fields the translator gives for that stream.
     *
     * @param outputFieldsDeclarer the declarer the streams are registered with
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        final RecordTranslator<K, V> recordTranslator = this.kafkaSpoutConfig.getTranslator();
        for (String streamId : recordTranslator.streams()) {
            outputFieldsDeclarer.declareStream(streamId, recordTranslator.getFieldsFor(streamId));
        }
    }

    /**
     * @return a description of this spout listing its offset managers and currently emitted message ids
     */
    public String toString() {
        final StringBuilder description = new StringBuilder("KafkaSpout{offsetManagers =");
        description.append(this.offsetManagers);
        description.append(", emitted=").append(this.emitted);
        description.append("}");
        return description.toString();
    }

    /**
     * Extends the inherited component configuration (or a new map if there is none)
     * with the spout's topics, consumer group id, bootstrap servers and security protocol.
     *
     * @return the component configuration including the Kafka-related entries
     */
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> inherited = super.getComponentConfiguration();
        final Map<String, Object> configuration = inherited != null ? inherited : new HashMap<String, Object>();

        configuration.put("config.topics", getTopicsString());
        configuration.put("config.groupid", this.kafkaSpoutConfig.getConsumerGroupId());
        configuration.put("config.bootstrap.servers",
                this.kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
        configuration.put("config.security.protocol",
                this.kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
        return configuration;
    }

    /**
     * @return the topics string reported by the configured subscription
     */
    private String getTopicsString() {
        return this.kafkaSpoutConfig.getSubscription().getTopicsString();
    }

    /**
     * Exposes the offset metric for tests.
     *
     * @return the registered offset metric, or {@code null} if no metric was registered
     */
    @VisibleForTesting
    KafkaOffsetMetric getKafkaOffsetMetric() {
        return this.kafkaOffsetMetric;
    }
}
