package org.apache.heron.spouts.kafka;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.heron.api.Config;
import org.apache.heron.api.spout.BaseRichSpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/heron/spouts/kafka/KafkaSpout.class */
public class KafkaSpout<K, V> extends BaseRichSpout implements IStatefulComponent<TopicPartition, Long> {
    // Shared SLF4J logger for the spout and its nested classes.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    private static final long serialVersionUID = -2271355516537883361L;
    // Factory that initialize() asks for the Kafka consumer.
    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
    // Source of the subscription pattern; used by initialize() only when topicNames is null.
    private TopicPatternProvider topicPatternProvider;
    // Explicit topic list; when non-null, initialize() subscribes to these names.
    private Collection<String> topicNames;
    // Collector handed to open(); every tuple is emitted through it.
    private transient SpoutOutputCollector collector;
    // Context handed to open(); used to register the Kafka consumer metrics.
    private transient TopologyContext topologyContext;
    // Records returned by the last poll that have not been emitted yet; nextTuple() drains one per call.
    private transient Queue<ConsumerRecord<K, V>> buffer;
    // Kafka consumer created in initialize() and closed in close().
    private transient Consumer<K, V> consumer;
    // Metric names already registered with the topology context, so each is registered once.
    private transient Set<MetricName> reportedMetrics;
    // Partitions currently assigned to this consumer, maintained by the rebalance listener.
    private transient Set<TopicPartition> assignedPartitions;
    // Per partition: acknowledged offset ranges, keyed by range start with the inclusive range end as value.
    private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
    // Per partition: the smallest offset reported through fail() since the last rewind.
    private transient Map<TopicPartition, Long> failureRegistry;
    // Checkpointed state supplied through initState(); emitConsumerRecord() stores the offset of the
    // last emitted record per partition in EFFECTIVELY_ONCE mode.
    private State<TopicPartition, Long> state;
    // Metrics registration interval in seconds; overwritten in initialize() from the system config.
    private int metricsIntervalInSecs = 60;
    // Converts a consumer record into the per-stream tuples to emit; replaceable through the setter.
    private ConsumerRecordTransformer<K, V> consumerRecordTransformer = new DefaultConsumerRecordTransformer();
    // Reliability mode; overwritten in initialize() from the "topology.reliability.mode" config entry.
    private Config.TopologyReliabilityMode topologyReliabilityMode = Config.TopologyReliabilityMode.ATMOST_ONCE;
    // Wall-clock time (ms) at which poll() last triggered metric registration; 0 means never.
    private long previousKafkaMetricsUpdatedTimestamp = 0;

    /* renamed from: org.apache.heron.spouts.kafka.KafkaSpout$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/heron/spouts/kafka/KafkaSpout$1.class */
    /**
     * Lookup table used by the {@code switch} in {@code emitConsumerRecord}: it maps the
     * ordinal of each {@link Config.TopologyReliabilityMode} constant to a case label
     * (ATMOST_ONCE = 1, ATLEAST_ONCE = 2, EFFECTIVELY_ONCE = 3). Ordinals that are not
     * populated keep the array default of 0, which matches no case label.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant known at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$heron$api$Config$TopologyReliabilityMode = new int[Config.TopologyReliabilityMode.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unpopulated.
            try {
                $SwitchMap$org$apache$heron$api$Config$TopologyReliabilityMode[Config.TopologyReliabilityMode.ATMOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$heron$api$Config$TopologyReliabilityMode[Config.TopologyReliabilityMode.ATLEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$heron$api$Config$TopologyReliabilityMode[Config.TopologyReliabilityMode.EFFECTIVELY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/heron/spouts/kafka/KafkaSpout$ConsumerRecordMessageId.class */
    /**
     * Message id attached to tuples emitted in ATLEAST_ONCE mode: identifies a single Kafka
     * record by its partition and offset. Two ids are equal when both components are equal.
     */
    public static class ConsumerRecordMessageId {
        private TopicPartition topicPartition;
        private long offset;

        /**
         * @param topicPartition partition the record was read from
         * @param j              offset of the record within that partition
         */
        ConsumerRecordMessageId(TopicPartition topicPartition, long j) {
            this.offset = j;
            this.topicPartition = topicPartition;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ConsumerRecordMessageId{topicPartition=");
            text.append(topicPartition);
            text.append(", offset=").append(offset);
            text.append('}');
            return text.toString();
        }

        /** @return the partition component of this id */
        TopicPartition getTopicPartition() {
            return topicPartition;
        }

        /** @return the offset component of this id */
        long getOffset() {
            return offset;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            ConsumerRecordMessageId that = (ConsumerRecordMessageId) obj;
            // Compare the cheap primitive first; the partition is only consulted on an offset match.
            return offset == that.offset && topicPartition.equals(that.topicPartition);
        }

        @Override
        public int hashCode() {
            // Long.hashCode folds the two 32-bit halves with XOR.
            return 31 * topicPartition.hashCode() + Long.hashCode(offset);
        }
    }

