package org.apache.storm.kafka.spout.trident;

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Emits Trident batches of Kafka records for the topic-partitions assigned to this task.
 *
 * <p>One {@link Consumer} serves every partition of the task. Before a batch is emitted or re-emitted,
 * all other assigned partitions are paused so the poll only returns records for the batch's partition.
 * Those partitions are resumed afterwards, also when emitting fails.
 *
 * <p>The start offset of a new batch is decided by {@link #seek(TopicPartition, KafkaTridentSpoutBatchMetadata)}.
 * It uses the configured {@link FirstPollOffsetStrategy}, the previous batch's metadata, and whether that
 * metadata was written by the currently running topology.
 */
public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    private final Consumer<K, V> consumer;
    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
    private final TopicAssigner topicAssigner;
    // Consumer position recorded right after the first seek of each partition by this emitter instance.
    // A key being present also means the first poll for that partition has already happened.
    private final Map<TopicPartition, Long> tpToFirstSeekOffset;
    private final long pollTimeoutMs;
    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final TopicPartitionSerializer tpSerializer;
    private final TopologyContext topologyContext;
    private final long startTimeStamp;

    /** Rebalance listener that only logs revoked and assigned partitions. */
    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]", consumer, partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]", consumer, partitions);
        }
    }

    /**
     * Creates an emitter whose consumer comes from {@link ConsumerFactoryDefault}, using a new {@link TopicAssigner}.
     *
     * @param kafkaSpoutConfig spout configuration (consumer properties, translator, poll and offset settings)
     * @param topologyContext context of the task running this emitter
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
        this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner());
    }

    /**
     * Creates an emitter with an injectable consumer factory and topic assigner.
     *
     * @param kafkaSpoutConfig spout configuration (consumer properties, translator, poll and offset settings)
     * @param topologyContext context of the task running this emitter
     * @param consumerFactory creates the consumer from {@code kafkaSpoutConfig.getKafkaProps()}
     * @param topicAssigner assigns partitions to the consumer in {@link #refreshPartitions(List)}
     */
    @VisibleForTesting
    KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
            ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
        this.tpToFirstSeekOffset = new HashMap<>();
        this.tpSerializer = new TopicPartitionSerializer();
        this.kafkaSpoutConfig = kafkaSpoutConfig;
        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
        this.topologyContext = topologyContext;
        this.translator = kafkaSpoutConfig.getTranslator();
        this.topicAssigner = topicAssigner;
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
        LOG.debug("Created {}", toString());
    }

    /**
     * Re-emits a previously emitted batch for one partition, replaying the offsets recorded in its metadata.
     *
     * <p>Nothing is emitted in two cases:
     * <ul>
     *   <li>the batch was emitted by another topology and the first poll strategy ignores committed offsets;</li>
     *   <li>the batch is an empty batch recorded with a negative first offset.</li>
     * </ul>
     *
     * @param tx the transaction attempt being replayed
     * @param collector receives the re-emitted tuples
     * @param currBatchPartition partition the batch belongs to
     * @param currBatch metadata map of the batch to replay
     * @throws IllegalStateException if the partition is not assigned to this emitter's consumer
     * @throws RuntimeException if a polled record's offset is greater than the batch's last offset + 1
     */
    public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {
        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        throwIfEmittingForUnassignedPartition(currBatchTp);
        final KafkaTridentSpoutBatchMetadata currBatchMeta = KafkaTridentSpoutBatchMetadata.fromMap(currBatch);

        if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
                && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
            LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
                    + " because the current first poll offset strategy ignores committed offsets.");
            return;
        }
        LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, currBatch, collector);

        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
        try {
            // Pause all other assigned partitions so the poll only returns records for currBatchTp.
            pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

            final long seekOffset = currBatchMeta.getFirstOffset();
            if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
                LOG.debug("Skipping re-emit of batch with negative starting offset."
                        + " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
                        + " It is not expected that Trident will replay such an empty batch,"
                        + " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
                return;
            }
            LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
            consumer.seek(currBatchTp, seekOffset);

            final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
            LOG.debug("Polled [{}] records from Kafka.", records.size());

            final long lastOffset = currBatchMeta.getLastOffset();
            for (ConsumerRecord<K, V> record : records) {
                if (record.offset() == lastOffset + 1) {
                    // First record after the batch: everything in the batch has been re-emitted.
                    break;
                }
                if (record.offset() > lastOffset) {
                    throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch."
                            + " The batch end offset was [%d], but received [%d]."
                            + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
                            lastOffset, record.offset()));
                }
                emitTuple(collector, record);
            }
        } finally {
            // Resume exactly what was paused, even when emitting failed, so other partitions are not left paused.
            consumer.resume(pausedTopicPartitions);
            LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
        }
        LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, currBatchMeta, collector);
    }

    /**
     * Emits a new batch for one partition: seeks, polls once, and emits every record polled for that partition.
     *
     * @param tx the transaction attempt
     * @param collector receives the emitted tuples
     * @param currBatchPartition partition to emit from
     * @param lastBatch metadata map of the previous batch for this partition, or {@code null} if there is none
     * @return metadata map of the emitted batch. When records were polled, it holds their first and last offsets.
     *     When no records were polled, both offsets are the consumer position minus one, which is -1 when the
     *     position is 0.
     * @throws IllegalStateException if the partition is not assigned to this emitter's consumer
     */
    public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, collector);
        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        throwIfEmittingForUnassignedPartition(currBatchTp);
        final KafkaTridentSpoutBatchMetadata lastBatchMeta =
                lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);

        final KafkaTridentSpoutBatchMetadata currentBatch;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
        try {
            // Pause all other assigned partitions so the poll only returns records for currBatchTp.
            pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
            seek(currBatchTp, lastBatchMeta);

            final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
            LOG.debug("Polled [{}] records from Kafka.", records.size());

            if (records.isEmpty()) {
                // Record position - 1 as both ends. seek() resumes at lastOffset + 1, i.e. the current position.
                final long lastEmittedOffset = consumer.position(currBatchTp) - 1;
                currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset,
                        topologyContext.getStormId());
            } else {
                for (ConsumerRecord<K, V> record : records) {
                    emitTuple(collector, record);
                }
                currentBatch = new KafkaTridentSpoutBatchMetadata(records.get(0).offset(),
                        records.get(records.size() - 1).offset(), topologyContext.getStormId());
            }
        } finally {
            // Resume exactly what was paused, even when emitting failed, so other partitions are not left paused.
            consumer.resume(pausedTopicPartitions);
            LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
        }
        LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [currBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, currentBatch, collector);
        return currentBatch.toMap();
    }

    /**
     * Reports whether the configured strategy ignores offsets stored by a different topology.
     *
     * <p>This matches the strategies that {@link #seek(TopicPartition, KafkaTridentSpoutBatchMetadata)} handles
     * without using foreign metadata: EARLIEST, LATEST and TIMESTAMP.
     */
    private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
        return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
                || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST
                || firstPollOffsetStrategy == FirstPollOffsetStrategy.TIMESTAMP;
    }

    /**
     * Fails fast when asked to emit for a partition missing from the consumer's current assignment.
     *
     * @throws IllegalStateException if {@code tp} is not assigned to the consumer
     */
    private void throwIfEmittingForUnassignedPartition(TopicPartition tp) {
        final Set<TopicPartition> assignment = consumer.assignment();
        if (!assignment.contains(tp)) {
            throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
                    + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
                    + " The current partition is [" + tp + "], the assigned partitions are [" + assignment + "].");
        }
    }

    /** Translates one record into a tuple and emits it. */
    private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) {
        final List<Object> tuple = translator.apply(record);
        collector.emit(tuple);
        LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
    }

    /**
     * Positions the consumer on {@code tp} at the start of the next batch.
     *
     * <p>On the first poll of a partition since this emitter was created:
     * <ul>
     *   <li>EARLIEST, LATEST and TIMESTAMP apply when there is no metadata or it came from another topology;</li>
     *   <li>otherwise any available metadata is used;</li>
     *   <li>the UNCOMMITTED_* strategies apply only when there is no metadata.</li>
     * </ul>
     * The resulting position is stored in {@code tpToFirstSeekOffset}.
     *
     * <p>On later polls, the consumer seeks past the previous batch if there is metadata, or back to the stored
     * first position if there is none.
     *
     * @param tp partition to seek
     * @param lastBatchMeta metadata of the previous batch, or {@code null}
     * @return the consumer position after seeking
     */
    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
        if (isFirstPollSinceExecutorStarted(tp)) {
            // True when there is no metadata, or the metadata was written by a different topology.
            final boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
                    || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
            if (firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                consumer.seekToBeginning(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST && isFirstPollSinceTopologyWasDeployed) {
                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                consumer.seekToEnd(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == FirstPollOffsetStrategy.TIMESTAMP && isFirstPollSinceTopologyWasDeployed) {
                LOG.debug("First poll for topic partition [{}], seeking to partition based on startTimeStamp", tp);
                seekOffsetByStartTimeStamp(tp);
            } else if (lastBatchMeta != null) {
                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
            } else if (firstPollOffsetStrategy == FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
                consumer.seekToBeginning(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == FirstPollOffsetStrategy.UNCOMMITTED_LATEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
                consumer.seekToEnd(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == FirstPollOffsetStrategy.UNCOMMITTED_TIMESTAMP) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition based on startTimeStamp", tp);
                seekOffsetByStartTimeStamp(tp);
            }
            tpToFirstSeekOffset.put(tp, consumer.position(tp));
        } else if (lastBatchMeta != null) {
            // Continue right after the previous batch.
            consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
            LOG.debug("Subsequent poll for topic partition [{}], using last batch metadata", tp);
        } else {
            // Not the first poll, yet there is no previous metadata. Presumably the first batch for this
            // partition is being requested again (TODO confirm against Trident), so go back to the stored position.
            final long initialFetchOffset = tpToFirstSeekOffset.get(tp);
            consumer.seek(tp, initialFetchOffset);
            LOG.debug("Subsequent poll for topic partition [{}], no last batch metadata present. Using stored initial fetch offset [{}]",
                    tp, initialFetchOffset);
        }
        final long fetchOffset = consumer.position(tp);
        LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
        return fetchOffset;
    }

    /**
     * Seeks {@code tp} to the earliest offset whose timestamp is at or after {@code startTimeStamp}.
     * If Kafka reports no such offset, seeks to the partition end instead.
     */
    private void seekOffsetByStartTimeStamp(TopicPartition tp) {
        final OffsetAndTimestamp startOffsetAndTimestamp =
                consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp)).get(tp);
        if (startOffsetAndTimestamp == null) {
            // offsetsForTimes maps a partition to null when no record has a timestamp >= the target.
            LOG.debug("No offset found for topic partition [{}] at or after startTimeStamp [{}], seeking to partition end",
                    tp, startTimeStamp);
            consumer.seekToEnd(Collections.singleton(tp));
            return;
        }
        LOG.debug("First poll for topic partition [{}], seeking to partition from startTimeStamp [{}]", tp, startTimeStamp);
        consumer.seek(tp, startOffsetAndTimestamp.offset());
    }

    /** Returns true until {@link #seek} has stored a first-seek offset for {@code tp}. */
    private boolean isFirstPollSinceExecutorStarted(TopicPartition tp) {
        return !tpToFirstSeekOffset.containsKey(tp);
    }

    /**
     * Pauses every partition assigned to the consumer except {@code tp}.
     *
     * @return the partitions that were paused, to be passed back to {@code consumer.resume}
     */
    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition tp) {
        final Set<TopicPartition> topicPartitionsToPause = new HashSet<>(consumer.assignment());
        LOG.debug("Currently assigned topic-partitions {}", topicPartitionsToPause);
        topicPartitionsToPause.remove(tp);
        consumer.pause(topicPartitionsToPause);
        LOG.debug("Paused topic-partitions {}", topicPartitionsToPause);
        return topicPartitionsToPause;
    }

    /**
     * Deserializes all partitions from their map form and returns them sorted by {@link TopicPartitionComparator}.
     *
     * @param allPartitionInfo serialized topic-partitions
     * @return the partitions in sorted order
     */
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<String, Object>> allPartitionInfo) {
        final List<TopicPartition> sortedTopicPartitions = allPartitionInfo.stream()
                .map(tpSerializer::fromMap)
                .sorted(TopicPartitionComparator.INSTANCE)
                .collect(Collectors.toList());
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    /**
     * Selects this task's partitions using the configured topic partitioner.
     *
     * @param taskId index of this task; used only for logging here
     * @param numTasks total number of tasks; not used by this implementation
     * @param allPartitionInfoSorted all partitions, in sorted order
     * @return the partitions the partitioner assigns to this task
     */
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
            List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
        final List<TopicPartition> allTopicPartitions = allPartitionInfoSorted.stream()
                .map(KafkaTridentSpoutTopicPartition::getTopicPartition)
                .collect(Collectors.toList());
        final Set<TopicPartition> assignedTopicPartitions =
                kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(allTopicPartitions, topologyContext);
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}",
                consumer, taskId, assignedTopicPartitions);
        return newKafkaTridentSpoutTopicPartitions(assignedTopicPartitions);
    }

    /**
     * Assigns the given partitions to the consumer through the {@link TopicAssigner}.
     *
     * @param partitionResponsibilities partitions this task is responsible for
     */
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        final Set<TopicPartition> assignedTopicPartitions = partitionResponsibilities.stream()
                .map(KafkaTridentSpoutTopicPartition::getTopicPartition)
                .collect(Collectors.toSet());
        topicAssigner.assignPartitions(consumer, assignedTopicPartitions, new KafkaSpoutConsumerRebalanceListener());
        LOG.debug("Assigned partitions [{}] to this task", assignedTopicPartitions);
    }

    /** Wraps each topic-partition in a {@link KafkaTridentSpoutTopicPartition}, preserving iteration order. */
    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
        final List<KafkaTridentSpoutTopicPartition> wrapped = new ArrayList<>(tps.size());
        for (TopicPartition tp : tps) {
            LOG.trace("Added topic-partition [{}]", tp);
            wrapped.add(new KafkaTridentSpoutTopicPartition(tp));
        }
        return wrapped;
    }

    /** Returns the number of tasks of this component. */
    private int getNumTasks() {
        return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        consumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}';
    }
}
