package org.apache.flink.streaming.connectors.kafka;

import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.Node;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.class */
/**
 * Streaming source that consumes one or more Kafka topics using the Kafka 0.8
 * {@link SimpleConsumer} based fetcher ({@link Kafka08Fetcher}).
 *
 * <p>The consumer is configured through a {@link Properties} object. The properties read
 * directly by this class are:
 * <ul>
 *   <li>{@code zookeeper.connect} and {@code group.id} - required, checked in the constructor;</li>
 *   <li>{@code zookeeper.session.timeout.ms} and {@code zookeeper.connection.timeout.ms} -
 *       optional, must parse as integers when present;</li>
 *   <li>{@code auto.offset.reset} - optional, defaults to {@code largest}; {@code none} is rejected;</li>
 *   <li>{@code auto.commit.interval.ms} - optional, defaults to 60000;</li>
 *   <li>{@code bootstrap.servers} - required for the partition lookup;</li>
 *   <li>{@code socket.timeout.ms}, {@code socket.receive.buffer.bytes} and
 *       {@link #GET_PARTITIONS_RETRIES_KEY} - optional tuning of the partition lookup;</li>
 *   <li>{@code flink.disable-metrics} - optional, defaults to {@code false}.</li>
 * </ul>
 *
 * <p>{@code LOG}, {@code deserializer} and {@code logPartitionInfo} are not declared in this
 * class; they are expected to be inherited from {@link FlinkKafkaConsumerBase}.
 *
 * @param <T> type of the records produced by this source
 */
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;

    /** Property key for the number of rounds the partition lookup makes over the seed brokers. */
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

    /** Number of lookup rounds used when {@link #GET_PARTITIONS_RETRIES_KEY} is not set. */
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

    /** Property key holding the comma-separated list of seed brokers. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    /** Client id passed to the {@link SimpleConsumer} used for the metadata lookup. */
    private static final String PARTITION_LOOKUP_CLIENT_ID = "flink-kafka-consumer-partition-lookup";

    /** Pause after a failed broker contact before the next broker is tried. */
    private static final long RETRY_PAUSE_MILLIS = 500L;

    /** User-supplied Kafka configuration, handed on to the fetcher. */
    private final Properties kafkaProperties;

    /** Offset sentinel (latest or earliest) derived from {@code auto.offset.reset}. */
    private final long invalidOffsetBehavior;

    /** Value of {@code auto.commit.interval.ms}, handed on to the fetcher. */
    private final long autoCommitInterval;

    /**
     * Creates a consumer for a single topic with a value-only deserializer.
     *
     * @param str name of the topic to read
     * @param deserializationSchema schema used to deserialize record values
     * @param properties Kafka / ZooKeeper configuration
     */
    public FlinkKafkaConsumer08(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(Collections.singletonList(str), deserializationSchema, properties);
    }

    /**
     * Creates a consumer for a single topic with a keyed deserializer.
     *
     * @param str name of the topic to read
     * @param keyedDeserializationSchema schema used to deserialize records
     * @param properties Kafka / ZooKeeper configuration
     */
    public FlinkKafkaConsumer08(String str, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this(Collections.singletonList(str), keyedDeserializationSchema, properties);
    }

    /**
     * Creates a consumer for several topics with a value-only deserializer. The schema is
     * wrapped in a {@link KeyedDeserializationSchemaWrapper}.
     *
     * @param list names of the topics to read
     * @param deserializationSchema schema used to deserialize record values
     * @param properties Kafka / ZooKeeper configuration
     */
    public FlinkKafkaConsumer08(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(list, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer for several topics with a keyed deserializer. All other constructors
     * delegate to this one.
     *
     * @param list names of the topics to read
     * @param keyedDeserializationSchema schema used to deserialize records
     * @param properties Kafka / ZooKeeper configuration
     * @throws NullPointerException if {@code list} or {@code properties} is {@code null}
     * @throws IllegalArgumentException if a required property is missing, a ZooKeeper timeout
     *         is not an integer, or {@code auto.offset.reset} is {@code none}
     */
    public FlinkKafkaConsumer08(List<String> list, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        super(list, keyedDeserializationSchema);
        Preconditions.checkNotNull(list, "topics");
        this.kafkaProperties = Preconditions.checkNotNull(properties, "props");
        validateZooKeeperConfig(properties);
        this.invalidOffsetBehavior = getInvalidOffsetBehavior(properties);
        this.autoCommitInterval = PropertiesUtil.getLong(properties, "auto.commit.interval.ms", 60000L);
    }

    /**
     * Builds the {@link Kafka08Fetcher} for the given partitions. Metrics are enabled unless
     * the property {@code flink.disable-metrics} is set to {@code true}.
     *
     * @param sourceContext context the fetcher is given for its output
     * @param list partitions assigned to the fetcher
     * @param serializedValue serialized periodic watermark assigner, passed through unchanged
     * @param serializedValue2 serialized punctuated watermark assigner, passed through unchanged
     * @param streamingRuntimeContext runtime context of the source
     * @return the newly created fetcher
     * @throws Exception if the fetcher cannot be created
     */
    protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext) throws Exception {
        boolean useMetrics = !Boolean.parseBoolean(this.kafkaProperties.getProperty("flink.disable-metrics", "false"));
        return new Kafka08Fetcher<T>(sourceContext, list, serializedValue, serializedValue2, streamingRuntimeContext, this.deserializer, this.kafkaProperties, this.invalidOffsetBehavior, this.autoCommitInterval, useMetrics);
    }

    /**
     * Looks up the partitions of the given topics and strips the leader information.
     *
     * @param list names of the topics to look up
     * @return the partitions of the topics; never empty
     * @throws RuntimeException if no partition could be retrieved
     */
    protected List<KafkaTopicPartition> getKafkaPartitions(List<String> list) {
        List<KafkaTopicPartition> partitions = KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(list, this.kafkaProperties));
        if (partitions.isEmpty()) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + list + ". Please check previous log entries");
        }
        if (LOG.isInfoEnabled()) {
            logPartitionInfo(LOG, partitions);
        }
        return partitions;
    }

    /**
     * Asks the seed brokers from {@code bootstrap.servers} for the partitions (and their
     * leaders) of the given topics.
     *
     * <p>The lookup makes up to {@link #GET_PARTITIONS_RETRIES_KEY} rounds (default
     * {@link #DEFAULT_GET_PARTITIONS_RETRIES}). In each round every seed broker is tried once,
     * starting at a random position. The first broker that answers without any per-topic
     * error ends the lookup. A broker that cannot be reached, reports an error for a topic,
     * or returns metadata for a topic that was not requested is skipped.
     *
     * @param list names of the topics to look up
     * @param properties configuration; must contain {@code bootstrap.servers}
     * @return the partitions with their leaders. The list is empty when no broker could be
     *         queried; when every attempt failed part-way through a response it holds the
     *         entries collected from the last response.
     * @throws NullPointerException if {@code bootstrap.servers} is not set
     * @throws IllegalArgumentException if {@code bootstrap.servers} contains no broker entry,
     *         or if a connection failed with a {@link ClosedChannelException} and none of the
     *         configured hosts can be resolved
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> list, Properties properties) {
        String seedBrokersConfig = properties.getProperty(BOOTSTRAP_SERVERS_KEY);
        int numRetries = PropertiesUtil.getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
        Preconditions.checkNotNull(seedBrokersConfig, "Configuration property %s not set", BOOTSTRAP_SERVERS_KEY);

        String[] seedBrokers = seedBrokersConfig.split(",");
        if (seedBrokers.length == 0) {
            // e.g. "," splits into an empty array; fail with a clear message instead of inside Random.nextInt(0)
            throw new IllegalArgumentException("Configuration property " + BOOTSTRAP_SERVERS_KEY + " does not contain any broker: '" + seedBrokersConfig + "'");
        }

        List<KafkaTopicPartitionLeader> partitions = new ArrayList<KafkaTopicPartitionLeader>();
        int soTimeout = PropertiesUtil.getInt(properties, "socket.timeout.ms", 30000);
        int bufferSize = PropertiesUtil.getInt(properties, "socket.receive.buffer.bytes", 65536);

        Random random = new Random();
        retryLoop:
        for (int retry = 0; retry < numRetries; retry++) {
            // Start at a random broker so that not every caller contacts the first entry first,
            // then walk through all of them in order.
            int index = random.nextInt(seedBrokers.length);
            for (int tried = 0; tried < seedBrokers.length; tried++) {
                // Trimmed so that "host1:9092, host2:9092" works, matching validateSeedBrokers.
                String seedBroker = seedBrokers[index].trim();
                index = (index + 1) % seedBrokers.length;
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);

                URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, PARTITION_LOOKUP_CLIENT_ID);
                    List<TopicMetadata> topicsMetadata = consumer.send(new TopicMetadataRequest(list)).topicsMetadata();

                    // Drop whatever an earlier, incomplete attempt may have collected.
                    partitions.clear();
                    if (collectPartitions(topicsMetadata, list, seedBroker, partitions)) {
                        // Complete answer received: stop querying brokers.
                        break retryLoop;
                    }
                } catch (Exception e) {
                    // May throw if the failure is a closed channel and no seed broker host resolves.
                    validateSeedBrokers(seedBrokers, e);
                    LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + list.toString() + "." + e.getClass() + ". Message: " + e.getMessage());
                    LOG.debug("Detailed trace", e);
                    try {
                        // Back off briefly before contacting the next broker.
                        Thread.sleep(RETRY_PAUSE_MILLIS);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: an interrupt only shortens the pause.
                    }
                } finally {
                    if (consumer != null) {
                        consumer.close();
                    }
                }
            }
        }
        return partitions;
    }

    /**
     * Appends the partitions described by one metadata response to {@code partitions}.
     * Stops at the first topic that carries an error code or was not requested.
     *
     * @param topicsMetadata metadata returned by a broker
     * @param topics names of the requested topics
     * @param seedBroker broker that produced the response, used for log messages only
     * @param partitions list that receives the discovered partitions
     * @return {@code true} if the whole response was consumed, {@code false} if processing
     *         stopped early and another broker should be tried
     */
    private static boolean collectPartitions(List<TopicMetadata> topicsMetadata, List<String> topics, String seedBroker, List<KafkaTopicPartitionLeader> partitions) {
        for (TopicMetadata topicMetadata : topicsMetadata) {
            if (topicMetadata.errorCode() != ErrorMapping.NoError()) {
                LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(topicMetadata.errorCode()).getMessage());
                return false;
            }
            if (!topics.contains(topicMetadata.topic())) {
                LOG.warn("Received metadata from topic " + topicMetadata.topic() + " even though it was not requested. Skipping ...");
                return false;
            }
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                KafkaTopicPartition partition = new KafkaTopicPartition(topicMetadata.topic(), partitionMetadata.partitionId());
                partitions.add(new KafkaTopicPartitionLeader(partition, brokerToNode(partitionMetadata.leader())));
            }
        }
        return true;
    }

    /**
     * Converts a Kafka {@link Broker} into a {@link Node} with the same id, host and port.
     *
     * @param broker broker to convert
     * @return the equivalent node
     */
    private static Node brokerToNode(Broker broker) {
        return new Node(broker.id(), broker.host(), broker.port());
    }

    /**
     * Checks the ZooKeeper related settings: {@code zookeeper.connect} and {@code group.id}
     * must be present, and the session and connection timeouts, when present, must be integers.
     *
     * @param properties configuration to check
     * @throws IllegalArgumentException if a required property is missing or a timeout is not
     *         a valid integer
     */
    protected static void validateZooKeeperConfig(Properties properties) {
        if (properties.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (properties.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        try {
            Integer.parseInt(properties.getProperty("zookeeper.session.timeout.ms", "0"));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer", e);
        }
        try {
            Integer.parseInt(properties.getProperty("zookeeper.connection.timeout.ms", "0"));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer", e);
        }
    }

    /**
     * Turns a {@link ClosedChannelException} into a configuration error when none of the seed
     * broker host names can be resolved. Any other exception type is ignored.
     *
     * @param strArr the configured seed brokers ({@code host:port} entries)
     * @param exc the exception raised while contacting a broker
     * @throws IllegalArgumentException if {@code exc} is a {@link ClosedChannelException} and
     *         every seed broker host is unknown
     */
    private static void validateSeedBrokers(String[] strArr, Exception exc) {
        if (!(exc instanceof ClosedChannelException)) {
            return;
        }
        int unknownHosts = 0;
        for (String broker : strArr) {
            try {
                InetAddress.getByName(NetUtils.getCorrectHostnamePort(broker.trim()).getHost());
            } catch (UnknownHostException e) {
                unknownHosts++;
            }
        }
        if (unknownHosts == strArr.length) {
            throw new IllegalArgumentException("All the servers provided in: 'bootstrap.servers' config are invalid. (unknown hosts)");
        }
    }

    /**
     * Maps the {@code auto.offset.reset} property (default {@code largest}) to an offset
     * sentinel: {@code largest} and {@code latest} select {@link OffsetRequest#LatestTime()},
     * every other accepted value selects {@link OffsetRequest#EarliestTime()}.
     *
     * @param properties configuration to read
     * @return the offset sentinel to use when a stored offset is invalid
     * @throws IllegalArgumentException if the property is set to {@code none}
     */
    private static long getInvalidOffsetBehavior(Properties properties) {
        String resetMode = properties.getProperty("auto.offset.reset", "largest");
        if (resetMode.equals("none")) {
            throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
        }
        if (resetMode.equals("largest") || resetMode.equals("latest")) {
            return OffsetRequest.LatestTime();
        }
        return OffsetRequest.EarliestTime();
    }
}