    /* loaded from: input_file:org/apache/heron/spouts/kafka/KafkaSpout$KafkaConsumerRebalanceListener.class */
    /**
     * Keeps the spout's per-partition bookkeeping in step with Kafka rebalances.
     *
     * <p>On revocation the acknowledgement/failure registries (ATLEAST_ONCE) or the checkpoint
     * state (EFFECTIVELY_ONCE) of the revoked partitions are dropped. On assignment, in
     * EFFECTIVELY_ONCE mode, the consumer is repositioned from the checkpoint state.
     */
    public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
        public KafkaConsumerRebalanceListener() {
        }

        /**
         * Forgets everything tracked for the partitions that are being taken away.
         *
         * @param collection partitions revoked from this consumer
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            LOG.info("revoked partitions {}", collection);
            if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
                for (TopicPartition revoked : collection) {
                    ackRegistry.remove(revoked);
                    failureRegistry.remove(revoked);
                }
            } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
                for (TopicPartition revoked : collection) {
                    state.remove(revoked);
                }
            }
            assignedPartitions.removeAll(collection);
        }

        /**
         * Records the newly assigned partitions and, in EFFECTIVELY_ONCE mode, resumes each one
         * right after the record captured in the checkpoint state.
         *
         * @param collection partitions assigned to this consumer
         */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            LOG.info("assigned partitions {}", collection);
            if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
                for (TopicPartition assigned : collection) {
                    if (state.containsKey(assigned)) {
                        // The state holds the offset of the last record that was already emitted
                        // (and preSave commits that offset + 1), so consumption must resume at the
                        // following offset; seeking to the stored offset would emit it twice.
                        long lastEmittedOffset = state.get(assigned);
                        consumer.seek(assigned, lastEmittedOffset + 1);
                    }
                }
            }
            assignedPartitions.addAll(collection);
        }
    }

    /**
     * Creates a spout that subscribes to an explicit list of topics.
     *
     * @param kafkaConsumerFactory factory used to create the Kafka consumer when the spout is opened
     * @param collection           names of the topics to subscribe to
     */
    public KafkaSpout(final KafkaConsumerFactory<K, V> kafkaConsumerFactory, final Collection<String> collection) {
        this.topicNames = collection;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    /**
     * Creates a spout that subscribes to topics through a pattern.
     *
     * @param kafkaConsumerFactory factory used to create the Kafka consumer when the spout is opened
     * @param topicPatternProvider supplies the subscription pattern (used when no topic names are set)
     */
    public KafkaSpout(final KafkaConsumerFactory<K, V> kafkaConsumerFactory, final TopicPatternProvider topicPatternProvider) {
        this.topicPatternProvider = topicPatternProvider;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    /**
     * @return the transformer currently used to turn consumer records into tuples
     */
    public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
        return consumerRecordTransformer;
    }

    /**
     * Replaces the transformer used to turn consumer records into tuples.
     *
     * @param consumerRecordTransformer transformer to use from now on
     */
    public void setConsumerRecordTransformer(final ConsumerRecordTransformer<K, V> consumerRecordTransformer) {
        this.consumerRecordTransformer = consumerRecordTransformer;
    }

    /**
     * Stores the runtime handles and then performs the full setup via {@code initialize}.
     *
     * @param map                  topology configuration
     * @param topologyContext      context used later for metric registration
     * @param spoutOutputCollector collector through which tuples are emitted
     */
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.topologyContext = topologyContext;
        this.collector = spoutOutputCollector;
        // Both handles must be in place before initialize() runs.
        this.initialize(map);
    }

    /**
     * Receives the checkpoint state and keeps a reference to it.
     *
     * @param state per-partition offsets handed in by the framework
     */
    public void initState(State<TopicPartition, Long> state) {
        LOG.info("initial state {}", state);
        this.state = state;
    }

    /**
     * Commits the offsets held in the checkpoint state to Kafka, asynchronously and without a
     * completion callback.
     *
     * @param str checkpoint identifier (not used)
     */
    public void preSave(String str) {
        LOG.info("save state {}", this.state);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> tracked : this.state.entrySet()) {
            // The state holds the last emitted offset; the committed position is the one after it.
            long nextOffset = tracked.getValue() + 1;
            offsetsToCommit.put(tracked.getKey(), new OffsetAndMetadata(nextOffset));
        }
        this.consumer.commitAsync(offsetsToCommit, (OffsetCommitCallback) null);
    }

    /**
     * Emits one buffered record per call. When the buffer is empty it first, in ATLEAST_ONCE
     * mode, processes pending failures and commits acknowledged offsets for every tracked
     * partition, and then refills the buffer from Kafka.
     */
    public void nextTuple() {
        ConsumerRecord<K, V> buffered = this.buffer.poll();
        if (buffered == null) {
            if (Config.TopologyReliabilityMode.ATLEAST_ONCE == this.topologyReliabilityMode) {
                this.ackRegistry.forEach((partition, ackedRanges) -> {
                    if (ackedRanges == null) {
                        return;
                    }
                    // Rewind first so that the commit below works on the post-failure view.
                    rewindAndDiscardAck(partition, ackedRanges);
                    manualCommit(partition, ackedRanges);
                });
            }
            for (ConsumerRecord<K, V> fetched : poll()) {
                this.buffer.offer(fetched);
            }
        } else {
            emitConsumerRecord(buffered);
        }
    }

    /**
     * Resumes fetching from all currently assigned partitions; does nothing when none are assigned.
     */
    public void activate() {
        if (!this.assignedPartitions.isEmpty()) {
            this.consumer.resume(this.assignedPartitions);
        }
    }

    /**
     * Pauses fetching from all currently assigned partitions; does nothing when none are assigned.
     */
    public void deactivate() {
        if (!this.assignedPartitions.isEmpty()) {
            this.consumer.pause(this.assignedPartitions);
        }
    }

    /**
     * Records the acknowledgement of one record in the partition's range registry.
     *
     * <p>Acknowledged offsets are kept as inclusive ranges (start -&gt; end). A new offset either
     * bridges two neighbouring ranges, extends one of them by one, or starts a range of its own.
     * Offsets that are already covered, and offsets of partitions no longer assigned, are ignored.
     *
     * @param obj the {@code ConsumerRecordMessageId} attached to the emitted tuple
     */
    public void ack(Object obj) {
        final long startedAtNanos = System.nanoTime();
        final ConsumerRecordMessageId messageId = (ConsumerRecordMessageId) obj;
        final TopicPartition partition = messageId.getTopicPartition();
        if (!this.assignedPartitions.contains(partition)) {
            LOG.info("ignore {} because it's been revoked", messageId);
            return;
        }

        final long offset = messageId.getOffset();
        final NavigableMap<Long, Long> ranges =
                this.ackRegistry.computeIfAbsent(partition, ignored -> new TreeMap<>());

        // Closest range starting at or below the offset, and closest one starting at or above it.
        final Map.Entry<Long, Long> below = ranges.floorEntry(offset);
        final Map.Entry<Long, Long> above = ranges.ceilingEntry(offset);

        // A missing neighbour is represented by a sentinel that no real offset can touch.
        long belowStart = Long.MIN_VALUE;
        long belowEnd = Long.MIN_VALUE;
        if (below != null) {
            belowStart = below.getKey();
            belowEnd = below.getValue();
        }
        long aboveStart = Long.MAX_VALUE;
        long aboveEnd = Long.MAX_VALUE;
        if (above != null) {
            aboveStart = above.getKey();
            aboveEnd = above.getValue();
        }

        final boolean coveredBelow = belowStart <= offset && offset <= belowEnd;
        final boolean coveredAbove = aboveStart <= offset && offset <= aboveEnd;
        if (coveredBelow || coveredAbove) {
            // Already acknowledged; nothing to record.
            return;
        }

        if (aboveStart - belowEnd == 2) {
            // The offset is the single gap between both neighbours: merge them into one range.
            ranges.put(belowStart, aboveEnd);
            ranges.remove(aboveStart);
        } else if (offset == belowEnd + 1) {
            // Directly follows the lower range: extend it upwards.
            ranges.put(belowStart, offset);
        } else if (offset == aboveStart - 1) {
            // Directly precedes the upper range: re-key it to start at this offset.
            ranges.remove(aboveStart);
            ranges.put(offset, aboveEnd);
        } else {
            // Isolated offset: start a new single-element range.
            ranges.put(offset, offset);
        }

        LOG.debug("ack {} in {} ns", obj, System.nanoTime() - startedAtNanos);
        LOG.debug("{}", this.ackRegistry.get(messageId.getTopicPartition()));
    }

    /**
     * Remembers a failed record so the partition can be rewound later; only the smallest failed
     * offset per partition is kept. Failures of partitions no longer assigned are ignored.
     *
     * @param obj the {@code ConsumerRecordMessageId} attached to the failed tuple
     */
    public void fail(Object obj) {
        final ConsumerRecordMessageId messageId = (ConsumerRecordMessageId) obj;
        final TopicPartition partition = messageId.getTopicPartition();
        if (this.assignedPartitions.contains(partition)) {
            // Stores the offset when none is recorded yet, otherwise keeps the smaller of the two.
            this.failureRegistry.merge(partition, messageId.getOffset(), Long::min);
            LOG.warn("fail {}", obj);
        } else {
            LOG.info("ignore {} because it's been revoked", messageId);
        }
    }

    /**
     * Closes the underlying Kafka consumer.
     */
    public void close() {
        consumer.close();
    }

    /**
     * Declares one output stream per stream reported by the record transformer, using the
     * field names the transformer provides for that stream.
     *
     * @param outputFieldsDeclarer declarer that receives the stream definitions
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        final ConsumerRecordTransformer<K, V> transformer = this.consumerRecordTransformer;
        transformer.getOutputStreams().forEach(streamId -> {
            Fields streamFields = new Fields(transformer.getFieldNames(streamId));
            outputFieldsDeclarer.declareStream(streamId, streamFields);
        });
    }

    /**
     * @return always {@code null}; this spout supplies no component-specific configuration
     */
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> noOverrides = null;
        return noOverrides;
    }

    /**
     * Reads the reliability mode and metrics interval, creates the consumer, subscribes it
     * (by topic names when present, otherwise by pattern) and creates the bookkeeping collections.
     *
     * @param map topology configuration; must contain the {@code "topology.reliability.mode"} entry
     */
    private void initialize(Map<String, Object> map) {
        final String configuredMode = map.get("topology.reliability.mode").toString();
        this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(configuredMode);

        final SystemConfig systemConfig =
                (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.metricsIntervalInSecs = (int) systemConfig.getHeronMetricsExportInterval().getSeconds();

        this.consumer = this.kafkaConsumerFactory.create();
        if (this.topicNames == null) {
            this.consumer.subscribe(this.topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
        } else {
            this.consumer.subscribe(this.topicNames, new KafkaConsumerRebalanceListener());
        }

        this.buffer = new ArrayDeque<>(500);
        this.ackRegistry = new HashMap<>();
        this.failureRegistry = new HashMap<>();
        this.assignedPartitions = new HashSet<>();
        this.reportedMetrics = new HashSet<>();
    }

    /**
     * Transforms one consumer record and emits the resulting tuples according to the
     * reliability mode.
     *
     * <ul>
     *   <li>ATMOST_ONCE: tuples are emitted without a message id.</li>
     *   <li>ATLEAST_ONCE: tuples are emitted with a {@code ConsumerRecordMessageId}; a record
     *       that transforms to nothing is acknowledged immediately.</li>
     *   <li>EFFECTIVELY_ONCE: tuples are emitted without a message id and the record's offset
     *       is stored in the checkpoint state.</li>
     * </ul>
     *
     * @param consumerRecord record taken from the buffer
     */
    private void emitConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
        final Map<String, List<Object>> tuplesByStream = this.consumerRecordTransformer.transform(consumerRecord);
        // Built once; every branch below refers to the same partition and offset.
        final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        final long offset = consumerRecord.offset();

        if (tuplesByStream.isEmpty() && this.topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
            // Nothing will be emitted, so no downstream ack can ever arrive for this offset.
            ack(new ConsumerRecordMessageId(partition, offset));
            return;
        }

        tuplesByStream.forEach((streamId, tuple) -> {
            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (this.topologyReliabilityMode) {
                case ATMOST_ONCE:
                    this.collector.emit(streamId, tuple);
                    break;
                case ATLEAST_ONCE:
                    this.collector.emit(streamId, tuple, new ConsumerRecordMessageId(partition, offset));
                    break;
                case EFFECTIVELY_ONCE:
                    this.collector.emit(streamId, tuple);
                    this.state.put(partition, offset);
                    break;
                default:
                    LOG.warn("unsupported reliability mode {}", this.topologyReliabilityMode);
                    break;
            }
        });
    }

    /**
     * If a failure is pending for the partition, rewinds the consumer to the earliest failed
     * offset and discards every acknowledgement at or beyond it, because those records are
     * going to be consumed again.
     *
     * <p>The range map is trimmed in place, so a caller that keeps using the same map instance
     * afterwards (as {@code nextTuple} does for the commit) sees the trimmed view rather than
     * the acknowledgements that were just discarded.
     *
     * @param topicPartition partition to check
     * @param navigableMap   acknowledged ranges of that partition (start -&gt; inclusive end)
     */
    private void rewindAndDiscardAck(TopicPartition topicPartition, NavigableMap<Long, Long> navigableMap) {
        if (!this.failureRegistry.containsKey(topicPartition)) {
            return;
        }
        final long earliestFailedOffset = this.failureRegistry.get(topicPartition);
        this.consumer.seek(topicPartition, earliestFailedOffset);

        // Ranges that start at or after the failed offset are entirely invalidated.
        navigableMap.tailMap(earliestFailedOffset, true).clear();

        // The last remaining range may still reach across the failed offset; cut it so that it
        // ends just before it. Ending it AT the failed offset would keep the failed record
        // marked as acknowledged and let the next commit skip past it.
        final Map.Entry<Long, Long> lastRange = navigableMap.lastEntry();
        if (lastRange != null && lastRange.getValue() >= earliestFailedOffset) {
            navigableMap.put(lastRange.getKey(), earliestFailedOffset - 1);
        }

        this.ackRegistry.put(topicPartition, navigableMap);
        // The failure has been handled.
        this.failureRegistry.remove(topicPartition);
    }

    /**
     * Asynchronously commits the offset following the end of the partition's lowest
     * acknowledged range; does nothing when no range is recorded.
     *
     * @param topicPartition partition to commit for
     * @param navigableMap   acknowledged ranges of that partition (start -&gt; inclusive end)
     */
    private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> navigableMap) {
        final Map.Entry<Long, Long> lowestRange = navigableMap.firstEntry();
        if (lowestRange == null) {
            return;
        }
        final OffsetAndMetadata commitPosition = new OffsetAndMetadata(lowestRange.getValue() + 1);
        this.consumer.commitAsync(Collections.singletonMap(topicPartition, commitPosition), (OffsetCommitCallback) null);
    }

    /**
     * Polls Kafka for up to 200 ms.
     *
     * <p>When records were returned, it also registers consumer metrics if the metrics interval
     * has elapsed since the last registration, and in ATMOST_ONCE mode commits the polled
     * position asynchronously before the records are handed out.
     *
     * @return the polled records, or an empty list when the poll returned nothing
     */
    private Iterable<ConsumerRecord<K, V>> poll() {
        final ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofMillis(200L));
        if (records.isEmpty()) {
            return Collections.emptyList();
        }
        // Long arithmetic: an int multiplication would overflow for very large intervals.
        final long metricsIntervalMillis = this.metricsIntervalInSecs * 1000L;
        if (System.currentTimeMillis() - this.previousKafkaMetricsUpdatedTimestamp > metricsIntervalMillis) {
            registerConsumerMetrics();
            this.previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
        }
        if (this.topologyReliabilityMode == Config.TopologyReliabilityMode.ATMOST_ONCE) {
            this.consumer.commitAsync();
        }
        return records;
    }

    /**
     * Registers every consumer metric that has not been registered before with the topology
     * context, under the name produced by {@code extractKafkaMetricName}.
     */
    private void registerConsumerMetrics() {
        this.consumer.metrics().forEach((metricName, metric) -> {
            // Set.add returns false for names that were already reported.
            if (!this.reportedMetrics.add(metricName)) {
                return;
            }
            final String exposedName = extractKafkaMetricName(metricName);
            LOG.info("register Kakfa Consumer metric {}", exposedName);
            this.topologyContext.registerMetric(exposedName, new KafkaMetricDecorator(metric), this.metricsIntervalInSecs);
        });
    }

    /**
     * Builds the name under which a Kafka metric is exposed:
     * {@code <name>-<group>} followed by {@code -<tagKey>-<tagValue>} for every tag, in the
     * iteration order of the tag map.
     *
     * <p>This method only builds the name. It no longer logs, because the caller already logs
     * the identical "register" message with the returned name and each metric was therefore
     * logged twice.
     *
     * @param metricName Kafka metric name to convert
     * @return the flattened metric name
     */
    private String extractKafkaMetricName(MetricName metricName) {
        final StringBuilder exposedName = new StringBuilder()
                .append(metricName.name())
                .append('-')
                .append(metricName.group());
        for (Map.Entry<String, String> tag : metricName.tags().entrySet()) {
            exposedName.append('-').append(tag.getKey()).append('-').append(tag.getValue());
        }
        return exposedName.toString();
    }
}
