package org.apache.kylin.source.kafka.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StreamingParser;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-source-kafka-1.5.2.1.jar:org/apache/kylin/source/kafka/util/KafkaUtils.class */
public final class KafkaUtils {
    private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
    private static final int MAX_RETRY_TIMES = 6;

    private KafkaUtils() {
    }

    public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int i) {
        PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), i, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
        if (partitionMetadata == null) {
            return null;
        }
        if (partitionMetadata.errorCode() != 0) {
            logger.warn("PartitionMetadata errorCode: " + ((int) partitionMetadata.errorCode()));
        }
        return partitionMetadata.leader();
    }

    private static void sleep(int i) {
        logger.info("retry times:" + i + " sleep:" + ((int) Math.pow(2.0d, i)) + " seconds");
        try {
            Thread.sleep(r0 * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int i, long j) {
        String topic = kafkaClusterConfig.getTopic();
        int i2 = 0;
        while (i2 < 6) {
            Broker leadBroker = getLeadBroker(kafkaClusterConfig, i);
            if (leadBroker == null) {
                logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + i);
                int i3 = i2;
                i2++;
                sleep(i3);
            } else {
                FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, i, j, leadBroker, kafkaClusterConfig);
                if (fetchResponse.errorCode(topic, i) != 0) {
                    logger.warn("errorCode of FetchResponse is:" + ((int) fetchResponse.errorCode(topic, i)));
                    int i4 = i2;
                    i2++;
                    sleep(i4);
                } else {
                    Iterator it2 = fetchResponse.messageSet(topic, i).iterator();
                    if (it2.hasNext()) {
                        return (MessageAndOffset) it2.next();
                    }
                    logger.warn("messageSet is empty");
                    int i5 = i2;
                    i2++;
                    sleep(i5);
                }
            }
        }
        throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, Integer.valueOf(i), Long.valueOf(j)));
    }

    public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int i, long j, StreamingParser streamingParser) {
        Pair<Long, Long> firstAndLastOffset = getFirstAndLastOffset(kafkaClusterConfig, i);
        String topic = kafkaClusterConfig.getTopic();
        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, Integer.valueOf(i), Long.valueOf(j), firstAndLastOffset.getFirst(), firstAndLastOffset.getSecond()));
        long binarySearch = binarySearch(kafkaClusterConfig, i, firstAndLastOffset.getFirst().longValue(), firstAndLastOffset.getSecond().longValue(), j, streamingParser);
        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, Integer.valueOf(i), Long.valueOf(binarySearch)));
        return binarySearch;
    }

    public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int i) {
        String topic = kafkaClusterConfig.getTopic();
        Broker broker = (Broker) Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, i), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + i);
        return Pair.newPair(Long.valueOf(KafkaRequester.getLastOffset(topic, i, OffsetRequest.EarliestTime(), broker, kafkaClusterConfig)), Long.valueOf(KafkaRequester.getLastOffset(topic, i, OffsetRequest.LatestTime(), broker, kafkaClusterConfig) - 1));
    }

    private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int i, long j, long j2, long j3, StreamingParser streamingParser) {
        HashMap newHashMap = Maps.newHashMap();
        while (j < j2) {
            long j4 = j + ((j2 - j) >> 1);
            long dataTimestamp = getDataTimestamp(kafkaClusterConfig, i, j, streamingParser, newHashMap);
            long dataTimestamp2 = getDataTimestamp(kafkaClusterConfig, i, j2, streamingParser, newHashMap);
            long dataTimestamp3 = getDataTimestamp(kafkaClusterConfig, i, j4, streamingParser, newHashMap);
            if (dataTimestamp >= j3) {
                return j;
            }
            if (dataTimestamp2 <= j3) {
                return j2;
            }
            if (j3 == dataTimestamp3) {
                return j4;
            }
            if (j3 < dataTimestamp3) {
                j2 = j4 - 1;
            } else {
                j = j4 + 1;
            }
        }
        return j;
    }

    private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int i, long j, StreamingParser streamingParser, Map<Long, Long> map) {
        if (map.containsKey(Long.valueOf(j))) {
            return map.get(Long.valueOf(j)).longValue();
        }
        long dataTimestamp = getDataTimestamp(kafkaClusterConfig, i, j, streamingParser);
        map.put(Long.valueOf(j), Long.valueOf(dataTimestamp));
        return dataTimestamp;
    }

    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int i, long j, StreamingParser streamingParser) {
        String topic = kafkaClusterConfig.getTopic();
        MessageAndOffset kafkaMessage = getKafkaMessage(kafkaClusterConfig, i, j);
        ByteBuffer payload = kafkaMessage.message().payload();
        payload.get(new byte[payload.limit()]);
        StreamingMessage parse = streamingParser.parse(kafkaMessage);
        logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, Integer.valueOf(i), Long.valueOf(j), Long.valueOf(parse.getTimestamp())));
        return parse.getTimestamp();
    }
}
