package org.apache.crunch.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.EndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/KafkaUtils.class */
/**
 * Static helpers for moving Kafka connection settings between {@link Properties} and Hadoop
 * {@link Configuration} objects and for looking up partition offsets from Kafka brokers through
 * the {@link SimpleConsumer} API.
 */
public class KafkaUtils {
    /** Client id passed to every {@link SimpleConsumer} and offset request created by this class. */
    private static final String CLIENT_ID = "crunch-kafka-client";

    /** Configuration key for the number of retry attempts. */
    public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts";

    /** Default value for {@link #KAFKA_RETRY_ATTEMPTS_KEY}. */
    public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;

    /** Configuration key for the number of retry attempts when a response is empty. */
    public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";

    /** Default value for {@link #KAFKA_EMPTY_RETRY_ATTEMPTS_KEY}. */
    public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);

    /** Shared source of randomness used to shuffle the broker list before it is queried. */
    private static final Random RANDOM = new Random();

    /** String form of {@link #KAFKA_RETRY_ATTEMPTS_DEFAULT}. */
    public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);

    /** String form of {@link #KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT}. */
    public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
            Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);

    /** Property holding the comma separated {@code host:port} broker list. */
    private static final String BROKER_LIST_KEY = "metadata.broker.list";

    /** Socket timeout handed to each {@link SimpleConsumer}. */
    private static final int CONSUMER_SOCKET_TIMEOUT = 100000;

    /** Buffer size handed to each {@link SimpleConsumer}. */
    private static final int CONSUMER_BUFFER_SIZE = 65536;

    /**
     * Copies every entry of the given configuration into a new {@link Properties} instance.
     *
     * @param configuration the configuration to copy from
     * @return a new {@link Properties} holding all key/value pairs of {@code configuration}
     */
    public static Properties getKafkaConnectionProperties(Configuration configuration) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : configuration) {
            properties.setProperty(entry.getKey(), entry.getValue());
        }
        return properties;
    }

    /**
     * Copies every string property of {@code properties} into {@code configuration}.
     *
     * @param properties the properties to copy from
     * @param configuration the configuration to populate; it is modified in place
     * @return the same {@code configuration} instance that was passed in
     */
    public static Configuration addKafkaConnectionProperties(Properties properties, Configuration configuration) {
        for (String name : properties.stringPropertyNames()) {
            configuration.set(name, properties.getProperty(name));
        }
        return configuration;
    }

    /**
     * Validates the topic names and wraps them in a {@link TopicMetadataRequest}.
     *
     * @param topics the topics to request metadata for
     * @return the metadata request for {@code topics}
     * @throws IllegalArgumentException if {@code topics} is null or empty, or any topic is blank
     */
    private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
        if (topics == null) {
            throw new IllegalArgumentException("topics cannot be null");
        }
        if (topics.length == 0) {
            throw new IllegalArgumentException("topics cannot be empty");
        }
        for (String topic : topics) {
            if (StringUtils.isBlank(topic)) {
                throw new IllegalArgumentException("No topic can be null, empty or blank");
            }
        }
        return new TopicMetadataRequest(Arrays.asList(topics));
    }

    /**
     * Looks up one offset per partition of the given topics, using the brokers listed under
     * {@code metadata.broker.list} in {@code properties}. The brokers are tried in random order.
     *
     * @param properties connection properties; must contain {@code metadata.broker.list}
     * @param time the time value sent in the offset request for every partition
     * @param topics the topics to look up
     * @return a map from topic partition to the offset that was found
     * @throws IllegalArgumentException if {@code properties} is null, the broker list is missing or
     *         malformed, or {@code topics} is null, empty or contains a blank entry
     * @throws IllegalStateException if no broker returned topic metadata or no offset could be
     *         determined for a partition
     */
    public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
        if (properties == null) {
            throw new IllegalArgumentException("properties cannot be null");
        }
        List<Broker> brokers = getBrokers(properties);
        Collections.shuffle(brokers, RANDOM);
        return getBrokerOffsets(brokers, time, topics);
    }

    /**
     * Looks up one offset per partition of the given topics.
     *
     * <p>Topic metadata is fetched from the first broker in {@code brokers} that answers. The
     * partitions are then grouped by leader and each leader is sent a single offset request. When a
     * leader returns no offset for a partition, the earliest offset of that partition is looked up
     * and used instead.
     *
     * @param brokers the brokers to ask for topic metadata, in the order they should be tried
     * @param time the time value sent in the offset request for every partition
     * @param topics the topics to look up
     * @return a new mutable map from topic partition to the offset that was found
     * @throws IllegalArgumentException if {@code brokers} is null or {@code topics} is null, empty
     *         or contains a blank entry
     * @throws IllegalStateException if no broker returned topic metadata, or no offset could be
     *         determined for a partition
     * @throws CrunchRuntimeException if a partition has no leader
     */
    static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
        // Validates the topics as a side effect, so no separate checks are needed here.
        TopicMetadataRequest metadataRequest = getTopicMetadataRequest(topics);
        if (brokers == null) {
            throw new IllegalArgumentException("brokers cannot be null");
        }

        TopicMetadataResponse metadataResponse = fetchTopicMetadata(brokers, metadataRequest, topics);
        if (metadataResponse == null) {
            throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed", Arrays.toString(topics), Arrays.toString(brokers.toArray())));
        }

        Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> requestsByLeader =
                groupRequestsByLeader(metadataResponse, time);

        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> leaderEntry : requestsByLeader.entrySet()) {
            Broker leader = leaderEntry.getKey();
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = leaderEntry.getValue();
            OffsetResponse offsetResponse = fetchOffsets(leader, requestInfo);

            // Earliest offsets already fetched through this leader, keyed by topic. Each lookup only
            // covers one topic, so the cache must not be shared between topics.
            Map<String, Map<TopicPartition, Long>> earliestByTopic = new HashMap<>();

            for (TopicAndPartition topicAndPartition : requestInfo.keySet()) {
                TopicPartition topicPartition =
                        new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
                long[] found = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
                long offset;
                if (found.length > 0) {
                    offset = found[0];
                } else {
                    offset = earliestOffsetFor(leader, time, topicAndPartition, topicPartition, earliestByTopic);
                }
                offsets.put(topicPartition, Long.valueOf(offset));
            }
        }
        return offsets;
    }

    /**
     * Sends the metadata request to each broker in turn until one of them answers.
     *
     * @return the first response received, or {@code null} if every broker failed
     */
    private static TopicMetadataResponse fetchTopicMetadata(List<Broker> brokers, TopicMetadataRequest request, String[] topics) {
        for (Broker broker : brokers) {
            SimpleConsumer consumer = getSimpleConsumer(broker);
            try {
                return consumer.send(request);
            } catch (Exception e) {
                // A failing broker is not fatal; log it and move on to the next one.
                LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed", Arrays.toString(topics), ((EndPoint) broker.endPoints().get(SecurityProtocol.PLAINTEXT).get()).host()), e);
            } finally {
                consumer.close();
            }
        }
        return null;
    }

    /**
     * Builds one offset request entry per partition and groups the entries by partition leader.
     *
     * @throws CrunchRuntimeException if a partition has no leader
     */
    private static Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> groupRequestsByLeader(
            TopicMetadataResponse metadataResponse, long time) {
        Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> requestsByLeader = new HashMap<>();
        for (TopicMetadata topicMetadata : metadataResponse.topicsMetadata()) {
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                BrokerEndPoint leaderEndPoint = partitionMetadata.leader();
                if (leaderEndPoint == null) {
                    throw new CrunchRuntimeException("Unable to find leader for topic:" + topicMetadata.topic() + " partition:" + partitionMetadata.partitionId());
                }
                Broker leader = new Broker(0, leaderEndPoint.host(), leaderEndPoint.port(), SecurityProtocol.PLAINTEXT);
                Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = requestsByLeader.get(leader);
                if (requestInfo == null) {
                    requestInfo = new HashMap<>();
                    requestsByLeader.put(leader, requestInfo);
                }
                // Ask for a single offset per partition.
                requestInfo.put(new TopicAndPartition(topicMetadata.topic(), partitionMetadata.partitionId()), new PartitionOffsetRequestInfo(time, 1));
            }
        }
        return requestsByLeader;
    }

    /** Sends one offset request to the given leader and always closes the connection afterwards. */
    private static OffsetResponse fetchOffsets(Broker leader, Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo) {
        SimpleConsumer consumer = getSimpleConsumer(leader);
        try {
            return consumer.getOffsetsBefore(new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID));
        } finally {
            consumer.close();
        }
    }

    /**
     * Resolves the earliest offset of a partition for which the leader returned no offset.
     *
     * @param leader the leader that was asked, used as the only metadata broker for the lookup
     * @param requestedTime the time value of the request that returned no offset
     * @param topicAndPartition the partition in the old consumer API representation
     * @param topicPartition the same partition in the new client API representation
     * @param earliestByTopic cache of earliest-offset lookups already made, keyed by topic
     * @return the earliest offset of the partition
     * @throws IllegalStateException if the earliest offsets were what had been requested, or if the
     *         earliest-offset lookup did not contain the partition
     */
    private static long earliestOffsetFor(Broker leader, long requestedTime, TopicAndPartition topicAndPartition,
            TopicPartition topicPartition, Map<String, Map<TopicPartition, Long>> earliestByTopic) {
        LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead", topicAndPartition);
        // There is nothing left to fall back to if the earliest offsets were already requested.
        if (requestedTime == kafka.api.OffsetRequest.EarliestTime()) {
            throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic() + "] but Kafka returned no values");
        }
        String topic = topicAndPartition.topic();
        Map<TopicPartition, Long> earliest = earliestByTopic.get(topic);
        if (earliest == null) {
            earliest = getBrokerOffsets(Collections.singletonList(leader), kafka.api.OffsetRequest.EarliestTime(), topic);
            earliestByTopic.put(topic, earliest);
        }
        Long fallback = earliest.get(topicPartition);
        if (fallback == null) {
            throw new IllegalStateException("Unable to determine the earliest offset for topic/partition [" + topicAndPartition + "]");
        }
        return fallback.longValue();
    }

    /** Creates a consumer connected to the PLAINTEXT end point of the given broker. */
    private static SimpleConsumer getSimpleConsumer(Broker broker) {
        EndPoint endPoint = (EndPoint) broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
        return new SimpleConsumer(endPoint.host(), endPoint.port(), CONSUMER_SOCKET_TIMEOUT, CONSUMER_BUFFER_SIZE, CLIENT_ID);
    }

    /**
     * Parses the comma separated {@code host:port} list stored under {@code metadata.broker.list}.
     * Whitespace around entries, hosts and ports is ignored.
     *
     * @param properties the properties holding the broker list
     * @return a new mutable list with one PLAINTEXT {@link Broker} per entry, in listed order
     * @throws IllegalArgumentException if {@code properties} is null, the broker list is missing,
     *         an entry is not of the form {@code host:port}, or a port is not an integer
     */
    private static List<Broker> getBrokers(Properties properties) {
        if (properties == null) {
            throw new IllegalArgumentException("props cannot be null");
        }
        String brokerList = properties.getProperty(BROKER_LIST_KEY);
        if (brokerList == null) {
            throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
        }
        String[] entries = brokerList.split(",");
        // split() drops trailing empty strings, so a list made only of commas yields no entries.
        if (entries.length < 1) {
            throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(entries) + "]");
        }
        List<Broker> brokers = new ArrayList<>(entries.length);
        for (String entry : entries) {
            String[] hostAndPort = entry.trim().split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Unable to parse host/port from broker string : [" + Arrays.toString(hostAndPort) + "] from broker list : [" + Arrays.toString(entries) + "]");
            }
            String host = hostAndPort[0].trim();
            String port = hostAndPort[1].trim();
            try {
                brokers.add(new Broker(0, host, Integer.parseInt(port), SecurityProtocol.PLAINTEXT));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Error parsing broker port : " + port, e);
            }
        }
        return brokers;
    }
}
