package org.apache.storm.kafka.trident;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.servlet.jsp.tagext.TagAttributeInfo;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.storm.kafka.DynamicPartitionConnections;
import org.apache.storm.kafka.FailedFetchException;
import org.apache.storm.kafka.KafkaUtils;
import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
import org.apache.storm.kafka.Partition;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.StringMessageAndMetadataScheme;
import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/trident/TridentKafkaEmitter.class */
public class TridentKafkaEmitter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TridentKafkaEmitter.class);
    private DynamicPartitionConnections _connections;
    private String _topologyName;
    private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
    private ReducedMetric _kafkaMeanFetchLatencyMetric;
    private CombinedMetric _kafkaMaxFetchLatencyMetric;
    private TridentKafkaConfig _config;
    private String _topologyInstanceId;

    public TridentKafkaEmitter(Map map, TopologyContext topologyContext, TridentKafkaConfig tridentKafkaConfig, String str) {
        this._config = tridentKafkaConfig;
        this._topologyInstanceId = str;
        this._connections = new DynamicPartitionConnections(this._config, KafkaUtils.makeBrokerReader(map, this._config));
        this._topologyName = (String) map.get("topology.name");
        this._kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(this._connections);
        topologyContext.registerMetric("kafkaOffset", this._kafkaOffsetMetric, this._config.metricsTimeBucketSizeInSecs);
        this._kafkaMeanFetchLatencyMetric = topologyContext.registerMetric("kafkaFetchAvg", new MeanReducer(), this._config.metricsTimeBucketSizeInSecs);
        this._kafkaMaxFetchLatencyMetric = topologyContext.registerMetric("kafkaFetchMax", new MaxMetric(), this._config.metricsTimeBucketSizeInSecs);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map failFastEmitNewPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
        Map doEmitNewPartitionBatch = doEmitNewPartitionBatch(this._connections.register(partition), partition, tridentCollector, map);
        Long l = (Long) doEmitNewPartitionBatch.get("offset");
        this._kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(((Long) doEmitNewPartitionBatch.get("nextOffset")).longValue(), l.longValue()));
        return doEmitNewPartitionBatch;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map emitNewPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
        try {
            return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
        } catch (FailedFetchException e) {
            LOG.warn("Failed to fetch from partition " + partition);
            if (map == null) {
                return null;
            }
            HashMap hashMap = new HashMap();
            hashMap.put("offset", map.get("nextOffset"));
            hashMap.put("nextOffset", map.get("nextOffset"));
            hashMap.put(StringMessageAndMetadataScheme.STRING_SCHEME_PARTITION_KEY, Integer.valueOf(partition.partition));
            hashMap.put("broker", ImmutableMap.of("host", (Integer) partition.host.host, "port", Integer.valueOf(partition.host.port)));
            hashMap.put("topic", partition.topic);
            hashMap.put("topology", ImmutableMap.of("name", this._topologyName, TagAttributeInfo.ID, this._topologyInstanceId));
            return hashMap;
        }
    }

    private Map doEmitNewPartitionBatch(SimpleConsumer simpleConsumer, Partition partition, TridentCollector tridentCollector, Map map) {
        long offset;
        ByteBufferMessageSet fetchMessages;
        if (map != null) {
            String str = null;
            Map map2 = (Map) map.get("topology");
            if (map2 != null) {
                str = (String) map2.get(TagAttributeInfo.ID);
            }
            offset = (!this._config.ignoreZkOffsets || this._topologyInstanceId.equals(str)) ? ((Long) map.get("nextOffset")).longValue() : KafkaUtils.getOffset(simpleConsumer, partition.topic, partition.partition, this._config.startOffsetTime);
        } else {
            offset = KafkaUtils.getOffset(simpleConsumer, partition.topic, partition.partition, this._config);
        }
        try {
            fetchMessages = fetchMessages(simpleConsumer, partition, offset);
        } catch (TopicOffsetOutOfRangeException e) {
            long offset2 = KafkaUtils.getOffset(simpleConsumer, partition.topic, partition.partition, OffsetRequest.EarliestTime());
            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + offset2);
            offset = offset2;
            fetchMessages = KafkaUtils.fetchMessages(this._config, simpleConsumer, partition, offset);
        }
        long j = offset;
        Iterator<MessageAndOffset> it = fetchMessages.iterator();
        while (it.hasNext()) {
            MessageAndOffset next = it.next();
            emit(tridentCollector, next.message(), partition, next.offset());
            j = next.nextOffset();
        }
        HashMap hashMap = new HashMap();
        hashMap.put("offset", Long.valueOf(offset));
        hashMap.put("nextOffset", Long.valueOf(j));
        hashMap.put("instanceId", this._topologyInstanceId);
        hashMap.put(StringMessageAndMetadataScheme.STRING_SCHEME_PARTITION_KEY, Integer.valueOf(partition.partition));
        hashMap.put("broker", ImmutableMap.of("host", (Integer) partition.host.host, "port", Integer.valueOf(partition.host.port)));
        hashMap.put("topic", partition.topic);
        hashMap.put("topology", ImmutableMap.of("name", this._topologyName, TagAttributeInfo.ID, this._topologyInstanceId));
        return hashMap;
    }

    private ByteBufferMessageSet fetchMessages(SimpleConsumer simpleConsumer, Partition partition, long j) {
        long nanoTime = System.nanoTime();
        ByteBufferMessageSet fetchMessages = KafkaUtils.fetchMessages(this._config, simpleConsumer, partition, j);
        long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
        this._kafkaMeanFetchLatencyMetric.update(Long.valueOf(nanoTime2));
        this._kafkaMaxFetchLatencyMetric.update(Long.valueOf(nanoTime2));
        return fetchMessages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reEmitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
        LOG.info("re-emitting batch, attempt " + transactionAttempt);
        String str = (String) map.get("instanceId");
        if (!this._config.ignoreZkOffsets || str.equals(this._topologyInstanceId)) {
            SimpleConsumer register = this._connections.register(partition);
            long longValue = ((Long) map.get("offset")).longValue();
            long longValue2 = ((Long) map.get("nextOffset")).longValue();
            ByteBufferMessageSet fetchMessages = fetchMessages(register, partition, longValue);
            if (fetchMessages != null) {
                Iterator<MessageAndOffset> it = fetchMessages.iterator();
                while (it.hasNext()) {
                    MessageAndOffset next = it.next();
                    if (longValue == longValue2) {
                        return;
                    }
                    if (longValue > longValue2) {
                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                    }
                    emit(tridentCollector, next.message(), partition, next.offset());
                    longValue = next.nextOffset();
                }
            }
        }
    }

    private void emit(TridentCollector tridentCollector, Message message, Partition partition, long j) {
        Iterable<List<Object>> generateTuples = this._config.scheme instanceof MessageMetadataSchemeAsMultiScheme ? KafkaUtils.generateTuples(this._config.scheme, message, partition, j) : KafkaUtils.generateTuples(this._config, message, partition.topic);
        if (generateTuples != null) {
            Iterator<List<Object>> it = generateTuples.iterator();
            while (it.hasNext()) {
                tridentCollector.emit(it.next());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void clear() {
        this._connections.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Partition> orderPartitions(List<GlobalPartitionInformation> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<GlobalPartitionInformation> it = list.iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getOrderedPartitions());
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refresh(List<Partition> list) {
        this._connections.clear();
        this._kafkaOffsetMetric.refreshPartitions(new HashSet(list));
    }

    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { // from class: org.apache.storm.kafka.trident.TridentKafkaEmitter.1
            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return TridentKafkaEmitter.this.emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void refreshPartitions(List<Partition> list) {
                TridentKafkaEmitter.this.refresh(list);
            }

            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> list) {
                return TridentKafkaEmitter.this.orderPartitions(list);
            }

            public void close() {
                TridentKafkaEmitter.this.clear();
            }
        };
    }

    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { // from class: org.apache.storm.kafka.trident.TridentKafkaEmitter.2
            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                return TridentKafkaEmitter.this.failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
                TridentKafkaEmitter.this.reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
            }

            public void refreshPartitions(List<Partition> list) {
                TridentKafkaEmitter.this.refresh(list);
            }

            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> list) {
                return TridentKafkaEmitter.this.orderPartitions(list);
            }

            public void close() {
                TridentKafkaEmitter.this.clear();
            }
        };
    }
}
