package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.util.KafkaUtils;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.class */
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer08.class);

    /**
     * Sentinel meaning "no offset known for this partition". {@code open()} seeds every subscribed
     * partition with it and {@code commitOffsets} never commits an offset equal to it.
     */
    public static final long OFFSET_NOT_SET = -915623761776L;

    /** Configuration key read by {@code getPartitionsForTopic} for the number of metadata lookup attempts. */
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

    /** Default number of metadata lookup attempts when {@link #GET_PARTITIONS_RETRIES_KEY} is not configured. */
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

    /** Partitions of the requested topics, looked up in the constructor (leader data dropped). */
    private final List<KafkaTopicPartition> partitionInfos;

    /** Properties handed to the constructor; passed on to the offset handler and the fetcher. */
    private final Properties props;

    /** Created in {@code open()}; {@code null} when this subtask has no partitions and after {@code cancel()}. */
    private transient Fetcher fetcher;

    /** Created in {@code open()}; reset to {@code null} by {@code cancel()}. */
    private transient OffsetHandler offsetHandler;

    /** Partitions assigned to this parallel subtask in {@code open()}. */
    private transient List<KafkaTopicPartition> subscribedPartitions;

    /** Per-partition offsets that {@code commitOffsets} has recorded as committed. */
    private transient HashMap<KafkaTopicPartition, Long> committedOffsets;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08$PeriodicOffsetCommitter.class */
    private static class PeriodicOffsetCommitter<T> extends Thread {
        private final long commitInterval;
        private final FlinkKafkaConsumer08<T> consumer;
        private volatile boolean running = true;

        public PeriodicOffsetCommitter(long j, FlinkKafkaConsumer08<T> flinkKafkaConsumer08) {
            this.commitInterval = j;
            this.consumer = flinkKafkaConsumer08;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    try {
                        Thread.sleep(this.commitInterval);
                        this.consumer.commitOffsets((HashMap) this.consumer.offsetsState.clone());
                    } catch (InterruptedException e) {
                        if (this.running) {
                            throw e;
                        }
                    }
                } catch (Throwable th) {
                    FlinkKafkaConsumer08.LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", th);
                    ((FlinkKafkaConsumer08) this.consumer).fetcher.stopWithError(th);
                    return;
                }
            }
        }

        public void close() {
            this.running = false;
            interrupt();
        }
    }

    /**
     * Creates a consumer for a single topic whose records are decoded by a value-only schema.
     *
     * @param str name of the topic to read
     * @param deserializationSchema schema used to turn Kafka messages into elements
     * @param properties consumer configuration
     */
    public FlinkKafkaConsumer08(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(Collections.singletonList(str), deserializationSchema, properties);
    }

    /**
     * Creates a consumer for a single topic whose records are decoded by a keyed schema.
     *
     * @param str name of the topic to read
     * @param keyedDeserializationSchema schema used to turn Kafka messages into elements
     * @param properties consumer configuration
     */
    public FlinkKafkaConsumer08(String str, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this(Collections.singletonList(str), keyedDeserializationSchema, properties);
    }

    /**
     * Creates a consumer for several topics whose records are decoded by a value-only schema.
     * The schema is wrapped in a {@link KeyedDeserializationSchemaWrapper} and handed to the
     * keyed-schema constructor.
     *
     * @param list names of the topics to read
     * @param deserializationSchema schema used to turn Kafka messages into elements
     * @param properties consumer configuration
     */
    public FlinkKafkaConsumer08(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        // Parameterised instead of raw types so the element type is checked by the compiler.
        this(list, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), properties);
    }

    public FlinkKafkaConsumer08(List<String> list, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        super(keyedDeserializationSchema, properties);
        Objects.requireNonNull(list, "topics");
        this.props = (Properties) Objects.requireNonNull(properties, "props");
        validateZooKeeperConfig(properties);
        this.partitionInfos = KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(list, properties));
        if (this.partitionInfos.size() == 0) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + list.toString() + ".Please check previous log entries");
        }
        if (LOG.isInfoEnabled()) {
            logPartitionInfo(this.partitionInfos);
        }
    }

    /**
     * Assigns this subtask its share of the partitions and, if it received any, creates the
     * offset handler and the fetcher with their starting offsets.
     *
     * <p>Starting offsets come from the restored checkpoint state when present, otherwise from the
     * offset handler; partitions without a known offset start at {@link #OFFSET_NOT_SET}.
     *
     * @param configuration configuration passed on to the superclass
     * @throws IllegalStateException if the starting-offset map does not have exactly one entry per
     *         subscribed partition
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);

        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.subscribedPartitions = assignPartitions(this.partitionInfos, parallelism, subtaskIndex);

        if (LOG.isInfoEnabled()) {
            LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
                    new Object[] {subtaskIndex, KafkaTopicPartition.toString(this.subscribedPartitions), this.partitionInfos.size()});
        }

        if (this.subscribedPartitions.isEmpty()) {
            // Nothing to read: run() idles when the fetcher is null.
            LOG.info("Kafka consumer {} has no partitions (empty source)", subtaskIndex);
            this.fetcher = null;
            return;
        }

        this.offsetHandler = new ZookeeperOffsetHandler(this.props);
        this.committedOffsets = new HashMap<>();

        // Every subscribed partition gets an entry; real offsets overwrite the sentinel below.
        final HashMap<KafkaTopicPartition, Long> startOffsets = new HashMap<>(this.subscribedPartitions.size());
        for (KafkaTopicPartition partition : this.subscribedPartitions) {
            startOffsets.put(partition, OFFSET_NOT_SET);
        }

        if (this.restoreToOffset == null) {
            this.offsetsState = new HashMap<>();
            startOffsets.putAll(this.offsetHandler.getOffsets(this.subscribedPartitions));
        } else {
            if (LOG.isInfoEnabled()) {
                LOG.info("Consumer {} is restored from previous checkpoint: {}", subtaskIndex, KafkaTopicPartition.toString(this.restoreToOffset));
            }
            this.offsetsState = this.restoreToOffset;
            startOffsets.putAll(this.restoreToOffset);
            this.restoreToOffset = null;
        }

        final int offsetEntries = startOffsets.size();
        final int partitionCount = this.subscribedPartitions.size();
        if (offsetEntries != partitionCount) {
            throw new IllegalStateException("The subscribed partitions map has more entries than the subscribed partitions list: " + offsetEntries + "," + partitionCount);
        }

        this.fetcher = new LegacyFetcher(startOffsets, this.props, getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader());
    }

    /**
     * Runs the source until it finishes or is cancelled, then closes the source context.
     *
     * <p>Without a fetcher (no partitions assigned) a {@code Long.MAX_VALUE} watermark is emitted
     * and the method blocks while {@code running} is set. Otherwise the fetcher is run; when
     * checkpointing is disabled a {@link PeriodicOffsetCommitter} is started for the duration of
     * the fetch and shut down afterwards, whether the fetcher returned or threw.
     *
     * @param sourceContext context used to emit elements and watermarks
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.fetcher == null) {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

            final Object waitLock = new Object();
            while (this.running) {
                try {
                    synchronized (waitLock) {
                        waitLock.wait();
                    }
                } catch (InterruptedException ignored) {
                    // re-check the running flag
                }
            }
        } else {
            PeriodicOffsetCommitter<T> committer = null;
            if (!getRuntimeContext().isCheckpointingEnabled()) {
                final long commitIntervalMs = KafkaUtils.getLongFromConfig(this.props, "auto.commit.interval.ms", 60000L);
                committer = new PeriodicOffsetCommitter<T>(commitIntervalMs, this);
                committer.setDaemon(true);
                committer.start();
                LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitIntervalMs);
            }

            try {
                this.fetcher.run(sourceContext, this.deserializer, this.offsetsState);
            } finally {
                if (committer != null) {
                    committer.close();
                    try {
                        committer.join();
                    } catch (InterruptedException ignored) {
                        // shutting down anyway
                    }
                }
            }
        }

        sourceContext.close();
    }

    /**
     * Clears the running flag and closes the fetcher and the offset handler, if present.
     * Each field is reset to {@code null} before its object is closed; an {@link IOException}
     * raised while closing is logged as a warning and not propagated.
     */
    public void cancel() {
        this.running = false;

        final Fetcher activeFetcher = this.fetcher;
        this.fetcher = null;
        if (activeFetcher != null) {
            try {
                activeFetcher.close();
            } catch (IOException closeFailure) {
                LOG.warn("Error while closing Kafka connector data fetcher", closeFailure);
            }
        }

        final OffsetHandler activeHandler = this.offsetHandler;
        this.offsetHandler = null;
        if (activeHandler != null) {
            try {
                activeHandler.close();
            } catch (IOException closeFailure) {
                LOG.warn("Error while closing Kafka connector offset handler", closeFailure);
            }
        }
    }

    /**
     * Releases the fetcher and offset handler through {@link #cancel()} and then lets the
     * superclass close.
     */
    @Override
    public void close() throws Exception {
        cancel();
        super.close();
    }

    /**
     * Commits offsets for the partitions subscribed by this subtask through the offset handler.
     *
     * <p>For each subscribed partition the offset is committed only if it is present in
     * {@code hashMap}, is not {@link #OFFSET_NOT_SET}, and is greater than the offset last
     * recorded as committed for that partition.
     *
     * <p>The call is a no-op when the consumer has no offset handler, which is the case after
     * {@code cancel()} and when {@code open()} found no partitions for this subtask.
     *
     * @param hashMap offsets to commit, keyed by partition
     * @throws Exception if the offset handler fails to commit
     */
    protected void commitOffsets(HashMap<KafkaTopicPartition, Long> hashMap) throws Exception {
        // cancel() may null the handler from another thread; work on local snapshots.
        final OffsetHandler handler = this.offsetHandler;
        final List<KafkaTopicPartition> partitions = this.subscribedPartitions;
        final HashMap<KafkaTopicPartition, Long> lastCommitted = this.committedOffsets;
        if (handler == null || partitions == null || lastCommitted == null) {
            LOG.debug("Skipping offset commit because no offset handler is available");
            return;
        }

        final HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
        for (KafkaTopicPartition partition : partitions) {
            final Long offset = hashMap.get(partition);
            if (offset == null || offset.longValue() == OFFSET_NOT_SET) {
                continue;
            }

            final Long previous = lastCommitted.get(partition);
            final long previousOffset = previous == null ? OFFSET_NOT_SET : previous.longValue();
            if (offset.longValue() > previousOffset) {
                offsetsToCommit.put(partition, offset);
                LOG.debug("Committing offset {} for partition {}", offset, partition);
            } else {
                LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
            }
        }

        if (LOG.isDebugEnabled() && !offsetsToCommit.isEmpty()) {
            LOG.debug("Committing offsets {} to Zookeeper", KafkaTopicPartition.toString(offsetsToCommit));
        }

        handler.commit(offsetsToCommit);

        // Record the offsets only after the commit went through. Recording them earlier would
        // make a failed commit look done, and the same offset would be skipped on the next call.
        lastCommitted.putAll(offsetsToCommit);
    }

    /**
     * Looks up the partitions (with their leaders) of the given topics by asking the brokers
     * listed in {@code bootstrap.servers} for topic metadata.
     *
     * <p>Up to {@link #GET_PARTITIONS_RETRIES_KEY} rounds are made (default
     * {@link #DEFAULT_GET_PARTITIONS_RETRIES}). Each round starts at a random broker and walks
     * through all of them; the lookup stops at the first broker that returns error-free metadata
     * for the requested topics.
     *
     * <p>NOTE(review): if every attempt fails, the returned list holds whatever the last
     * responding broker contributed before its error was detected, so it may be empty or partial.
     *
     * @param list topics to look up
     * @param properties configuration; must contain {@code bootstrap.servers}
     * @return the partitions found, each with its leader node
     * @throws NullPointerException if {@code bootstrap.servers} is not set
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> list, Properties properties) {
        String property = properties.getProperty("bootstrap.servers");
        final int numRetries = KafkaUtils.getIntFromConfig(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
        Objects.requireNonNull(property, "Configuration property bootstrap.servers not set");

        final String[] seedBrokers = property.split(",");
        final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
        final int soTimeout = KafkaUtils.getIntFromConfig(properties, "socket.timeout.ms", 30000);
        final int bufferSize = KafkaUtils.getIntFromConfig(properties, "socket.receive.buffer.bytes", 65536);
        final Random random = new Random();

        for (int retry = 0; retry < numRetries; retry++) {
            // Start each round at a random broker to spread the metadata requests.
            int brokerIndex = random.nextInt(seedBrokers.length);
            for (int visited = 0; visited < seedBrokers.length; visited++) {
                final String seedBroker = seedBrokers[brokerIndex];
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", new Object[] {seedBroker, retry, numRetries});
                brokerIndex++;
                if (brokerIndex == seedBrokers.length) {
                    brokerIndex = 0;
                }

                if (fetchPartitionsFromBroker(seedBroker, list, soTimeout, bufferSize, partitions)) {
                    // Complete metadata received; no need to ask further brokers or retry.
                    return partitions;
                }
            }
        }
        return partitions;
    }

    /**
     * Asks one broker for the metadata of {@code topics} and stores the resulting partitions in
     * {@code partitions}. The list is cleared once the broker has answered.
     *
     * @return {@code true} if the broker answered without error for every topic, {@code false}
     *         if it could not be reached or reported a problem (the caller then tries another broker)
     */
    private static boolean fetchPartitionsFromBroker(
            String seedBroker,
            List<String> topics,
            int soTimeout,
            int bufferSize,
            List<KafkaTopicPartitionLeader> partitions) {

        final URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, "flink-kafka-consumer-partition-lookup");
            final List<TopicMetadata> metaData = consumer.send(new TopicMetadataRequest(topics)).topicsMetadata();

            // Drop anything left over from an earlier, unsuccessful broker.
            partitions.clear();
            for (TopicMetadata item : metaData) {
                if (item.errorCode() != ErrorMapping.NoError()) {
                    LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
                    return false;
                }
                if (!topics.contains(item.topic())) {
                    LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                    return false;
                }
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    partitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition(item.topic(), part.partitionId()), brokerToNode(part.leader())));
                }
            }
            return true;
        } catch (Exception e) {
            LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." + e.getClass() + ". Message: " + e.getMessage());
            LOG.debug("Detailed trace", e);
            // Short pause before the next broker is contacted.
            try {
                Thread.sleep(500L);
            } catch (InterruptedException ignored) {
                // deliberately ignored: the lookup simply continues with the next broker
            }
            return false;
        } finally {
            // Single place where the connection is released, on every path.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Converts a Kafka {@link Broker} into a {@link Node} carrying the same id, host and port.
     *
     * @param broker broker to convert; must not be {@code null}
     * @return node with the broker's id, host and port
     */
    private static Node brokerToNode(Broker broker) {
        final int id = broker.id();
        final String host = broker.host();
        final int port = broker.port();
        return new Node(id, host, port);
    }

    /**
     * Checks the ZooKeeper-related consumer properties.
     *
     * <p>{@code zookeeper.connect} and {@code group.id} must be present.
     * {@code zookeeper.session.timeout.ms} and {@code zookeeper.connection.timeout.ms} are
     * optional but, when set, must parse as integers.
     *
     * @param properties configuration to validate
     * @throws IllegalArgumentException if a required property is missing or a timeout is not an
     *         integer; in the latter case the {@link NumberFormatException} is attached as cause
     */
    protected static void validateZooKeeperConfig(Properties properties) {
        if (properties.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (properties.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        requireIntegerIfSet(properties, "zookeeper.session.timeout.ms");
        requireIntegerIfSet(properties, "zookeeper.connection.timeout.ms");
    }

    /** Fails with an {@link IllegalArgumentException} if {@code key} is set to a non-integer value. */
    private static void requireIntegerIfSet(Properties properties, String key) {
        try {
            // An absent property falls back to "0", which always parses.
            Integer.parseInt(properties.getProperty(key, "0"));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property '" + key + "' is not a valid integer", e);
        }
    }
}
