package org.apache.storm.kafka.spout.trident;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.class */
public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>>, Serializable {
    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaTridentSpoutEmitter.class);
    private final KafkaConsumer<K, V> kafkaConsumer;
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    private final Map<TopicPartition, Long> firstPollTransaction;
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer;
    private TopologyContext topologyContext;

    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaTridentSpoutManager, TopologyContext topologyContext, Timer timer) {
        this.firstPollTransaction = new HashMap();
        this.tpSerializer = new TopicPartitionSerializer();
        this.kafkaConsumer = kafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaTridentSpoutManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = timer;
        this.translator = kafkaTridentSpoutManager.getKafkaSpoutConfig().getTranslator();
        KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaTridentSpoutManager.getKafkaSpoutConfig();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", toString());
    }

    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaTridentSpoutManager, TopologyContext topologyContext) {
        this(kafkaTridentSpoutManager, topologyContext, new Timer(500L, kafkaTridentSpoutManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
    }

    public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, KafkaTridentSpoutTopicPartition kafkaTridentSpoutTopicPartition, Map<String, Object> map) {
        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", transactionAttempt, kafkaTridentSpoutTopicPartition, map, tridentCollector);
        TopicPartition topicPartition = kafkaTridentSpoutTopicPartition.getTopicPartition();
        Set<TopicPartition> assignment = this.kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata fromMap = map == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(map);
        KafkaTridentSpoutBatchMetadata kafkaTridentSpoutBatchMetadata = fromMap;
        Collection<TopicPartition> emptySet = Collections.emptySet();
        if (assignment == null || !assignment.contains(kafkaTridentSpoutTopicPartition.getTopicPartition())) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}] because it is not part of the assignments {} of consumer instance [{}] of consumer group [{}]", transactionAttempt, kafkaTridentSpoutTopicPartition, map, tridentCollector, assignment, this.kafkaConsumer, this.kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                emptySet = pauseTopicPartitions(topicPartition);
                seek(topicPartition, fromMap, transactionAttempt.getTransactionId().longValue());
                if (this.refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    this.kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }
                ConsumerRecords<K, V> poll = this.kafkaConsumer.poll(this.pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", Integer.valueOf(poll.count()));
                if (!poll.isEmpty()) {
                    emitTuples(tridentCollector, poll);
                    kafkaTridentSpoutBatchMetadata = new KafkaTridentSpoutBatchMetadata(topicPartition, poll);
                }
                this.kafkaConsumer.resume(emptySet);
                LOG.trace("Resumed topic-partitions {}", emptySet);
                LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [currBatchMetadata = {}], [collector = {}]", transactionAttempt, kafkaTridentSpoutTopicPartition, map, kafkaTridentSpoutBatchMetadata, tridentCollector);
            } catch (Throwable th) {
                this.kafkaConsumer.resume(emptySet);
                LOG.trace("Resumed topic-partitions {}", emptySet);
                throw th;
            }
        }
        if (kafkaTridentSpoutBatchMetadata == null) {
            return null;
        }
        return kafkaTridentSpoutBatchMetadata.toMap();
    }

    private void emitTuples(TridentCollector tridentCollector, ConsumerRecords<K, V> consumerRecords) {
        Iterator<ConsumerRecord<K, V>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<K, V> next = it.next();
            List<Object> apply = this.translator.apply((ConsumerRecord) next);
            tridentCollector.emit(apply);
            LOG.debug("Emitted tuple {} for record [{}]", apply, next);
        }
    }

    private long seek(TopicPartition topicPartition, KafkaTridentSpoutBatchMetadata kafkaTridentSpoutBatchMetadata, long j) {
        if (isFirstPoll(topicPartition, j)) {
            if (this.firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) {
                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", topicPartition);
                this.kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
            } else if (this.firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) {
                LOG.debug("First poll for topic partition [{}], seeking to partition end", topicPartition);
                this.kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            } else if (kafkaTridentSpoutBatchMetadata != null) {
                LOG.debug("First poll for topic partition [{}], using last batch metadata", topicPartition);
                this.kafkaConsumer.seek(topicPartition, kafkaTridentSpoutBatchMetadata.getLastOffset() + 1);
            } else if (this.firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", topicPartition);
                this.kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
            } else if (this.firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", topicPartition);
                this.kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            }
            this.firstPollTransaction.put(topicPartition, Long.valueOf(j));
        } else {
            this.kafkaConsumer.seek(topicPartition, kafkaTridentSpoutBatchMetadata.getLastOffset() + 1);
            LOG.debug("First poll for topic partition [{}], using last batch metadata", topicPartition);
        }
        long position = this.kafkaConsumer.position(topicPartition);
        LOG.debug("Set [fetchOffset = {}]", Long.valueOf(position));
        return position;
    }

    private boolean isFirstPoll(TopicPartition topicPartition, long j) {
        return !this.firstPollTransaction.containsKey(topicPartition) || this.firstPollTransaction.get(topicPartition).longValue() == j;
    }

    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition topicPartition) {
        HashSet hashSet = new HashSet(this.kafkaConsumer.assignment());
        LOG.debug("Currently assigned topic-partitions {}", hashSet);
        hashSet.remove(topicPartition);
        this.kafkaConsumer.pause(hashSet);
        LOG.debug("Paused topic-partitions {}", hashSet);
        return hashSet;
    }

    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> list) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. No action taken by this method for topic partitions {}", list);
    }

    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<String, Object>> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<Map<String, Object>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.tpSerializer.fromMap(it.next()));
        }
        List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions = newKafkaTridentSpoutTopicPartitions(arrayList);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", newKafkaTridentSpoutTopicPartitions, Integer.valueOf(this.topologyContext.getThisTaskIndex()), Integer.valueOf(getNumTasks()));
        return newKafkaTridentSpoutTopicPartitions;
    }

    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int i, int i2, List<Map<String, Object>> list) {
        Set<TopicPartition> assignment = this.kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", this.kafkaConsumer, Integer.valueOf(i), assignment);
        List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions = newKafkaTridentSpoutTopicPartitions(assignment);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", newKafkaTridentSpoutTopicPartitions, Integer.valueOf(i));
        return newKafkaTridentSpoutTopicPartitions;
    }

    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> collection) {
        ArrayList arrayList = new ArrayList(collection == null ? 0 : collection.size());
        if (collection != null) {
            for (TopicPartition topicPartition : collection) {
                LOG.trace("Added topic-partition [{}]", topicPartition);
                arrayList.add(new KafkaTridentSpoutTopicPartition(topicPartition));
            }
        }
        return arrayList;
    }

    private int getNumTasks() {
        return this.topologyContext.getComponentTasks(this.topologyContext.getThisComponentId()).size();
    }

    public void close() {
        this.kafkaConsumer.close();
        LOG.debug("Closed");
    }

    public final String toString() {
        return super.toString() + "{kafkaManager=" + this.kafkaManager + '}';
    }
}
