/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink streaming source that reads records from one or more Kafka topics using the
 * {@link KafkaConsumer} client for partition discovery and a {@link Kafka09Fetcher} for
 * the actual record fetching.
 *
 * <p>The Kafka key and value deserializers are always forced to
 * {@link ByteArrayDeserializer}; decoding into {@code T} is delegated to the
 * {@link KeyedDeserializationSchema} handed to the constructor.
 *
 * @param <T> type of the records emitted by this source
 */
public class FlinkKafkaConsumer09<T>
extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = 2324564345203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);

    /** Property key under which the poll timeout handed to the fetcher can be configured. */
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

    /**
     * Poll timeout used when {@link #KEY_POLL_TIMEOUT} is not set.
     * NOTE(review): unit is presumably milliseconds - confirm against Kafka09Fetcher.
     */
    public static final long DEFAULT_POLL_TIMEOUT = 100L;

    /** Kafka client configuration; the deserializer entries are overwritten by this class. */
    protected final Properties properties;

    /** Poll timeout passed to the fetcher. */
    protected final long pollTimeout;

    /**
     * Creates a consumer for a single topic using a value-only deserialization schema.
     *
     * @param topic name of the topic to read
     * @param valueDeserializer schema turning the raw record value into {@code T}
     * @param props Kafka client properties; must not be {@code null}
     */
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

    /**
     * Creates a consumer for a single topic using a keyed deserialization schema.
     *
     * @param topic name of the topic to read
     * @param deserializer schema turning the raw record into {@code T}
     * @param props Kafka client properties; must not be {@code null}
     */
    public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

    /**
     * Creates a consumer for several topics using a value-only deserialization schema,
     * which is wrapped into a {@link KeyedDeserializationSchemaWrapper}.
     *
     * @param topics names of the topics to read
     * @param deserializer schema turning the raw record value into {@code T}
     * @param props Kafka client properties; must not be {@code null}
     */
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KeyedDeserializationSchemaWrapper<T>(deserializer), props);
    }

    /**
     * Creates a consumer for several topics using a keyed deserialization schema.
     *
     * <p>The given properties object is stored as-is (not copied) and its
     * {@code key.deserializer} / {@code value.deserializer} entries are overwritten.
     *
     * @param topics names of the topics to read
     * @param deserializer schema turning the raw record into {@code T}
     * @param props Kafka client properties; must not be {@code null}
     * @throws IllegalArgumentException if {@link #KEY_POLL_TIMEOUT} is set but is not a valid long
     */
    public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(topics, deserializer);
        this.properties = Preconditions.checkNotNull(props, "props");
        setDeserializer(this.properties);
        this.pollTimeout = parsePollTimeout(this.properties);
    }

    /**
     * Builds the {@link Kafka09Fetcher} for this subtask from the runtime context and the
     * configuration held by this consumer.
     *
     * <p>Metrics are enabled unless the property {@code flink.disable-metrics} parses to
     * {@code true}.
     *
     * @param sourceContext context the fetcher is constructed with
     * @param thisSubtaskPartitions partitions handed to the fetcher
     * @param watermarksPeriodic serialized periodic watermark assigner, forwarded unchanged
     * @param watermarksPunctuated serialized punctuated watermark assigner, forwarded unchanged
     * @param runtimeContext runtime context supplying time service, class loader, metrics, etc.
     * @return the newly created fetcher
     * @throws Exception if the fetcher cannot be created
     */
    protected AbstractFetcher<T, ?> createFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            List<KafkaTopicPartition> thisSubtaskPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext) throws Exception {
        boolean useMetrics = !Boolean.parseBoolean(this.properties.getProperty("flink.disable-metrics", "false"));
        return new Kafka09Fetcher<T>(
                sourceContext,
                thisSubtaskPartitions,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.isCheckpointingEnabled(),
                runtimeContext.getTaskNameWithSubtasks(),
                runtimeContext.getMetricGroup(),
                this.deserializer,
                this.properties,
                this.pollTimeout,
                useMetrics);
    }

    /**
     * Looks up the partitions of all given topics with a short-lived {@link KafkaConsumer}.
     *
     * <p>Topics for which the client returns {@code null} are logged and skipped.
     *
     * @param topics names of the topics to query
     * @return the partitions of all topics that could be resolved; never empty
     * @throws RuntimeException if no partition could be retrieved for any of the topics
     */
    protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
        List<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>();
        // The consumer is only needed for metadata lookup, so it is closed right after the loop.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(this.properties)) {
            for (String topic : topics) {
                List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
                if (partitionsForTopic != null) {
                    partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                } else {
                    LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic);
                }
            }
        }
        if (partitions.isEmpty()) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
        }
        LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
        if (LOG.isInfoEnabled()) {
            logPartitionInfo(LOG, partitions);
        }
        return partitions;
    }

    /**
     * Converts Kafka partition metadata into Flink's {@link KafkaTopicPartition} representation.
     *
     * @param partitions Kafka partition descriptors; must not be {@code null}
     * @return a new list with one entry (topic name and partition id) per input element
     */
    private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
        Preconditions.checkNotNull(partitions);
        List<KafkaTopicPartition> ret = new ArrayList<KafkaTopicPartition>(partitions.size());
        for (PartitionInfo pi : partitions) {
            ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
        }
        return ret;
    }

    /**
     * Reads the poll timeout from the given properties.
     *
     * <p>Only a key set directly on {@code props} is honored, because {@code containsKey}
     * does not consult the defaults chain of {@link Properties}. A directly set value that
     * is not a {@code String} makes {@code getProperty} return {@code null}, which is
     * reported as unparsable as well.
     *
     * @param props properties to read {@link #KEY_POLL_TIMEOUT} from
     * @return the configured timeout, or {@link #DEFAULT_POLL_TIMEOUT} if the key is absent
     * @throws IllegalArgumentException if the configured value is not a valid long
     */
    private static long parsePollTimeout(Properties props) {
        if (!props.containsKey(KEY_POLL_TIMEOUT)) {
            return DEFAULT_POLL_TIMEOUT;
        }
        try {
            return Long.parseLong(props.getProperty(KEY_POLL_TIMEOUT));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + "'", e);
        }
    }

    /**
     * Forces {@link ByteArrayDeserializer} as the Kafka key and value deserializer in the
     * given properties, warning if a different deserializer had been configured.
     *
     * @param props properties to modify in place
     */
    private static void setDeserializer(Properties props) {
        String deSerName = ByteArrayDeserializer.class.getCanonicalName();
        Object keyDeSer = props.get("key.deserializer");
        Object valDeSer = props.get("value.deserializer");
        // The warnings name the overridden property key, not the value that was configured.
        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", "key.deserializer");
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", "value.deserializer");
        }
        props.put("key.deserializer", deSerName);
        props.put("value.deserializer", deSerName);
    }
}

