package org.apache.flink.streaming.connectors.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka011.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.class */
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = 2324564345203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    public static final long DEFAULT_POLL_TIMEOUT = 100;
    protected final Properties properties;
    protected final long pollTimeout;
    private FlinkConnectorRateLimiter rateLimiter;

    public FlinkKafkaConsumer09(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this((List<String>) Collections.singletonList(str), deserializationSchema, properties);
    }

    public FlinkKafkaConsumer09(String str, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this((List<String>) Collections.singletonList(str), kafkaDeserializationSchema, properties);
    }

    public FlinkKafkaConsumer09(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(list, new KafkaDeserializationSchemaWrapper(deserializationSchema), properties);
    }

    public FlinkKafkaConsumer09(List<String> list, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this(list, null, kafkaDeserializationSchema, properties);
    }

    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern pattern, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(pattern, new KafkaDeserializationSchemaWrapper(deserializationSchema), properties);
    }

    @PublicEvolving
    public FlinkKafkaConsumer09(Pattern pattern, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this(null, pattern, kafkaDeserializationSchema, properties);
    }

    private FlinkKafkaConsumer09(List<String> list, Pattern pattern, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        super(list, pattern, kafkaDeserializationSchema, PropertiesUtil.getLong((Properties) Preconditions.checkNotNull(properties, "props"), FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, Long.MIN_VALUE), !PropertiesUtil.getBoolean(properties, "flink.disable-metrics", false));
        this.properties = properties;
        setDeserializer(this.properties);
        try {
            if (this.properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(this.properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = 100L;
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for 'flink.poll-timeout'", e);
        }
    }

    @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
    protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode, MetricGroup metricGroup, boolean z) throws Exception {
        adjustAutoCommitConfig(this.properties, offsetCommitMode);
        if (this.rateLimiter != null) {
            this.rateLimiter.open(streamingRuntimeContext);
        }
        return new Kafka09Fetcher(sourceContext, map, serializedValue, serializedValue2, streamingRuntimeContext.getProcessingTimeService(), streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval(), streamingRuntimeContext.getUserCodeClassLoader(), streamingRuntimeContext.getTaskNameWithSubtasks(), this.deserializer, this.properties, this.pollTimeout, streamingRuntimeContext.getMetricGroup(), metricGroup, z, this.rateLimiter);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor kafkaTopicsDescriptor, int i, int i2) {
        return new Kafka09PartitionDiscoverer(kafkaTopicsDescriptor, i, i2, this.properties);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean(this.properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) && PropertiesUtil.getLong(this.properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000L) > 0;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> collection, long j) {
        throw new UnsupportedOperationException("Fetching partition offsets using timestamps is only supported in Kafka versions 0.10 and above.");
    }

    private static void setDeserializer(Properties properties) {
        String name = ByteArrayDeserializer.class.getName();
        Object obj = properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object obj2 = properties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        if (obj != null && !obj.equals(name)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (obj2 != null && !obj2.equals(name)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, name);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, name);
    }

    public void setRateLimiter(FlinkConnectorRateLimiter flinkConnectorRateLimiter) {
        this.rateLimiter = flinkConnectorRateLimiter;
    }

    public FlinkConnectorRateLimiter getRateLimiter() {
        return this.rateLimiter;
    }
}
