package org.apache.storm.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.kafka.trident.IBrokerReader;
import org.apache.storm.kafka.trident.StaticBrokerReader;
import org.apache.storm.kafka.trident.ZkBrokerReader;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/KafkaUtils.class */
public class KafkaUtils {
    /** Shared logger for this utility class and its nested metric class. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaUtils.class);
    /**
     * Sentinel offset value. It equals the {@code -5} that
     * {@code getOffset(SimpleConsumer, String, int, long)} returns when the broker
     * response carries no offsets for the requested partition.
     */
    private static final int NO_OFFSET = -5;

    /* loaded from: input_file:org/apache/storm/kafka/KafkaUtils$KafkaOffsetMetric.class */
    public static class KafkaOffsetMetric implements IMetric {
        Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap();
        Set<Partition> _partitions;
        DynamicPartitionConnections _connections;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/storm/kafka/KafkaUtils$KafkaOffsetMetric$TopicMetrics.class */
        public class TopicMetrics {
            long totalSpoutLag;
            long totalEarliestTimeOffset;
            long totalLatestTimeOffset;
            long totalLatestEmittedOffset;
            long totalLatestCompletedOffset;

            private TopicMetrics() {
                this.totalSpoutLag = 0L;
                this.totalEarliestTimeOffset = 0L;
                this.totalLatestTimeOffset = 0L;
                this.totalLatestEmittedOffset = 0L;
                this.totalLatestCompletedOffset = 0L;
            }
        }

        public KafkaOffsetMetric(DynamicPartitionConnections dynamicPartitionConnections) {
            this._connections = dynamicPartitionConnections;
        }

        public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) {
            this._partitionToOffset.put(partition, offsetData);
        }

        public Object getValueAndReset() {
            try {
                HashMap hashMap = new HashMap();
                if (this._partitions == null || this._partitions.size() != this._partitionToOffset.size()) {
                    KafkaUtils.LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                    return null;
                }
                TreeMap treeMap = new TreeMap();
                for (Map.Entry<Partition, PartitionManager.OffsetData> entry : this._partitionToOffset.entrySet()) {
                    Partition key = entry.getKey();
                    SimpleConsumer connection = this._connections.getConnection(key);
                    if (connection == null) {
                        KafkaUtils.LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                        return null;
                    }
                    long offset = KafkaUtils.getOffset(connection, key.topic, key.partition, OffsetRequest.LatestTime());
                    long offset2 = KafkaUtils.getOffset(connection, key.topic, key.partition, OffsetRequest.EarliestTime());
                    if (offset == -5) {
                        KafkaUtils.LOG.warn("No data found in Kafka Partition " + key.getId());
                        return null;
                    }
                    long j = entry.getValue().latestEmittedOffset;
                    long j2 = entry.getValue().latestCompletedOffset;
                    long j3 = offset - j2;
                    String str = key.topic;
                    String id = key.getId();
                    if (!id.startsWith(str + "/")) {
                        id = str + "/" + id;
                    }
                    hashMap.put(id + "/spoutLag", Long.valueOf(j3));
                    hashMap.put(id + "/earliestTimeOffset", Long.valueOf(offset2));
                    hashMap.put(id + "/latestTimeOffset", Long.valueOf(offset));
                    hashMap.put(id + "/latestEmittedOffset", Long.valueOf(j));
                    hashMap.put(id + "/latestCompletedOffset", Long.valueOf(j2));
                    if (!treeMap.containsKey(key.topic)) {
                        treeMap.put(key.topic, new TopicMetrics());
                    }
                    TopicMetrics topicMetrics = (TopicMetrics) treeMap.get(key.topic);
                    topicMetrics.totalSpoutLag += j3;
                    topicMetrics.totalEarliestTimeOffset += offset2;
                    topicMetrics.totalLatestTimeOffset += offset;
                    topicMetrics.totalLatestEmittedOffset += j;
                    topicMetrics.totalLatestCompletedOffset += j2;
                }
                for (Map.Entry entry2 : treeMap.entrySet()) {
                    String str2 = (String) entry2.getKey();
                    TopicMetrics topicMetrics2 = (TopicMetrics) entry2.getValue();
                    hashMap.put(str2 + "/totalSpoutLag", Long.valueOf(topicMetrics2.totalSpoutLag));
                    hashMap.put(str2 + "/totalEarliestTimeOffset", Long.valueOf(topicMetrics2.totalEarliestTimeOffset));
                    hashMap.put(str2 + "/totalLatestTimeOffset", Long.valueOf(topicMetrics2.totalLatestTimeOffset));
                    hashMap.put(str2 + "/totalLatestEmittedOffset", Long.valueOf(topicMetrics2.totalLatestEmittedOffset));
                    hashMap.put(str2 + "/totalLatestCompletedOffset", Long.valueOf(topicMetrics2.totalLatestCompletedOffset));
                }
                return hashMap;
            } catch (Throwable th) {
                KafkaUtils.LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", th);
                return null;
            }
        }

