package org.apache.flink.streaming.connectors.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaSource.class */
public class KafkaSource<OUT> extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1;
    private final String zkQuorum;
    private final String groupId;
    private final String topicId;
    private ConsumerConnector consumer;
    OUT outTuple;

    public KafkaSource(String str, String str2, String str3, DeserializationSchema<OUT> deserializationSchema) {
        super(deserializationSchema);
        this.zkQuorum = str;
        this.groupId = str2;
        this.topicId = str3;
    }

    private void initializeConnection() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.zkQuorum);
        properties.put("group.id", this.groupId);
        properties.put("zookeeper.session.timeout.ms", "2000");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public void invoke(Collector<OUT> collector) throws Exception {
        ConsumerIterator it = ((KafkaStream) ((List) this.consumer.createMessageStreams(Collections.singletonMap(this.topicId, 1)).get(this.topicId)).get(0)).iterator();
        while (it.hasNext()) {
            OUT deserialize = this.schema.deserialize((byte[]) it.next().message());
            if (this.schema.isEndOfStream(deserialize)) {
                break;
            } else {
                collector.collect(deserialize);
            }
        }
        this.consumer.shutdown();
    }

    public void open(Configuration configuration) {
        initializeConnection();
    }
}
