/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internal;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kafka09.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka09.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka09.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.util.Preconditions;

@Internal
public class Kafka09PartitionDiscoverer
extends AbstractPartitionDiscoverer {
    private final Properties kafkaProperties;
    private KafkaConsumer<?, ?> kafkaConsumer;

    public Kafka09PartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks, Properties kafkaProperties) {
        super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
        this.kafkaProperties = (Properties)Preconditions.checkNotNull((Object)kafkaProperties);
    }

    @Override
    protected void initializeConnections() {
        this.kafkaConsumer = new KafkaConsumer(this.kafkaProperties);
    }

    @Override
    protected List<String> getAllTopics() throws AbstractPartitionDiscoverer.WakeupException {
        try {
            return new ArrayList<String>(this.kafkaConsumer.listTopics().keySet());
        }
        catch (WakeupException e) {
            throw new AbstractPartitionDiscoverer.WakeupException();
        }
    }

    @Override
    protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) throws AbstractPartitionDiscoverer.WakeupException {
        LinkedList<KafkaTopicPartition> partitions = new LinkedList<KafkaTopicPartition>();
        try {
            for (String topic : topics) {
                for (PartitionInfo partitionInfo : this.kafkaConsumer.partitionsFor(topic)) {
                    partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
            }
        }
        catch (WakeupException e) {
            throw new AbstractPartitionDiscoverer.WakeupException();
        }
        return partitions;
    }

    @Override
    protected void wakeupConnections() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
        }
    }

    @Override
    protected void closeConnections() throws Exception {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
            this.kafkaConsumer = null;
        }
    }
}