        public void refreshPartitions(Set<Partition> set) {
            this._partitions = set;
            Iterator<Partition> it = this._partitionToOffset.keySet().iterator();
            while (it.hasNext()) {
                if (!set.contains(it.next())) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Creates the broker reader matching the configured hosts type.
     *
     * @param map         configuration map, passed through to {@link ZkBrokerReader}
     * @param kafkaConfig supplies the topic and the hosts definition
     * @return a {@link StaticBrokerReader} when {@code kafkaConfig.hosts} is a
     *         {@code StaticHosts}; otherwise a {@link ZkBrokerReader} (the hosts are then
     *         cast to {@code ZkHosts})
     */
    public static IBrokerReader makeBrokerReader(Map map, KafkaConfig kafkaConfig) {
        if (kafkaConfig.hosts instanceof StaticHosts) {
            StaticHosts staticHosts = (StaticHosts) kafkaConfig.hosts;
            return new StaticBrokerReader(kafkaConfig.topic, staticHosts.getPartitionInformation());
        }
        return new ZkBrokerReader(map, kafkaConfig.topic, (ZkHosts) kafkaConfig.hosts);
    }

    /**
     * Looks up an offset for the given topic partition, using
     * {@code kafkaConfig.startOffsetTime} as the reference time.
     *
     * @param simpleConsumer consumer used to query the broker
     * @param str            topic name
     * @param i              partition number
     * @param kafkaConfig    supplies {@code startOffsetTime}
     * @return whatever the time-based {@code getOffset} overload returns for that time
     */
    public static long getOffset(SimpleConsumer simpleConsumer, String str, int i, KafkaConfig kafkaConfig) {
        long startOffsetTime = kafkaConfig.startOffsetTime;
        return getOffset(simpleConsumer, str, i, startOffsetTime);
    }

    /**
     * Asks the broker for a single offset before the given time for one topic partition.
     *
     * @param simpleConsumer consumer used to issue the offset request; its client id is
     *                       sent with the request
     * @param str            topic name
     * @param i              partition number
     * @param j              reference time passed to the offset request
     * @return the first offset in the response, or {@code -5} (the value of
     *         {@code NO_OFFSET}) when the response contains no offsets
     */
    public static long getOffset(SimpleConsumer simpleConsumer, String str, int i, long j) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request at most one offset for this partition.
        requestInfo.put(new TopicAndPartition(str, i), new PartitionOffsetRequestInfo(j, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
        long[] found = simpleConsumer.getOffsetsBefore(request).offsets(str, i);
        return found.length > 0 ? found[0] : -5L;
    }

    /**
     * Fetches messages for one partition starting at offset {@code j}.
     *
     * <p>Only the network call is guarded by the {@code try}: exceptions raised while
     * interpreting the broker response must reach the caller as the declared types
     * instead of being caught and re-wrapped in a plain {@link RuntimeException}.
     *
     * @param kafkaConfig    supplies fetch size, client id, max wait, min bytes and the
     *                       {@code useStartOffsetTimeIfOffsetOutOfRange} flag
     * @param simpleConsumer consumer used to issue the fetch
     * @param partition      topic partition to read from
     * @param j              offset to start fetching at
     * @return the message set returned by the broker for the partition
     * @throws TopicOffsetOutOfRangeException if the broker reports
     *         {@code OFFSET_OUT_OF_RANGE} and
     *         {@code kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange} is set
     * @throws FailedFetchException if the broker reports any other error, or the fetch
     *         call fails with an {@link IOException} or {@link UnresolvedAddressException}
     * @throws RuntimeException wrapping any other exception thrown by the fetch call
     */
    public static ByteBufferMessageSet fetchMessages(KafkaConfig kafkaConfig, SimpleConsumer simpleConsumer, Partition partition, long j) throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
        String topic = partition.topic;
        int partitionId = partition.partition;
        FetchResponse response;
        try {
            response = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, partitionId, j, kafkaConfig.fetchSizeBytes).clientId(kafkaConfig.clientId).maxWait(kafkaConfig.fetchMaxWait).minBytes(kafkaConfig.minFetchByte).build());
        } catch (Exception e) {
            // ConnectException and SocketTimeoutException are IOException subclasses,
            // so the IOException test covers them as well.
            if (e instanceof IOException || e instanceof UnresolvedAddressException) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            }
            throw new RuntimeException(e);
        }
        if (!response.hasError()) {
            return response.messageSet(topic, partitionId);
        }
        KafkaError error = KafkaError.getError(response.errorCode(topic, partitionId));
        if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange) {
            String outOfRangeMessage = partition + " Got fetch request with offset out of range: [" + j + "]";
            LOG.warn(outOfRangeMessage);
            throw new TopicOffsetOutOfRangeException(outOfRangeMessage);
        }
        String failureMessage = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
        LOG.error(failureMessage);
        throw new FailedFetchException(failureMessage);
    }

    /**
     * Deserializes a Kafka message into tuples using the scheme configured in
     * {@code kafkaConfig}.
     *
     * <p>Scheme selection, in order:
     * <ol>
     *   <li>message has a key and the scheme is a {@code KeyValueSchemeAsMultiScheme}:
     *       key and payload are deserialized together;</li>
     *   <li>the scheme is a {@code StringMultiSchemeWithTopic}: the payload is
     *       deserialized together with the topic name;</li>
     *   <li>otherwise the payload alone is deserialized.</li>
     * </ol>
     *
     * @param kafkaConfig supplies the scheme
     * @param message     Kafka message to deserialize
     * @param str         topic name, used only by {@code StringMultiSchemeWithTopic}
     * @return the deserialized tuples, or {@code null} when the message has no payload
     */
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message message, String str) {
        ByteBuffer payload = message.payload();
        if (payload == null) {
            return null;
        }
        ByteBuffer key = message.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            // Explicit downcast, mirroring the StringMultiSchemeWithTopic branch below.
            return ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
        }
        if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
            return ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(str, payload);
        }
        return kafkaConfig.scheme.deserialize(payload);
    }

    /**
     * Deserializes a Kafka message together with its partition and offset metadata.
     *
     * @param messageMetadataSchemeAsMultiScheme scheme that performs the deserialization
     * @param message   Kafka message to deserialize
     * @param partition partition handed to the scheme
     * @param j         offset handed to the scheme
     * @return the deserialized tuples, or {@code null} when the message has no payload
     */
    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme messageMetadataSchemeAsMultiScheme, Message message, Partition partition, long j) {
        ByteBuffer body = message.payload();
        return body == null
                ? null
                : messageMetadataSchemeAsMultiScheme.deserializeMessageWithMetadata(body, partition, j);
    }

    /**
     * Selects the partitions a task is responsible for.
     *
     * <p>The ordered partitions of every entry in {@code list} are concatenated, and the
     * task takes every {@code i}-th partition starting at position {@code i2}
     * (i.e. positions {@code i2}, {@code i2 + i}, {@code i2 + 2*i}, ...).
     *
     * @param list partition information whose ordered partitions are distributed
     * @param i    total number of tasks
     * @param i2   zero-based index of this task
     * @return the partitions assigned to the task; empty when there are more tasks than
     *         partitions and this task receives none
     * @throws IllegalArgumentException if {@code i2} is negative or not less than {@code i}
     */
    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> list, int i, int i2) {
        // A negative index would otherwise surface later as an IndexOutOfBoundsException
        // from the list lookup; reject it up front as an argument error.
        Preconditions.checkArgument(i2 >= 0, "task index must not be negative");
        Preconditions.checkArgument(i2 < i, "task index must be less that total tasks");
        List<Partition> allPartitions = new ArrayList<Partition>();
        for (GlobalPartitionInformation partitionInformation : list) {
            allPartitions.addAll(partitionInformation.getOrderedPartitions());
        }
        int partitionCount = allPartitions.size();
        if (partitionCount < i) {
            LOG.warn("there are more tasks than partitions (tasks: " + i + "; partitions: " + partitionCount + "), some tasks will be idle");
        }
        List<Partition> assigned = new ArrayList<Partition>();
        for (int position = i2; position < partitionCount; position += i) {
            assigned.add(allPartitions.get(position));
        }
        logPartitionMapping(i, i2, assigned);
        return assigned;
    }

    /**
     * Logs the partitions assigned to a task: at INFO level when there are any, at WARN
     * level when the task received none.
     *
     * @param i    total number of tasks
     * @param i2   zero-based index of the task
     * @param list partitions assigned to the task
     */
    private static void logPartitionMapping(int i, int i2, List<Partition> list) {
        String prefix = taskId(i2, i);
        if (!list.isEmpty()) {
            LOG.info(prefix + "assigned " + list);
            return;
        }
        LOG.warn(prefix + "no partitions assigned");
    }

    /**
     * Builds the log prefix identifying a task, e.g. {@code "Task [1/3] "} for index 0 of 3.
     *
     * @param i  zero-based task index; shown one-based in the result
     * @param i2 total number of tasks
     * @return {@code "Task [<i + 1>/<i2>] "}, including the trailing space
     */
    public static String taskId(int i, int i2) {
        StringBuilder label = new StringBuilder("Task [");
        label.append(i + 1);
        label.append("/");
        label.append(i2);
        label.append("] ");
        return label.toString();
    }
}
