package org.apache.flink.streaming.connectors.kafka.api.simple;

import java.io.UnsupportedEncodingException;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.api.TopicMetadata;
import kafka.cluster.Broker;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.class */
public class KafkaTopicUtils {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
    private final ZkClient zkClient;
    public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
    public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils$KafkaZKStringSerializer.class */
    private static class KafkaZKStringSerializer implements ZkSerializer {
        private KafkaZKStringSerializer() {
        }

        public byte[] serialize(Object obj) throws ZkMarshallingError {
            try {
                return ((String) obj).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }

        public Object deserialize(byte[] bArr) throws ZkMarshallingError {
            if (bArr == null) {
                return null;
            }
            try {
                return new String(bArr, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public KafkaTopicUtils(String str) {
        this(str, 10000, 10000);
    }

    public KafkaTopicUtils(String str, int i, int i2) {
        this.zkClient = new ZkClient(str, i, i2, new KafkaZKStringSerializer());
        this.zkClient.waitUntilConnected();
    }

    public void createTopic(String str, int i, int i2) {
        LOG.info("Creating Kafka topic '{}'", str);
        Properties properties = new Properties();
        if (!topicExists(str)) {
            AdminUtils.createTopic(this.zkClient, str, i, i2, properties);
        } else if (LOG.isWarnEnabled()) {
            LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", str);
        }
    }

    public int getNumberOfPartitions(String str) {
        return JavaConversions.asJavaCollection(getTopicInfo(str).partitionsMetadata()).size();
    }

    public String getLeaderBrokerAddressForTopic(String str) {
        return ((Broker) JavaConversions.asJavaCollection(((PartitionMetadata) JavaConversions.asJavaCollection(getTopicInfo(str).partitionsMetadata()).iterator().next()).isr()).iterator().next()).connectionString();
    }

    public TopicMetadata getTopicInfo(String str) {
        if (topicExists(str)) {
            return AdminUtils.fetchTopicMetadataFromZk(str, this.zkClient);
        }
        throw new RuntimeException("Topic does not exist: " + str);
    }

    public boolean topicExists(String str) {
        return AdminUtils.topicExists(this.zkClient, str);
    }
}
