package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.class */
/**
 * {@link OffsetHandler} implementation that reads and writes consumer offsets through a
 * {@link ZkClient}, keyed by consumer group, topic and partition.
 *
 * <p>The ZooKeeper node path for a partition is derived from
 * {@link ZKGroupTopicDirs#consumerOffsetDir()} with the partition number appended.
 */
public class ZookeeperOffsetHandler implements OffsetHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);

    /** Sentinel returned by {@link #getOffsetFromZooKeeper} when the offset node holds no data. */
    private static final long OFFSET_NOT_SET = -915623761776L;

    private final ZkClient zkClient;
    private final String groupId;

    /**
     * Creates the handler and opens the ZooKeeper client.
     *
     * @param properties configuration; must contain {@code group.id} and
     *     {@code zookeeper.connect}. {@code zookeeper.session.timeout.ms} and
     *     {@code zookeeper.connection.timeout.ms} are optional and default to {@code 6000}.
     * @throws IllegalArgumentException if a required property is missing
     * @throws NumberFormatException if a timeout property is not a valid integer
     */
    public ZookeeperOffsetHandler(Properties properties) {
        this.groupId = properties.getProperty("group.id");
        if (this.groupId == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set");
        }
        String zkConnect = properties.getProperty("zookeeper.connect");
        if (zkConnect == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
        }
        int sessionTimeoutMs = Integer.parseInt(properties.getProperty("zookeeper.session.timeout.ms", "6000"));
        int connectionTimeoutMs = Integer.parseInt(properties.getProperty("zookeeper.connection.timeout.ms", "6000"));
        this.zkClient = new ZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, new ZooKeeperStringSerializer());
    }

    /**
     * Writes the given offsets to ZooKeeper, one node per partition.
     *
     * <p>Entries whose offset is {@code null} or negative are skipped.
     *
     * @param map offsets to store, keyed by topic partition
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler
    public void commit(Map<TopicPartition, Long> map) {
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition partition = entry.getKey();
            Long offset = entry.getValue();
            // A null value would otherwise fail on unboxing; treat it like a negative (skipped) offset.
            if (offset != null && offset >= 0) {
                setOffsetInZooKeeper(this.zkClient, this.groupId, partition.topic(), partition.partition(), offset);
            }
        }
    }

    /**
     * Looks up the stored offset of every given partition and, where one exists, seeks the
     * fetcher to the position directly after it.
     *
     * @param list partitions to look up
     * @param fetcher fetcher whose read position is adjusted
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler
    public void seekFetcherToInitialOffsets(List<TopicPartition> list, Fetcher fetcher) {
        for (TopicPartition partition : list) {
            long storedOffset = getOffsetFromZooKeeper(this.zkClient, this.groupId, partition.topic(), partition.partition());
            if (storedOffset != OFFSET_NOT_SET) {
                LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.", partition.partition(), storedOffset);
                // NOTE(review): the +1 presumably reflects that the stored value is the last
                // offset already read while seek() expects the next one to read - confirm
                // against the Fetcher contract.
                fetcher.seek(partition, storedOffset + 1);
            }
        }
    }

    /** Closes the underlying ZooKeeper client. */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler
    public void close() throws IOException {
        this.zkClient.close();
    }

    /**
     * Stores an offset, as its decimal string form, in the ZooKeeper node of the given partition.
     *
     * @param zkClient client used for the write
     * @param groupId consumer group the offset belongs to
     * @param topic topic name
     * @param partition partition number
     * @param offset offset value to store
     */
    public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
        ZkUtils.updatePersistentPath(zkClient, offsetPath(groupId, topic, partition), Long.toString(offset));
    }

    /**
     * Reads the offset stored in the ZooKeeper node of the given partition.
     *
     * @param zkClient client used for the read
     * @param groupId consumer group the offset belongs to
     * @param topic topic name
     * @param partition partition number
     * @return the stored offset, or the sentinel {@code -915623761776L} if the node holds no data
     * @throws NumberFormatException if the stored data is not a valid decimal {@code long}
     */
    public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
        Tuple2<Option<String>, ?> data = ZkUtils.readDataMaybeNull(zkClient, offsetPath(groupId, topic, partition));
        Option<String> stored = data._1();
        if (stored.isEmpty()) {
            return OFFSET_NOT_SET;
        }
        return Long.parseLong(stored.get());
    }

    /** Builds the ZooKeeper node path that holds the offset of one partition for a group. */
    private static String offsetPath(String groupId, String topic, int partition) {
        return new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition;
    }
}
