package org.apache.flink.streaming.connectors.kafka.api;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.source.ConnectorSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/KafkaSource.class */
public class KafkaSource<OUT> extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private static final String DEFAULT_GROUP_ID = "flink-group";
    private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
    private final String zookeeperAddress;
    private final String groupId;
    private final String topicId;
    private final Properties customProperties;
    private final long zookeeperSyncTimeMillis;
    private transient ConsumerConnector consumer;
    private transient ConsumerIterator<byte[], byte[]> consumerIterator;
    private volatile boolean isRunning;

    public KafkaSource(String str, String str2, String str3, DeserializationSchema<OUT> deserializationSchema, long j) {
        this(str, str2, str3, deserializationSchema, j, null);
    }

    public KafkaSource(String str, String str2, String str3, DeserializationSchema<OUT> deserializationSchema, long j, Properties properties) {
        super(deserializationSchema);
        Preconditions.checkNotNull(str, "ZK address is null");
        Preconditions.checkNotNull(str2, "Topic ID is null");
        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
        Preconditions.checkArgument(j >= 0, "The ZK sync time must be positive");
        this.zookeeperAddress = str;
        this.groupId = str3;
        this.topicId = str2;
        this.zookeeperSyncTimeMillis = j;
        this.customProperties = properties;
    }

    public KafkaSource(String str, String str2, DeserializationSchema<OUT> deserializationSchema, long j) {
        this(str, str2, DEFAULT_GROUP_ID, deserializationSchema, j, null);
    }

    public KafkaSource(String str, String str2, DeserializationSchema<OUT> deserializationSchema) {
        this(str, str2, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
    }

    private void initializeConnection() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.zookeeperAddress);
        properties.put("group.id", this.groupId);
        properties.put("zookeeper.session.timeout.ms", "10000");
        properties.put("zookeeper.sync.time.ms", Long.toString(this.zookeeperSyncTimeMillis));
        properties.put("auto.commit.interval.ms", "1000");
        if (this.customProperties != null) {
            for (Map.Entry entry : properties.entrySet()) {
                if (properties.contains(entry.getKey())) {
                    LOG.warn("Overwriting property " + entry.getKey() + " with value " + entry.getValue());
                }
                properties.put(entry.getKey(), entry.getValue());
            }
        }
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        KafkaStream kafkaStream = (KafkaStream) ((List) this.consumer.createMessageStreams(Collections.singletonMap(this.topicId, 1)).get(this.topicId)).get(0);
        this.consumer.commitOffsets();
        this.consumerIterator = kafkaStream.iterator();
    }

    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        while (this.isRunning && this.consumerIterator.hasNext()) {
            try {
                Object deserialize = this.schema.deserialize((byte[]) this.consumerIterator.next().message());
                if (this.schema.isEndOfStream(deserialize)) {
                    break;
                } else {
                    sourceContext.collect(deserialize);
                }
            } finally {
                this.consumer.shutdown();
            }
        }
    }

    public void open(Configuration configuration) throws Exception {
        initializeConnection();
        this.isRunning = true;
    }

    public void cancel() {
        this.isRunning = false;
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }
}
