/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;

@PublicEvolving
public class FlinkKafkaConsumer08<T>
extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
    private final Properties kafkaProperties;

    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, (KeyedDeserializationSchema<T>)new KeyedDeserializationSchemaWrapper(deserializer), props);
    }

    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(topics, null, deserializer, props);
    }

    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(subscriptionPattern, (KeyedDeserializationSchema<T>)new KeyedDeserializationSchemaWrapper(valueDeserializer), props);
    }

    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(null, subscriptionPattern, deserializer, props);
    }

    private FlinkKafkaConsumer08(List<String> topics, Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(topics, subscriptionPattern, deserializer, PropertiesUtil.getLong((Properties)((Properties)Preconditions.checkNotNull((Object)props, (String)"props")), (String)"flink.partition-discovery.interval-millis", (long)Long.MIN_VALUE), !PropertiesUtil.getBoolean((Properties)props, (String)"flink.disable-metrics", (boolean)false));
        this.kafkaProperties = props;
        FlinkKafkaConsumer08.validateZooKeeperConfig(props);
        FlinkKafkaConsumer08.validateAutoOffsetResetValue(props);
    }

    protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
        long autoCommitInterval = offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC ? PropertiesUtil.getLong((Properties)this.kafkaProperties, (String)"auto.commit.interval.ms", (long)60000L) : -1L;
        return new Kafka08Fetcher<T>(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, this.deserializer, this.kafkaProperties, autoCommitInterval, consumerMetricGroup, useMetrics);
    }

    protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks) {
        return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, this.kafkaProperties);
    }

    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean((Properties)this.kafkaProperties, (String)"auto.commit.enable", (boolean)true) && PropertiesUtil.getLong((Properties)this.kafkaProperties, (String)"auto.commit.interval.ms", (long)60000L) > 0L;
    }

    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> partitions, long timestamp) {
        throw new UnsupportedOperationException("Fetching partition offsets using timestamps is only supported in Kafka versions 0.10 and above.");
    }

    protected static void validateZooKeeperConfig(Properties props) {
        if (props.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (props.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
        }
    }

    private static void validateAutoOffsetResetValue(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        if (!(val.equals("largest") || val.equals("latest") || val.equals("earliest") || val.equals("smallest"))) {
            throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
        }
    }
}

