/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class Kafka08PartitionDiscoverer
extends AbstractPartitionDiscoverer {
    private static final Logger LOG = LoggerFactory.getLogger(Kafka08PartitionDiscoverer.class);
    private static final String dummyClientId = "flink-kafka-consumer-partition-lookup";
    private final String[] seedBrokerAddresses;
    private final int numRetries;
    private final int soTimeout;
    private final int bufferSize;
    private int currentContactSeedBrokerIndex;
    private SimpleConsumer consumer;

    public Kafka08PartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks, Properties kafkaProperties) {
        super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
        Preconditions.checkNotNull((Object)kafkaProperties);
        String seedBrokersConfString = kafkaProperties.getProperty("bootstrap.servers");
        Preconditions.checkArgument((seedBrokersConfString != null && !seedBrokersConfString.isEmpty() ? 1 : 0) != 0, (String)"Configuration property %s not set", (Object[])new Object[]{"bootstrap.servers"});
        this.seedBrokerAddresses = seedBrokersConfString.split(",");
        this.currentContactSeedBrokerIndex = indexOfThisSubtask % this.seedBrokerAddresses.length;
        this.numRetries = PropertiesUtil.getInt((Properties)kafkaProperties, (String)"flink.get-partitions.retry", (int)3);
        this.soTimeout = PropertiesUtil.getInt((Properties)kafkaProperties, (String)"socket.timeout.ms", (int)30000);
        this.bufferSize = PropertiesUtil.getInt((Properties)kafkaProperties, (String)"socket.receive.buffer.bytes", (int)65536);
    }

    protected void initializeConnections() {
        URL contactUrl = NetUtils.getCorrectHostnamePort((String)this.seedBrokerAddresses[this.currentContactSeedBrokerIndex]);
        this.consumer = new SimpleConsumer(contactUrl.getHost(), contactUrl.getPort(), this.soTimeout, this.bufferSize, dummyClientId);
    }

    protected List<String> getAllTopics() {
        LinkedList<String> topics = new LinkedList<String>();
        block4: for (int retry = 0; retry < this.numRetries; ++retry) {
            block5: for (int arrIdx = 0; arrIdx < this.seedBrokerAddresses.length; ++arrIdx) {
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], retry, this.numRetries});
                try {
                    topics.clear();
                    for (TopicMetadata item : this.consumer.send(new TopicMetadataRequest(Collections.emptyList())).topicsMetadata()) {
                        if (item.errorCode() != ErrorMapping.NoError()) {
                            LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], ((Object)topics).toString(), ErrorMapping.exceptionFor((short)item.errorCode()).getMessage()});
                            this.useNextAddressAsNewContactSeedBroker();
                            continue block5;
                        }
                        topics.add(item.topic());
                    }
                    break block4;
                }
                catch (Exception e) {
                    Kafka08PartitionDiscoverer.validateSeedBrokers(this.seedBrokerAddresses, e);
                    LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], topics, e.getClass().getName(), e.getMessage()});
                    LOG.debug("Detailed trace", (Throwable)e);
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    this.useNextAddressAsNewContactSeedBroker();
                }
            }
        }
        return topics;
    }

    public List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
        return KafkaTopicPartition.dropLeaderData(this.getPartitionLeadersForTopics(topics));
    }

    protected void wakeupConnections() {
    }

    protected void closeConnections() throws Exception {
        if (this.consumer != null) {
            SimpleConsumer consumer = this.consumer;
            consumer.close();
            this.consumer = null;
        }
    }

    public List<KafkaTopicPartitionLeader> getPartitionLeadersForTopics(List<String> topics) {
        LinkedList<KafkaTopicPartitionLeader> partitions = new LinkedList<KafkaTopicPartitionLeader>();
        block4: for (int retry = 0; retry < this.numRetries; ++retry) {
            block5: for (int arrIdx = 0; arrIdx < this.seedBrokerAddresses.length; ++arrIdx) {
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], retry, this.numRetries});
                try {
                    partitions.clear();
                    for (TopicMetadata item : this.consumer.send(new TopicMetadataRequest(topics)).topicsMetadata()) {
                        if (item.errorCode() != ErrorMapping.NoError()) {
                            LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], topics.toString(), ErrorMapping.exceptionFor((short)item.errorCode()).getMessage()});
                            this.useNextAddressAsNewContactSeedBroker();
                            continue block5;
                        }
                        if (!topics.contains(item.topic())) {
                            LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                            this.useNextAddressAsNewContactSeedBroker();
                            continue block5;
                        }
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            Node leader = Kafka08PartitionDiscoverer.brokerToNode(part.leader());
                            KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
                            KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
                            partitions.add(pInfo);
                        }
                    }
                    break block4;
                }
                catch (Exception e) {
                    Kafka08PartitionDiscoverer.validateSeedBrokers(this.seedBrokerAddresses, e);
                    LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}", new Object[]{this.seedBrokerAddresses[this.currentContactSeedBrokerIndex], topics, e.getClass().getName(), e.getMessage()});
                    LOG.debug("Detailed trace", (Throwable)e);
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    this.useNextAddressAsNewContactSeedBroker();
                }
            }
        }
        return partitions;
    }

    private void useNextAddressAsNewContactSeedBroker() {
        if (++this.currentContactSeedBrokerIndex == this.seedBrokerAddresses.length) {
            this.currentContactSeedBrokerIndex = 0;
        }
        URL newContactUrl = NetUtils.getCorrectHostnamePort((String)this.seedBrokerAddresses[this.currentContactSeedBrokerIndex]);
        this.consumer = new SimpleConsumer(newContactUrl.getHost(), newContactUrl.getPort(), this.soTimeout, this.bufferSize, dummyClientId);
    }

    private static Node brokerToNode(Broker broker) {
        return new Node(broker.id(), broker.host(), broker.port());
    }

    private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
        if (!(exception instanceof ClosedChannelException)) {
            return;
        }
        int unknownHosts = 0;
        for (String broker : seedBrokers) {
            URL brokerUrl = NetUtils.getCorrectHostnamePort((String)broker.trim());
            try {
                InetAddress.getByName(brokerUrl.getHost());
            }
            catch (UnknownHostException e) {
                ++unknownHosts;
            }
        }
        if (unknownHosts == seedBrokers.length) {
            throw new IllegalArgumentException("All the servers provided in: 'bootstrap.servers' config are invalid. (unknown hosts)");
        }
    }
}

