/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;

public class FlinkKafkaConsumer08<T>
extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
    private final Properties kafkaProperties;

    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, (KeyedDeserializationSchema<T>)new KeyedDeserializationSchemaWrapper(deserializer), props);
    }

    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(topics, deserializer);
        Preconditions.checkNotNull(topics, (String)"topics");
        this.kafkaProperties = (Properties)Preconditions.checkNotNull((Object)props, (String)"props");
        FlinkKafkaConsumer08.validateZooKeeperConfig(props);
        FlinkKafkaConsumer08.validateAutoOffsetResetValue(props);
    }

    protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
        boolean useMetrics = !PropertiesUtil.getBoolean((Properties)this.kafkaProperties, (String)"flink.disable-metrics", (boolean)false);
        long autoCommitInterval = offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC ? PropertiesUtil.getLong((Properties)this.kafkaProperties, (String)"auto.commit.interval.ms", (long)60000L) : -1L;
        return new Kafka08Fetcher<T>(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, this.deserializer, this.kafkaProperties, autoCommitInterval, useMetrics);
    }

    protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
        List partitionInfos = KafkaTopicPartition.dropLeaderData(FlinkKafkaConsumer08.getPartitionsForTopic(topics, this.kafkaProperties));
        if (partitionInfos.size() == 0) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics + ". Please check previous log entries");
        }
        if (LOG.isInfoEnabled()) {
            FlinkKafkaConsumer08.logPartitionInfo((Logger)LOG, (List)partitionInfos);
        }
        return partitionInfos;
    }

    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean((Properties)this.kafkaProperties, (String)"auto.commit.enable", (boolean)true) && PropertiesUtil.getLong((Properties)this.kafkaProperties, (String)"auto.commit.interval.ms", (long)60000L) > 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
        String seedBrokersConfString = properties.getProperty("bootstrap.servers");
        int numRetries = PropertiesUtil.getInt((Properties)properties, (String)GET_PARTITIONS_RETRIES_KEY, (int)3);
        Preconditions.checkNotNull((Object)seedBrokersConfString, (String)"Configuration property %s not set", (Object[])new Object[]{"bootstrap.servers"});
        String[] seedBrokers = seedBrokersConfString.split(",");
        ArrayList<KafkaTopicPartitionLeader> partitions = new ArrayList<KafkaTopicPartitionLeader>();
        String clientId = "flink-kafka-consumer-partition-lookup";
        int soTimeout = PropertiesUtil.getInt((Properties)properties, (String)"socket.timeout.ms", (int)30000);
        int bufferSize = PropertiesUtil.getInt((Properties)properties, (String)"socket.receive.buffer.bytes", (int)65536);
        Random rnd = new Random();
        block9: for (int retry = 0; retry < numRetries; ++retry) {
            int index = rnd.nextInt(seedBrokers.length);
            block10: for (int arrIdx = 0; arrIdx < seedBrokers.length; ++arrIdx) {
                String seedBroker = seedBrokers[index];
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", new Object[]{seedBroker, retry, numRetries});
                if (++index == seedBrokers.length) {
                    index = 0;
                }
                URL brokerUrl = NetUtils.getCorrectHostnamePort((String)seedBroker);
                try (SimpleConsumer consumer = null;){
                    consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, "flink-kafka-consumer-partition-lookup");
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    TopicMetadataResponse resp = consumer.send(req);
                    List metaData = resp.topicsMetadata();
                    partitions.clear();
                    for (TopicMetadata item : metaData) {
                        if (item.errorCode() != ErrorMapping.NoError()) {
                            LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor((short)item.errorCode()).getMessage());
                            continue block10;
                        }
                        if (!topics.contains(item.topic())) {
                            LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                            continue block10;
                        }
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            Node leader = FlinkKafkaConsumer08.brokerToNode(part.leader());
                            KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
                            KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
                            partitions.add(pInfo);
                        }
                    }
                    break block9;
                }
            }
        }
        return partitions;
    }

    private static Node brokerToNode(Broker broker) {
        return new Node(broker.id(), broker.host(), broker.port());
    }

    protected static void validateZooKeeperConfig(Properties props) {
        if (props.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (props.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
        }
    }

    private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
        if (!(exception instanceof ClosedChannelException)) {
            return;
        }
        int unknownHosts = 0;
        for (String broker : seedBrokers) {
            URL brokerUrl = NetUtils.getCorrectHostnamePort((String)broker.trim());
            try {
                InetAddress.getByName(brokerUrl.getHost());
            }
            catch (UnknownHostException e) {
                ++unknownHosts;
            }
        }
        if (unknownHosts == seedBrokers.length) {
            throw new IllegalArgumentException("All the servers provided in: 'bootstrap.servers' config are invalid. (unknown hosts)");
        }
    }

    private static void validateAutoOffsetResetValue(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        if (!(val.equals("largest") || val.equals("latest") || val.equals("earliest") || val.equals("smallest"))) {
            throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
        }
    }
}

