/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.flink.shaded.org.apache.curator.RetryPolicy;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperOffsetHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
    private final String groupId;
    private final CuratorFramework curatorClient;

    public ZookeeperOffsetHandler(Properties props) {
        this.groupId = props.getProperty("group.id");
        if (this.groupId == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set");
        }
        String zkConnect = props.getProperty("zookeeper.connect");
        if (zkConnect == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
        }
        int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
        int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
        int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
        int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
        this.curatorClient = CuratorFrameworkFactory.newClient((String)zkConnect, (int)sessionTimeoutMs, (int)connectionTimeoutMs, (RetryPolicy)retryPolicy);
        this.curatorClient.start();
    }

    public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
        for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
            KafkaTopicPartition tp = entry.getKey();
            long offset = entry.getValue();
            if (offset < 0L) continue;
            ZookeeperOffsetHandler.setOffsetInZooKeeper(this.curatorClient, this.groupId, tp.getTopic(), tp.getPartition(), offset);
        }
    }

    public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
        HashMap<KafkaTopicPartition, Long> ret = new HashMap<KafkaTopicPartition, Long>(partitions.size());
        for (KafkaTopicPartition tp : partitions) {
            Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(this.curatorClient, this.groupId, tp.getTopic(), tp.getPartition());
            if (offset == null) continue;
            LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.", new Object[]{tp.getTopic(), tp.getPartition(), offset});
            ret.put(tp, offset);
        }
        return ret;
    }

    public void close() throws IOException {
        this.curatorClient.close();
    }

    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
        String path = topicDirs.consumerOffsetDir() + "/" + partition;
        curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
        byte[] data = Long.toString(offset).getBytes();
        curatorClient.setData().forPath(path, data);
    }

    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
        String path = topicDirs.consumerOffsetDir() + "/" + partition;
        curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
        byte[] data = (byte[])curatorClient.getData().forPath(path);
        if (data == null) {
            return null;
        }
        String asString = new String(data);
        if (asString.length() == 0) {
            return null;
        }
        try {
            return Long.valueOf(asString);
        }
        catch (NumberFormatException e) {
            LOG.error("The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}", new Object[]{groupId, topic, partition, asString});
            return null;
        }
    }
}

