package org.apache.flink.streaming.connectors.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaSource.class */
public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
    private static final long serialVersionUID = 1;
    private final String zkQuorum;
    private final String groupId;
    private final String topicId;
    private final int numThreads;
    private ConsumerConnector consumer;
    private boolean closeWithoutSend = false;
    private boolean sendAndClose = false;
    OUT outTuple;

    public KafkaSource(String str, String str2, String str3, int i) {
        this.zkQuorum = str;
        this.groupId = str2;
        this.topicId = str3;
        this.numThreads = i;
    }

    private void initializeConnection() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", this.zkQuorum);
        properties.put("group.id", this.groupId);
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public void invoke(Collector<OUT> collector) throws Exception {
        initializeConnection();
        HashMap hashMap = new HashMap();
        hashMap.put(this.topicId, Integer.valueOf(this.numThreads));
        ConsumerIterator it = ((KafkaStream) ((List) this.consumer.createMessageStreams(hashMap).get(this.topicId)).get(0)).iterator();
        while (it.hasNext()) {
            OUT deserialize = deserialize((byte[]) it.next().message());
            if (this.closeWithoutSend) {
                break;
            }
            collector.collect(deserialize);
            if (this.sendAndClose) {
                break;
            }
        }
        this.consumer.shutdown();
    }

    public abstract OUT deserialize(byte[] bArr);

    public void closeWithoutSend() {
        this.closeWithoutSend = true;
    }

    public void sendAndClose() {
        this.sendAndClose = true;
    }
}
