package org.apache.flink.streaming.connectors.kafka.api.simple;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaMultiplePartitionsIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.class */
/**
 * Kafka source that remembers, per partition, the offset of the last message it emitted.
 *
 * <p>The offsets are kept in an {@link OperatorState} registered in the runtime context under
 * the name {@code "kafka"}. If a state with that name already exists when {@link #open} is
 * called, reading resumes from the offsets stored in it; otherwise every assigned partition
 * starts from the offset selected in the constructor.
 *
 * <p>Partitions are distributed over the parallel subtasks by striding: the subtask with index
 * {@code i} out of {@code p} reads partitions {@code i, i + p, i + 2p, ...}. Subtasks whose
 * index is not smaller than the number of partitions get a {@link KafkaIdleConsumerIterator}.
 *
 * @param <OUT> type of the elements produced by the deserialization schema
 */
public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);

    /** Name under which the partition-to-offset map is registered in the runtime context. */
    private static final String OFFSET_STATE_NAME = "kafka";

    private static final int DEFAULT_ZOOKEEPER_SYNC_TIME_MILLIS = 10000;
    private static final int DEFAULT_WAIT_ON_EMPTY_FETCH_MILLIS = 100;
    private static final int DEFAULT_CONNECT_TIMEOUT_MS = 100000;
    private static final int DEFAULT_BUFFER_SIZE = 65536;

    private final String topicId;
    private final String zookeeperServerAddress;
    private final int zookeeperSyncTimeMillis;
    private final int waitOnEmptyFetchMillis;
    private final KafkaOffset startingOffset;
    private int connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    // The following are created in open(); for an idle subtask only the iterator is set.
    private transient KafkaConsumerIterator iterator;
    private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
    private transient Map<Integer, KafkaOffset> partitions;

    /**
     * Creates a source with a Zookeeper sync time of 10000 ms and a wait time of 100 ms on
     * empty fetches, starting from the current offset.
     *
     * @param zookeeperAddress address of the Zookeeper server, must not be null
     * @param topicId id of the Kafka topic, must not be null
     * @param deserializationSchema schema used to turn raw messages into elements, must not be null
     */
    public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
        this(zookeeperAddress, topicId, deserializationSchema,
                DEFAULT_ZOOKEEPER_SYNC_TIME_MILLIS, DEFAULT_WAIT_ON_EMPTY_FETCH_MILLIS);
    }

    /**
     * Creates a source that starts from the current offset.
     *
     * @param zookeeperAddress address of the Zookeeper server, must not be null
     * @param topicId id of the Kafka topic, must not be null
     * @param deserializationSchema schema used to turn raw messages into elements, must not be null
     * @param zookeeperSyncTimeMillis Zookeeper sync time in milliseconds, must be positive
     * @param waitOnEmptyFetchMillis wait time in milliseconds, must be positive
     */
    public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema,
            int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
        this(zookeeperAddress, topicId, deserializationSchema,
                zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
    }

    /**
     * Creates a source with an explicit starting position.
     *
     * @param zookeeperAddress address of the Zookeeper server, must not be null
     * @param topicId id of the Kafka topic, must not be null
     * @param deserializationSchema schema used to turn raw messages into elements, must not be null
     * @param zookeeperSyncTimeMillis Zookeeper sync time in milliseconds, must be positive
     * @param waitOnEmptyFetchMillis wait time in milliseconds, must be positive
     * @param offset where to start reading; {@link Offset#FROM_BEGINNING} selects a
     *     {@link BeginningOffset}, every other value selects a {@link CurrentOffset}
     * @throws NullPointerException if a reference argument is null
     * @throws IllegalArgumentException if a time argument is not positive
     */
    public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema,
            int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset offset) {
        super(deserializationSchema);
        Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address can not be null");
        Preconditions.checkNotNull(topicId, "The topic id can not be null");
        Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
        Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
        Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
        // A null offset used to fail inside the switch with a message-less NPE; fail the same
        // way but say why.
        Preconditions.checkNotNull(offset, "The starting offset can not be null");

        this.topicId = topicId;
        this.zookeeperServerAddress = zookeeperAddress;
        if (offset == Offset.FROM_BEGINNING) {
            this.startingOffset = new BeginningOffset();
        } else {
            // FROM_CURRENT and any other value fall back to the current offset.
            this.startingOffset = new CurrentOffset();
        }
        this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
        this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
    }

    /**
     * Looks up the topic's partition count and leader broker, determines the partitions of this
     * subtask (restoring their offsets from existing state when available) and initializes the
     * consumer iterator.
     *
     * @param configuration not used by this implementation
     */
    public void open(Configuration configuration) throws InterruptedException {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        int subtaskIndex = context.getIndexOfThisSubtask();
        int parallelism = context.getNumberOfParallelSubtasks();

        // NOTE(review): the sync time is passed for both int arguments of KafkaTopicUtils;
        // confirm against KafkaTopicUtils that this is intended.
        KafkaTopicUtils topicUtils = new KafkaTopicUtils(
                this.zookeeperServerAddress, this.zookeeperSyncTimeMillis, this.zookeeperSyncTimeMillis);
        int numberOfPartitions = topicUtils.getNumberOfPartitions(this.topicId);
        String leaderBrokerAddress = topicUtils.getLeaderBrokerAddressForTopic(this.topicId);

        if (subtaskIndex >= numberOfPartitions) {
            // More subtasks than partitions: nothing to read for this one.
            this.iterator = new KafkaIdleConsumerIterator();
        } else {
            if (context.containsState(OFFSET_STATE_NAME)) {
                @SuppressWarnings("unchecked")
                OperatorState<Map<Integer, KafkaOffset>> restored =
                        (OperatorState<Map<Integer, KafkaOffset>>) context.getState(OFFSET_STATE_NAME);
                this.kafkaOffSet = restored;
                this.partitions = restored.getState();
            } else {
                this.partitions = new HashMap<>();
                // Stride over the partitions so that each one belongs to exactly one subtask.
                for (int partition = subtaskIndex; partition < numberOfPartitions; partition += parallelism) {
                    this.partitions.put(partition, this.startingOffset);
                }
                this.kafkaOffSet = new OperatorState<>(this.partitions);
                context.registerState(OFFSET_STATE_NAME, this.kafkaOffSet);
            }

            this.iterator = getMultiKafkaIterator(
                    leaderBrokerAddress, this.topicId, this.partitions, this.waitOnEmptyFetchMillis);

            if (LOG.isInfoEnabled()) {
                LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
                        new Object[] {subtaskIndex + 1, parallelism, this.partitions.keySet(), this.topicId});
            }
        }
        this.iterator.initialize();
    }

    /**
     * Creates the iterator that reads the given partitions. Protected so that subclasses can
     * supply a different iterator.
     *
     * @param str leader broker address as obtained in {@link #open}
     * @param str2 topic id
     * @param map partitions to read, mapped to the offset to start from
     * @param i wait time in milliseconds ({@code waitOnEmptyFetchMillis})
     * @return a {@link KafkaMultiplePartitionsIterator} configured with the current connect
     *     timeout and buffer size
     */
    protected KafkaConsumerIterator getMultiKafkaIterator(String str, String str2, Map<Integer, KafkaOffset> map, int i) {
        return new KafkaMultiplePartitionsIterator(str, str2, map, i, this.connectTimeoutMs, this.bufferSize);
    }

    /**
     * Emits deserialized messages until the iterator is exhausted or the schema reports the end
     * of the stream. After each emitted element the offset of its partition is recorded and the
     * operator state is updated.
     *
     * <p>NOTE(review): for an idle subtask {@code partitions} and {@code kafkaOffSet} are never
     * set; this relies on the idle iterator never delivering a message - confirm.
     *
     * @param collector receives the deserialized elements
     */
    public void run(Collector<OUT> collector) throws Exception {
        while (this.iterator.hasNext()) {
            MessageWithMetadata message = this.iterator.nextWithOffset();
            OUT element = this.schema.deserialize(message.getMessage());
            if (this.schema.isEndOfStream(element)) {
                // The end-of-stream element itself is not emitted and its offset is not stored.
                return;
            }
            collector.collect(element);
            this.partitions.put(message.getPartition(), new GivenOffset(message.getOffset()));
            this.kafkaOffSet.update(this.partitions);
        }
    }

    /**
     * Sets the connect timeout handed to the partition iterator. The value is read when the
     * iterator is created in {@link #open}.
     *
     * @param i timeout in milliseconds, must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setConnectTimeoutMs(int i) {
        Preconditions.checkArgument(i > 0, "The timeout must be positive");
        this.connectTimeoutMs = i;
    }

    /**
     * Sets the buffer size handed to the partition iterator. The value is read when the
     * iterator is created in {@link #open}.
     *
     * @param i buffer size, must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setBufferSize(int i) {
        // Validate the argument itself (the previous check looked at connectTimeoutMs and
        // therefore accepted any buffer size).
        Preconditions.checkArgument(i > 0, "The buffer size must be positive");
        this.bufferSize = i;
    }

    /** No-op in this implementation. */
    public void cancel() {
    }
}
