package org.apache.flink.streaming.connectors.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka consumer source built on top of {@link FlinkKafkaConsumerBase}.
 *
 * <p>Records are pulled by a {@link Kafka010Fetcher} and partitions are discovered by a
 * {@link Kafka010PartitionDiscoverer}. The Kafka client is always configured with
 * {@link ByteArrayDeserializer} for keys and values; turning the raw bytes into records of type
 * {@code T} is left to the supplied {@link KafkaDeserializationSchema}.
 *
 * <p>Note that the {@link Properties} instance handed to a constructor is stored by reference and
 * modified in place (the key/value deserializer entries are overwritten).
 *
 * @param <T> the type of the records produced by this source
 */
@PublicEvolving
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = 2324564345203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);

    /** Property key under which the poll timeout (a {@code long}) can be configured. */
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

    /** Poll timeout used when {@link #KEY_POLL_TIMEOUT} is not present in the properties. */
    public static final long DEFAULT_POLL_TIMEOUT = 100;

    /** Kafka client properties; the same instance that was passed to the constructor. */
    protected final Properties properties;

    /** Poll timeout read from {@link #KEY_POLL_TIMEOUT}, or {@link #DEFAULT_POLL_TIMEOUT}. */
    protected final long pollTimeout;

    /** Optional rate limiter handed to the fetcher; {@code null} unless explicitly set. */
    private FlinkConnectorRateLimiter rateLimiter;

    /**
     * Creates a consumer for a single topic using a value-only {@link DeserializationSchema}.
     *
     * @param str the name of the topic to read from
     * @param deserializationSchema schema used to deserialize record values
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    public FlinkKafkaConsumer010(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(Collections.singletonList(str), deserializationSchema, properties);
    }

    /**
     * Creates a consumer for a single topic using a {@link KafkaDeserializationSchema}.
     *
     * @param str the name of the topic to read from
     * @param kafkaDeserializationSchema schema used to deserialize Kafka records
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    public FlinkKafkaConsumer010(String str, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this(Collections.singletonList(str), kafkaDeserializationSchema, properties);
    }

    /**
     * Creates a consumer for a fixed list of topics using a value-only
     * {@link DeserializationSchema}, which is wrapped in a
     * {@link KafkaDeserializationSchemaWrapper}.
     *
     * @param list the names of the topics to read from
     * @param deserializationSchema schema used to deserialize record values
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    public FlinkKafkaConsumer010(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(list, new KafkaDeserializationSchemaWrapper<>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer for a fixed list of topics using a {@link KafkaDeserializationSchema}.
     *
     * @param list the names of the topics to read from
     * @param kafkaDeserializationSchema schema used to deserialize Kafka records
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    public FlinkKafkaConsumer010(List<String> list, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this(list, null, kafkaDeserializationSchema, properties);
    }

    /**
     * Creates a consumer for all topics matching a pattern, using a value-only
     * {@link DeserializationSchema}, which is wrapped in a
     * {@link KafkaDeserializationSchemaWrapper}.
     *
     * @param pattern pattern selecting the topics to read from
     * @param deserializationSchema schema used to deserialize record values
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    @PublicEvolving
    public FlinkKafkaConsumer010(Pattern pattern, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(pattern, new KafkaDeserializationSchemaWrapper<>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer for all topics matching a pattern, using a
     * {@link KafkaDeserializationSchema}.
     *
     * @param pattern pattern selecting the topics to read from
     * @param kafkaDeserializationSchema schema used to deserialize Kafka records
     * @param properties Kafka client and connector properties; must not be {@code null}
     */
    @PublicEvolving
    public FlinkKafkaConsumer010(Pattern pattern, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        this(null, pattern, kafkaDeserializationSchema, properties);
    }

    /**
     * Common constructor all public constructors delegate to. The public constructors pass either
     * a topic list or a pattern, never both.
     *
     * @param list fixed topic names, or {@code null} when a pattern is used
     * @param pattern topic pattern, or {@code null} when a fixed list is used
     * @param kafkaDeserializationSchema schema used to deserialize Kafka records
     * @param properties Kafka client and connector properties; must not be {@code null}
     * @throws NullPointerException if {@code properties} is {@code null}
     * @throws IllegalArgumentException if the value of {@link #KEY_POLL_TIMEOUT} cannot be parsed
     *     as a {@code long}
     */
    private FlinkKafkaConsumer010(
            @Nullable List<String> list,
            @Nullable Pattern pattern,
            KafkaDeserializationSchema<T> kafkaDeserializationSchema,
            Properties properties) {
        super(
                list,
                pattern,
                kafkaDeserializationSchema,
                // Long.MIN_VALUE is the fallback when no discovery interval is configured.
                PropertiesUtil.getLong(
                        Preconditions.checkNotNull(properties, "props"),
                        "flink.partition-discovery.interval-millis",
                        Long.MIN_VALUE),
                !PropertiesUtil.getBoolean(properties, "flink.disable-metrics", false));
        this.properties = properties;
        setDeserializer(this.properties);
        try {
            if (this.properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(this.properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + "'", e);
        }
    }

    /**
     * Creates the {@link Kafka010Fetcher} that pulls records from Kafka.
     *
     * <p>Before the fetcher is built, the auto-commit settings in {@link #properties} are adjusted
     * for the given commit mode and the rate limiter, if one was set, is opened with the runtime
     * context.
     *
     * @param sourceContext the context records are emitted to
     * @param map the partitions to read, with their associated offsets
     * @param serializedValue serialized watermark strategy handed to the fetcher
     * @param streamingRuntimeContext runtime context of the source task
     * @param offsetCommitMode the offset commit mode to configure the client for
     * @param metricGroup metric group handed to the fetcher in addition to the runtime context's
     *     own metric group
     * @param z boolean flag forwarded unchanged to the fetcher (presumably "use metrics" —
     *     confirm against {@code Kafka010Fetcher})
     * @return the fetcher instance
     * @throws Exception if opening the rate limiter or constructing the fetcher fails
     */
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> map,
            SerializedValue<WatermarkStrategy<T>> serializedValue,
            StreamingRuntimeContext streamingRuntimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup metricGroup,
            boolean z) throws Exception {
        adjustAutoCommitConfig(this.properties, offsetCommitMode);
        if (this.rateLimiter != null) {
            this.rateLimiter.open(streamingRuntimeContext);
        }
        return new Kafka010Fetcher<>(
                sourceContext,
                map,
                serializedValue,
                streamingRuntimeContext.getProcessingTimeService(),
                streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                streamingRuntimeContext.getUserCodeClassLoader(),
                streamingRuntimeContext.getTaskNameWithSubtasks(),
                this.deserializer,
                this.properties,
                this.pollTimeout,
                streamingRuntimeContext.getMetricGroup(),
                metricGroup,
                z,
                this.rateLimiter);
    }

    /**
     * Creates the {@link Kafka010PartitionDiscoverer} for this consumer.
     *
     * @param kafkaTopicsDescriptor describes the topics (fixed list or pattern) to discover
     * @param i forwarded unchanged to the discoverer (presumably the index of this subtask —
     *     confirm against {@code Kafka010PartitionDiscoverer})
     * @param i2 forwarded unchanged to the discoverer (presumably the number of parallel
     *     subtasks — confirm against {@code Kafka010PartitionDiscoverer})
     * @return the partition discoverer
     */
    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor kafkaTopicsDescriptor, int i, int i2) {
        return new Kafka010PartitionDiscoverer(kafkaTopicsDescriptor, i, i2, this.properties);
    }

    /**
     * Tells whether the Kafka client's periodic auto commit is effectively enabled.
     *
     * @return {@code true} if {@code enable.auto.commit} is true (the default used here) and
     *     {@code auto.commit.interval.ms} (default 5000) is greater than zero
     */
    @Override
    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean(this.properties, "enable.auto.commit", true)
                && PropertiesUtil.getLong(this.properties, "auto.commit.interval.ms", 5000L) > 0;
    }

    /**
     * Looks up, for every given partition, the offset Kafka reports for the given timestamp.
     *
     * <p>A new {@link KafkaConsumer} is created from {@link #properties} for this single lookup
     * and is closed before the method returns, whether the lookup succeeds or fails.
     *
     * @param collection the partitions to look up
     * @param j the timestamp passed to {@code offsetsForTimes} for every partition
     * @return a map from partition to offset; the value is {@code null} for a partition for which
     *     Kafka returned no offset
     */
    @Override
    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> collection, long j) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(collection.size());
        for (KafkaTopicPartition partition : collection) {
            timestampsToSearch.put(new TopicPartition(partition.getTopic(), partition.getPartition()), j);
        }

        Map<KafkaTopicPartition, Long> offsets = new HashMap<>(collection.size());
        // try-with-resources closes the consumer exactly once and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary one.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timestampsToSearch).entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
                offsets.put(
                        new KafkaTopicPartition(topicPartition.topic(), topicPartition.partition()),
                        offsetAndTimestamp == null ? null : offsetAndTimestamp.offset());
            }
        }
        return offsets;
    }

    /**
     * Forces {@link ByteArrayDeserializer} as key and value deserializer in the given properties,
     * logging a warning for each entry that was configured to something else.
     *
     * @param properties the properties to modify in place
     */
    private static void setDeserializer(Properties properties) {
        String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        Object configuredKeyDeserializer = properties.get("key.deserializer");
        Object configuredValueDeserializer = properties.get("value.deserializer");
        if (configuredKeyDeserializer != null && !configuredKeyDeserializer.equals(byteArrayDeserializer)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", "key.deserializer");
        }
        if (configuredValueDeserializer != null && !configuredValueDeserializer.equals(byteArrayDeserializer)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", "value.deserializer");
        }
        properties.put("key.deserializer", byteArrayDeserializer);
        properties.put("value.deserializer", byteArrayDeserializer);
    }

    /**
     * Sets the rate limiter that is opened and handed to the fetcher in {@code createFetcher}.
     *
     * @param flinkConnectorRateLimiter the rate limiter to use, or {@code null} for none
     */
    public void setRateLimiter(FlinkConnectorRateLimiter flinkConnectorRateLimiter) {
        this.rateLimiter = flinkConnectorRateLimiter;
    }

    /**
     * Returns the currently configured rate limiter.
     *
     * @return the rate limiter, or {@code null} if none was set
     */
    public FlinkConnectorRateLimiter getRateLimiter() {
        return this.rateLimiter;
    }
}
