package org.apache.flink.streaming.connectors.kafka.api.persistent;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.class */
public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>, CheckpointCommitter, CheckpointedAsynchronously<long[]> {
    private static final long serialVersionUID = 287845877188312621L;
    private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
    /** Topic this source reads from; also used to build the ZooKeeper offset path. */
    private final String topicName;
    /** Turns the raw message bytes into records and decides when the stream has ended. */
    private final DeserializationSchema<OUT> deserializationSchema;
    /**
     * Checkpoint id -> copy of the offsets taken when that checkpoint was snapshotted.
     * Every access in this class synchronizes on the map itself.
     */
    private final LinkedMap pendingCheckpoints = new LinkedMap();
    /** Transient: written as plain Properties by writeObject and rebuilt by readObject. */
    private transient ConsumerConfig consumerConfig;
    /** Iterator over the single Kafka stream; assigned in open(). */
    private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
    /** Kafka consumer connector; assigned in open() and shut down in close(). */
    private transient ConsumerConnector consumer;
    /** ZooKeeper client used for partition lookup and offset writes; created in open(). */
    private transient ZkClient zkClient;
    /** Per-partition offset of the last emitted record; -1 means nothing was emitted yet. */
    private transient long[] lastOffsets;
    /** Per-partition offset most recently written to ZooKeeper through setOffset(int, long). */
    private transient long[] commitedOffsets;
    /** Offsets handed to restoreState(long[]); applied by open(), null when there is nothing to restore. */
    private transient long[] restoreState;
    /** Loop flag for run(); set to true at the end of open() and cleared by cancel(). */
    private volatile boolean running;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource$KafkaZKStringSerializer.class */
    /**
     * ZooKeeper serializer that stores every node value as a UTF-8 encoded string.
     */
    public static class KafkaZKStringSerializer implements ZkSerializer {
        /**
         * Encodes a string value as UTF-8 bytes.
         *
         * @param obj the value to encode; it is cast to {@link String}
         * @return the UTF-8 bytes of the string
         */
        public byte[] serialize(Object obj) throws ZkMarshallingError {
            String text = (String) obj;
            try {
                return text.getBytes("UTF-8");
            } catch (UnsupportedEncodingException unsupported) {
                throw new RuntimeException(unsupported);
            }
        }

        /**
         * Decodes UTF-8 bytes back into a string.
         *
         * @param bArr the stored bytes, may be {@code null}
         * @return the decoded string, or {@code null} when {@code bArr} is {@code null}
         */
        public Object deserialize(byte[] bArr) throws ZkMarshallingError {
            try {
                return bArr == null ? null : new String(bArr, "UTF-8");
            } catch (UnsupportedEncodingException unsupported) {
                throw new RuntimeException(unsupported);
            }
        }
    }

    public PersistentKafkaSource(String str, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(deserializationSchema);
        Preconditions.checkNotNull(consumerConfig);
        this.topicName = str;
        this.deserializationSchema = deserializationSchema;
        this.consumerConfig = consumerConfig;
        if (consumerConfig.autoCommitEnable()) {
            throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. This source can only be used with auto commit disabled because the source is committing to zookeeper by itself (not using the KafkaConsumer).");
        }
        if (!consumerConfig.offsetsStorage().equals("zookeeper")) {
            throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
        }
    }

    /**
     * Connects to Kafka and ZooKeeper and prepares the per-partition offset bookkeeping.
     *
     * <p>Exactly one message stream is requested for the topic. If offsets were handed over
     * through {@code restoreState(long[])}, they are written to ZooKeeper and used as the
     * starting point; otherwise every partition starts at -1 (nothing read yet).
     *
     * @param configuration passed through to the superclass
     * @throws RuntimeException if Kafka does not return exactly one stream for the topic
     * @throws IllegalStateException if the number of restored offsets differs from the number
     *         of partitions of the topic
     * @throws IllegalArgumentException if the partition count cannot be read from ZooKeeper
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);

        // Store the connector in the field right away: if one of the checks below throws,
        // close() can still reach it and shut it down instead of leaking the connection.
        this.consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
                this.consumer.createMessageStreams(Collections.singletonMap(this.topicName, 1));
        if (streamsByTopic.size() != 1) {
            throw new RuntimeException("Expected only one message stream but got " + streamsByTopic.size());
        }
        List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(this.topicName);
        if (topicStreams == null) {
            throw new RuntimeException("Requested stream not available. Available streams: " + streamsByTopic.toString());
        }
        if (topicStreams.size() != 1) {
            throw new RuntimeException("Requested 1 stream from Kafka, but got " + topicStreams.size() + " streams");
        }

        LOG.info("Opening Consumer instance for topic '{}' on group '{}'", this.topicName, this.consumerConfig.groupId());
        this.iteratorToRead = topicStreams.get(0).iterator();

        this.zkClient = new ZkClient(this.consumerConfig.zkConnect(), this.consumerConfig.zkSessionTimeoutMs(), this.consumerConfig.zkConnectionTimeoutMs(), new KafkaZKStringSerializer());

        // One slot per partition in both offset arrays.
        int numberOfPartitions = getNumberOfPartitions();
        LOG.debug("The topic {} has {} partitions", this.topicName, Integer.valueOf(numberOfPartitions));
        this.lastOffsets = new long[numberOfPartitions];
        this.commitedOffsets = new long[numberOfPartitions];

        if (this.restoreState == null) {
            // -1 marks "no record emitted yet" for a partition.
            Arrays.fill(this.lastOffsets, -1L);
        } else {
            if (this.restoreState.length != numberOfPartitions) {
                throw new IllegalStateException("There are " + this.restoreState.length + " offsets to restore for topic " + this.topicName + " but there are only " + numberOfPartitions + " in the topic");
            }
            LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(this.restoreState));
            setOffsetsInZooKeeper(this.restoreState);
            this.lastOffsets = this.restoreState;
        }

        // Reset the commit bookkeeping, including entries just touched by the restore above.
        // NOTE(review): setOffset(int, long) skips any offset that is <= the stored value, so with
        // a baseline of 0 the offset 0 itself is never written to ZooKeeper - confirm this is intended.
        Arrays.fill(this.commitedOffsets, 0L);
        this.pendingCheckpoints.clear();
        this.running = true;
    }

    /**
     * Reads messages from the Kafka iterator and emits them until the source is cancelled,
     * the iterator has no more elements, or the schema signals the end of the stream.
     *
     * <p>Messages whose offset is not greater than the last offset recorded for their partition
     * are skipped. The offset update and the emission happen together under the checkpoint lock.
     *
     * @param sourceContext the context used to emit records and to obtain the checkpoint lock
     * @throws IllegalStateException if the Kafka iterator has not been set up
     */
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        if (this.iteratorToRead == null) {
            throw new IllegalStateException("Kafka iterator not initialized properly.");
        }
        final Object checkpointLock = sourceContext.getCheckpointLock();

        while (this.running && this.iteratorToRead.hasNext()) {
            final MessageAndMetadata<byte[], byte[]> message = this.iteratorToRead.next();
            final int partition = message.partition();
            final long offset = message.offset();

            // Already covered by the recorded offset for this partition: do not emit again.
            if (this.lastOffsets[partition] >= offset) {
                LOG.info("Skipping message with offset {} from partition {}", Long.valueOf(offset), Integer.valueOf(partition));
                continue;
            }

            final OUT value = this.deserializationSchema.deserialize(message.message());
            if (this.deserializationSchema.isEndOfStream(value)) {
                LOG.info("DeserializationSchema signaled end of stream for this source");
                return;
            }

            // Record the offset and emit under the same lock so both change together.
            synchronized (checkpointLock) {
                this.lastOffsets[partition] = offset;
                sourceContext.collect(value);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Processed record with offset {} from partition {}", Long.valueOf(offset), Integer.valueOf(partition));
            }
        }
    }

    /**
     * Requests the read loop in run() to stop by clearing the {@code running} flag.
     * The flag is only evaluated at the top of each loop iteration in run().
     * NOTE(review): presumably a hasNext() call that is waiting for data does not see the
     * flag until it returns - confirm against the Kafka consumer timeout settings.
     */
    public void cancel() {
        this.running = false;
    }

    /**
     * Shuts down the Kafka consumer and closes the ZooKeeper client.
     *
     * <p>Both fields are only assigned in open(), so either may still be {@code null} when
     * open() failed part-way or was never called; such a field is skipped. The ZooKeeper
     * client is closed even when the consumer shutdown throws.
     */
    public void close() {
        LOG.info("Closing Kafka consumer");
        try {
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
        } finally {
            // Runs even if shutdown() throws, so the ZooKeeper connection is not leaked.
            if (this.zkClient != null) {
                this.zkClient.close();
            }
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    /**
     * Takes a copy of the current per-partition offsets and remembers it under the
     * checkpoint id so that commitCheckpoint(long) can later write it to ZooKeeper.
     *
     * @param j the checkpoint id
     * @param j2 the checkpoint timestamp (only logged)
     * @return the copied offsets, or {@code null} if the offsets have not been set up yet
     */
    public long[] m203snapshotState(long j, long j2) throws Exception {
        final long[] current = this.lastOffsets;
        if (current == null) {
            LOG.warn("State snapshot requested on not yet opened source. Returning null");
            return null;
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", new Object[]{Arrays.toString(current), Long.valueOf(j), Long.valueOf(j2)});
        }

        // Hand out a private copy; the live array keeps changing while records are emitted.
        final long[] snapshot = current.clone();
        synchronized (this.pendingCheckpoints) {
            this.pendingCheckpoints.put(Long.valueOf(j), snapshot);
        }
        return snapshot;
    }

    /**
     * Remembers the offsets to restore; they are applied by the next call to open().
     *
     * <p>The snapshot method of this class returns {@code null} when it is invoked before the
     * source was opened, so a {@code null} state is accepted here and treated as "nothing to
     * restore" instead of failing with a NullPointerException.
     *
     * @param jArr the per-partition offsets to restore, or {@code null} for no restored state
     */
    public void restoreState(long[] jArr) {
        if (jArr == null) {
            LOG.info("No offsets to restore; the source will start without restored state");
            this.restoreState = null;
            return;
        }
        LOG.info("The state will be restored to {} in the open() method", Arrays.toString(jArr));
        // Keep a private copy so later changes to the caller's array do not affect the restore.
        this.restoreState = Arrays.copyOf(jArr, jArr.length);
    }

    /**
     * Writes the offsets that were snapshotted for the given checkpoint to ZooKeeper and
     * forgets that checkpoint together with every checkpoint registered before it.
     * A warning is logged and nothing is written if the checkpoint id is unknown.
     *
     * @param j the id of the checkpoint to commit
     */
    public void commitCheckpoint(long j) {
        LOG.info("Commit checkpoint {}", Long.valueOf(j));
        synchronized (this.pendingCheckpoints) {
            final int position = this.pendingCheckpoints.indexOf(Long.valueOf(j));
            if (position == -1) {
                LOG.warn("Unable to find pending checkpoint for id {}", Long.valueOf(j));
                return;
            }

            final long[] offsets = (long[]) this.pendingCheckpoints.remove(position);

            // Exactly 'position' entries preceded the committed one; they are now at the head
            // of the map and are dropped one by one.
            for (int older = position; older > 0; older--) {
                this.pendingCheckpoints.remove(0);
            }

            if (LOG.isInfoEnabled()) {
                LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(offsets));
            }
            setOffsetsInZooKeeper(offsets);
        }
    }

    /**
     * Passes every offset of the array to setOffset(int, long), using the array index as the
     * partition number. Entries equal to -1 are skipped.
     *
     * @param jArr offsets indexed by partition
     */
    private void setOffsetsInZooKeeper(long[] jArr) {
        int partition = 0;
        for (long offset : jArr) {
            if (offset != -1) {
                setOffset(partition, offset);
            }
            partition++;
        }
    }

    /**
     * Looks up the partitions of the topic through ZooKeeper and returns how many there are.
     *
     * @return the number of partitions of the topic
     * @throws IllegalArgumentException if the lookup result has no entry for the topic
     */
    private int getNumberOfPartitions() {
        Seq<String> topics = JavaConversions.asScalaBuffer(Collections.singletonList(this.topicName)).toList();
        scala.collection.mutable.Map<String, Seq<Object>> partitionsByTopic = ZkUtils.getPartitionsForTopics(this.zkClient, topics);

        Option<Seq<Object>> partitions = partitionsByTopic.get(this.topicName);
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("Unable to get number of partitions for topic " + this.topicName + " from " + partitionsByTopic.toString());
        }
        return partitions.get().size();
    }

    /**
     * Writes the offset of one partition to ZooKeeper unless the offset recorded as committed
     * for that partition is already equal or larger, then records it as committed.
     *
     * @param i the partition number
     * @param j the offset to commit
     */
    protected void setOffset(int i, long j) {
        final boolean alreadyCommitted = this.commitedOffsets[i] >= j;
        if (!alreadyCommitted) {
            setOffset(this.zkClient, this.consumerConfig.groupId(), this.topicName, i, j);
            this.commitedOffsets[i] = j;
            return;
        }
        LOG.debug("Ignoring offset {} for partition {} because it is already committed", Long.valueOf(j), Integer.valueOf(i));
    }

    /**
     * Stores an offset in ZooKeeper under the consumer offset directory of the group and topic,
     * in a node named after the partition number.
     *
     * @param zkClient the ZooKeeper client to write with
     * @param str the consumer group id
     * @param str2 the topic name
     * @param i the partition number
     * @param j the offset to store
     */
    public static void setOffset(ZkClient zkClient, String str, String str2, int i, long j) {
        LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", new Object[]{Integer.valueOf(i), str2, str, Long.valueOf(j)});

        final TopicAndPartition target = new TopicAndPartition(str2, i);
        final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(str, target.topic());
        final String offsetPath = groupTopicDirs.consumerOffsetDir() + "/" + target.partition();

        ZkUtils.updatePersistentPath(zkClient, offsetPath, Long.toString(j));
    }

    /**
     * Reads the offset stored in ZooKeeper for one partition of a topic and consumer group.
     *
     * @param zkClient the ZooKeeper client to read with
     * @param str the consumer group id
     * @param str2 the topic name
     * @param i the partition number
     * @return the stored offset parsed as a long
     * @throws NumberFormatException if the stored value is not a valid long
     */
    public static long getOffset(ZkClient zkClient, String str, String str2, int i) {
        final TopicAndPartition target = new TopicAndPartition(str2, i);
        final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(str, target.topic());
        final String offsetPath = groupTopicDirs.consumerOffsetDir() + "/" + target.partition();

        // readData returns a pair; its first element is the stored node content.
        final String storedValue = (String) ZkUtils.readData(zkClient, offsetPath)._1();
        return Long.parseLong(storedValue);
    }

    /**
     * Custom serialization hook: writes the regular fields, then the consumer configuration
     * as a plain {@link Properties} object because the {@code consumerConfig} field is transient.
     *
     * @param objectOutputStream the stream to write to
     */
    private void writeObject(ObjectOutputStream objectOutputStream) throws IOException, ClassNotFoundException {
        objectOutputStream.defaultWriteObject();
        final Properties kafkaProperties = this.consumerConfig.props().props();
        objectOutputStream.writeObject(kafkaProperties);
    }

    /**
     * Custom deserialization hook: reads the regular fields, then rebuilds the consumer
     * configuration from the {@link Properties} object written by writeObject.
     *
     * @param objectInputStream the stream to read from
     */
    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        final Properties kafkaProperties = (Properties) objectInputStream.readObject();
        this.consumerConfig = new ConsumerConfig(kafkaProperties);
    }

    /**
     * Returns the type of the records produced by this source, as reported by the
     * deserialization schema.
     *
     * @return the produced type taken from the deserialization schema
     */
    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }
}
