package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.class */
public class KafkaOffsetGen {
    private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
    // Shape check used by checkTopicCheckpoint: anything, a comma, anything, a colon, anything
    // (the "topic,partition:offset" layout produced by CheckpointUtils.offsetsToStr).
    private final Pattern pattern = Pattern.compile(".*,.*:.*");
    // Properties handed to every KafkaConsumer this class creates; built by excludeHoodieConfigs
    // and possibly amended by the constructor when the reset strategy is GROUP.
    private final Map<String, Object> kafkaParams;
    // The full, unfiltered properties; used to look up the hoodie.* settings at call time.
    private final TypedProperties props;
    // Kafka topic read from Config.KAFKA_TOPIC_NAME (required property).
    protected final String topicName;
    // Strategy used by getNextOffsetRanges when there is no usable checkpoint; set once in the constructor.
    private KafkaResetOffsetStrategies autoResetValue;
    // Value of Config.KAFKA_CHECKPOINT_TYPE; compared against Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.
    private final String kafkaCheckpointType;

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$CheckpointUtils.class */
    /**
     * Stateless helpers for converting between checkpoint strings and Kafka offsets, and for
     * splitting an event budget across partitions.
     *
     * <p>The checkpoint string layout is {@code topic,partition:offset,partition:offset,...}.
     */
    public static class CheckpointUtils {
        /**
         * Parses a checkpoint string into per-partition offsets.
         *
         * @param str checkpoint in the form {@code topic,partition:offset[,partition:offset...]}
         * @return a mutable map from topic partition to the offset recorded for it
         * @throws NumberFormatException if a partition or offset is not numeric
         * @throws ArrayIndexOutOfBoundsException if the string has no topic token or an entry lacks a ':'
         */
        public static Map<TopicPartition, Long> strToOffsets(String str) {
            final String[] tokens = str.split(",");
            final String topic = tokens[0];
            final Map<TopicPartition, Long> offsets = new HashMap<>();
            // Token 0 is the topic; every following token is "partition:offset".
            for (int idx = 1; idx < tokens.length; idx++) {
                final String[] partitionAndOffset = tokens[idx].split(":");
                final int partition = Integer.parseInt(partitionAndOffset[0]);
                final long offset = Long.parseLong(partitionAndOffset[1]);
                offsets.put(new TopicPartition(topic, partition), offset);
            }
            return offsets;
        }

        /**
         * Serializes the until-offsets of the given ranges into a checkpoint string.
         *
         * @param offsetRangeArr ranges to serialize; must contain at least one element because the
         *                       topic name is taken from the first range
         * @return {@code topic,partition:untilOffset,...} in array order
         */
        public static String offsetsToStr(OffsetRange[] offsetRangeArr) {
            final String topicPrefix = offsetRangeArr[0].topic() + ",";
            final String partitionOffsets = Arrays.stream(offsetRangeArr)
                .map(range -> String.format("%s:%d", range.partition(), range.untilOffset()))
                .collect(Collectors.joining(","));
            return topicPrefix + partitionOffsets;
        }

        /**
         * Builds one offset range per partition in {@code map2}, handing out at most roughly
         * {@code j} events across the partitions.
         *
         * @param map  starting offset per partition; partitions missing here start at 0
         * @param map2 upper bound (exclusive until-offset) per partition; defines the partition set
         * @param j    total number of events to allocate
         * @return ranges sorted by partition number
         */
        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2, long j) {
            // Start with an empty range (from == until) for every target partition.
            final OffsetRange[] ranges = map2.keySet().stream()
                .map(topicPartition -> {
                    final long startOffset = map.getOrDefault(topicPartition, 0L);
                    return OffsetRange.create(topicPartition, startOffset, startOffset);
                })
                .sorted(Comparator.comparing(OffsetRange::partition))
                .toArray(OffsetRange[]::new);

            final int partitionCount = map2.size();
            final Set<Integer> exhaustedPartitions = new HashSet<>();
            long allocatedEvents = 0;
            // Keep handing out events while budget remains and some partition can still grow.
            while (allocatedEvents < j && exhaustedPartitions.size() < partitionCount) {
                // Even share of the remaining budget among the partitions that are not exhausted.
                final long eventsPerPartition =
                    (long) Math.ceil((1.0d * (j - allocatedEvents)) / (partitionCount - exhaustedPartitions.size()));
                for (int idx = 0; idx < ranges.length; idx++) {
                    final OffsetRange current = ranges[idx];
                    if (exhaustedPartitions.contains(current.partition())) {
                        continue;
                    }
                    final long maxUntil = map2.get(current.topicPartition());
                    long until = Math.min(maxUntil, current.untilOffset() + eventsPerPartition);
                    if (until == maxUntil) {
                        exhaustedPartitions.add(current.partition());
                    }
                    allocatedEvents += until - current.untilOffset();
                    // Budget overshot: pull this partition's until-offset back by the overshoot.
                    if (allocatedEvents > j) {
                        until = Math.min(maxUntil, until + Math.min(eventsPerPartition, j - allocatedEvents));
                    }
                    ranges[idx] = OffsetRange.create(current.topicPartition(), current.fromOffset(), until);
                }
            }
            return ranges;
        }

        /**
         * Sums the message counts of all the given ranges.
         *
         * @param offsetRangeArr ranges to total
         * @return sum of {@code count()} over the ranges
         */
        public static long totalNewMessages(OffsetRange[] offsetRangeArr) {
            long total = 0L;
            for (OffsetRange range : offsetRangeArr) {
                total += range.count();
            }
            return total;
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$Config.class */
    /** Configuration keys, defaults and documentation strings used by {@link KafkaOffsetGen}. */
    public static class Config {
        /** Required: the Kafka topic to read. No default. */
        private static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty
            .key(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP)
            .noDefaultValue()
            .withDocumentation("Kafka topic name.");

        /** How the checkpoint string is interpreted; see {@link #KAFKA_CHECKPOINT_TYPE_TIMESTAMP}. */
        public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
            .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
            .defaultValue(HiveSchemaUtil.STRING_TYPE_NAME)
            .withDocumentation("Kafka checkpoint type.");

        /** Maximum time in milliseconds to keep retrying the partition metadata lookup. */
        public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
            .key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out")
            .defaultValue(300000L)
            .withDocumentation("Time out for fetching partitions. 5min by default");

        /** Whether offsets are committed back to Kafka; not read inside this class. */
        public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
            .key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
            .defaultValue(false)
            .withDocumentation("Automatically submits offset to kafka.");

        /** When true, a checkpoint older than the earliest available offset fails the run. */
        public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = ConfigProperty
            .key("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss")
            .defaultValue(false)
            .withDocumentation("Fail when checkpoint goes out of bounds instead of seeking to earliest offsets.");

        /** Event budget per batch, used when the caller passes no explicit source limit. */
        public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty
            .key("hoodie.deltastreamer.kafka.source.maxEvents")
            .defaultValue(5000000L)
            .withDocumentation("Maximum number of records obtained in each batch.");

        /** Where to start reading when there is no usable checkpoint. */
        private static final ConfigProperty<KafkaResetOffsetStrategies> KAFKA_AUTO_OFFSET_RESET = ConfigProperty
            .key(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
            .defaultValue(KafkaResetOffsetStrategies.LATEST)
            .withDocumentation("Kafka consumer strategy for reading data.");

        /** Optional post-processor class for the JSON Kafka source; not read inside this class. */
        public static final ConfigProperty<String> JSON_KAFKA_PROCESSOR_CLASS_OPT = ConfigProperty
            .key("hoodie.deltastreamer.source.json.kafka.processor.class")
            .noDefaultValue()
            // Fixed typo in the user-facing documentation: "fromsource" -> "from source".
            .withDocumentation("Json kafka source post processor class name, post process data after consuming from source and before giving it to deltastreamer.");

        /** Checkpoint type value that makes the checkpoint string be treated as a timestamp. */
        public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$KafkaResetOffsetStrategies.class */
    /**
     * Where {@code getNextOffsetRanges} starts reading when no usable checkpoint is supplied.
     * The constructor matches the configured value against the lower-cased constant names.
     */
    public enum KafkaResetOffsetStrategies {
        // Start from the current end offsets of every partition.
        LATEST,
        // Start from the beginning offsets of every partition.
        EARLIEST,
        // Start from the consumer group's committed offsets (see getGroupOffsets).
        GROUP
    }

    /**
     * Creates an offset generator from the given properties.
     *
     * <p>Reads the required topic name, the checkpoint type and the auto offset reset strategy.
     * The strategy value must equal the lower-cased name of a {@link KafkaResetOffsetStrategies}
     * constant. When the strategy is {@code GROUP}, the consumer's own auto offset reset property
     * is overwritten with the default strategy name.
     *
     * @param typedProperties source properties; must contain the Kafka topic name
     * @throws HoodieDeltaStreamerException if the auto offset reset value matches no known strategy
     */
    public KafkaOffsetGen(TypedProperties typedProperties) {
        this.props = typedProperties;
        this.kafkaParams = excludeHoodieConfigs(typedProperties);
        DataSourceUtils.checkRequiredProperties(typedProperties, Collections.singletonList(Config.KAFKA_TOPIC_NAME.key()));
        this.topicName = typedProperties.getString(Config.KAFKA_TOPIC_NAME.key());
        this.kafkaCheckpointType = typedProperties.getString(Config.KAFKA_CHECKPOINT_TYPE.key(), Config.KAFKA_CHECKPOINT_TYPE.defaultValue());

        // Locale.ROOT keeps the lower-casing locale independent; with the default locale,
        // "EARLIEST" becomes "earlıest" under e.g. Turkish and would never match "earliest".
        final String defaultResetName = Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase(Locale.ROOT);
        final String configuredResetName = typedProperties.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetName);

        KafkaResetOffsetStrategies resolvedStrategy = null;
        for (KafkaResetOffsetStrategies candidate : KafkaResetOffsetStrategies.values()) {
            if (candidate.name().toLowerCase(Locale.ROOT).equals(configuredResetName)) {
                resolvedStrategy = candidate;
                break;
            }
        }
        if (resolvedStrategy == null) {
            throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + configuredResetName);
        }
        this.autoResetValue = resolvedStrategy;

        // "group" is handled by this class, so the consumer itself is given the default strategy name.
        if (resolvedStrategy == KafkaResetOffsetStrategies.GROUP) {
            this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetName);
        }
    }

    /**
     * Computes the offset ranges to read in the next batch.
     *
     * <p>Starting offsets come from the checkpoint when it is present, non-empty and has the
     * {@code topic,partition:offset} shape; otherwise from the configured auto reset strategy.
     * When the checkpoint type is "timestamp" and the checkpoint looks like a timestamp, it is
     * first translated into per-partition offsets. End offsets are the current end offsets of
     * the topic. The consumer is open only for the Kafka lookups and is always closed.
     *
     * @param option                     last checkpoint, if any
     * @param j                          source limit; {@code Long.MAX_VALUE} means "not configured"
     *                                   and falls back to {@link Config#MAX_EVENTS_FROM_KAFKA_SOURCE_PROP}
     * @param hoodieDeltaStreamerMetrics metrics sink that receives the Kafka delay count when a
     *                                   checkpoint is used
     * @return offset ranges, one per partition, as produced by {@link CheckpointUtils#computeOffsetRanges}
     * @throws HoodieException if the topic does not exist or the event budget is smaller than the
     *                         number of partitions
     * @throws HoodieNotSupportedException if the auto reset strategy is not handled
     */
    public OffsetRange[] getNextOffsetRanges(Option<String> option, long j, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics) {
        Map<TopicPartition, Long> fromOffsets;
        Map<TopicPartition, Long> toOffsets;
        // try-with-resources closes the consumer exactly once on both the normal and the error path.
        try (KafkaConsumer kafkaConsumer = new KafkaConsumer(this.kafkaParams)) {
            if (!checkTopicExists(kafkaConsumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List<PartitionInfo> partitionInfos = fetchPartitionInfos(kafkaConsumer, this.topicName);
            Set<TopicPartition> topicPartitions = partitionInfos.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());

            // A timestamp checkpoint is rewritten into the regular "topic,partition:offset" form.
            if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(this.kafkaCheckpointType) && isValidTimestampCheckpointType(option).booleanValue()) {
                option = getOffsetsByTimestamp(kafkaConsumer, partitionInfos, topicPartitions, this.topicName, Long.valueOf(Long.parseLong(option.get())));
            }

            if (option.isPresent() && !option.get().isEmpty() && checkTopicCheckpoint(option)) {
                fromOffsets = fetchValidOffsets(kafkaConsumer, option, topicPartitions);
                hoodieDeltaStreamerMetrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(option, topicPartitions, kafkaConsumer).longValue());
            } else {
                switch (this.autoResetValue) {
                    case EARLIEST:
                        fromOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
                        break;
                    case LATEST:
                        fromOffsets = kafkaConsumer.endOffsets(topicPartitions);
                        break;
                    case GROUP:
                        fromOffsets = getGroupOffsets(kafkaConsumer, topicPartitions);
                        break;
                    default:
                        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
                }
            }
            toOffsets = kafkaConsumer.endOffsets(topicPartitions);
        }

        // Everything below is pure computation and needs no open consumer.
        long maxEventsFromConfig = this.props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(), Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue().longValue());
        long numEvents;
        if (j == Long.MAX_VALUE) {
            numEvents = maxEventsFromConfig;
            LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsFromConfig);
        } else {
            numEvents = j;
        }
        if (numEvents < toOffsets.size()) {
            throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
        }
        return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
    }

    /**
     * Looks up the partition metadata of a topic, retrying while the consumer returns
     * {@code null}.
     *
     * <p>Between attempts the thread sleeps 10 seconds. Retrying stops once the time configured
     * by {@link Config#KAFKA_FETCH_PARTITION_TIME_OUT} has elapsed, or as soon as the thread is
     * interrupted; in the latter case the interrupt flag is restored for the caller.
     *
     * @param kafkaConsumer consumer used for the lookup
     * @param str           topic name
     * @return the partition metadata, never {@code null}
     * @throws HoodieDeltaStreamerException if no metadata could be obtained
     */
    private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer kafkaConsumer, String str) {
        final long timeoutMs = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue().longValue());
        final long startMs = System.currentTimeMillis();
        do {
            List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(str);
            if (partitionInfos != null) {
                return partitionInfos;
            }
            try {
                TimeUnit.SECONDS.sleep(10L);
            } catch (InterruptedException e) {
                LOG.error("Sleep failed while fetching partitions", e);
                // Preserve the interruption for callers and give up instead of retrying.
                Thread.currentThread().interrupt();
                break;
            }
        } while (System.currentTimeMillis() <= startMs + timeoutMs);
        throw new HoodieDeltaStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", str));
    }

    /**
     * Returns the checkpointed offsets, or the beginning offsets of all partitions when any
     * checkpointed offset is older than what Kafka still holds.
     *
     * @param kafkaConsumer consumer used to read the beginning offsets
     * @param option        checkpoint string; must be present
     * @param set           partitions of the topic
     * @return checkpoint offsets when all are in bounds, otherwise the beginning offsets
     * @throws HoodieDeltaStreamerException if a checkpoint is out of bounds and
     *                                      {@link Config#ENABLE_FAIL_ON_DATA_LOSS} is true
     */
    private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer kafkaConsumer, Option<String> option, Set<TopicPartition> set) {
        final Map<TopicPartition, Long> earliestOffsets = kafkaConsumer.beginningOffsets(set);
        final Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(option.get());

        // Out of bounds as soon as one checkpointed offset lies before the earliest retained offset.
        boolean checkpointOutOfBounds = false;
        for (Map.Entry<TopicPartition, Long> checkpoint : checkpointOffsets.entrySet()) {
            if (checkpoint.getValue() < earliestOffsets.get(checkpoint.getKey())) {
                checkpointOutOfBounds = true;
                break;
            }
        }
        if (!checkpointOutOfBounds) {
            return checkpointOffsets;
        }

        final boolean failOnDataLoss = this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue().booleanValue());
        if (failOnDataLoss) {
            throw new HoodieDeltaStreamerException("Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.");
        }
        LOG.warn("Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want delta streamer to fail on such cases, set \"" + Config.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
        return earliestOffsets;
    }

    /**
     * Tells whether the checkpoint looks like a timestamp: an optionally signed number (an
     * optional fractional part is accepted by the pattern) whose text is exactly 10 or 13
     * characters long.
     *
     * @param option checkpoint string, possibly absent
     * @return {@code true} when present and timestamp-shaped, otherwise {@code false}
     */
    private Boolean isValidTimestampCheckpointType(Option<String> option) {
        if (!option.isPresent()) {
            return false;
        }
        final String checkpoint = option.get();
        if (!Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?").matcher(checkpoint).matches()) {
            return false;
        }
        final int length = checkpoint.length();
        return length == 13 || length == 10;
    }

    /**
     * Sums, over all partitions, how far the checkpoint lags behind the current end offset.
     * Partitions missing from the checkpoint count from offset 0; negative differences are
     * ignored.
     *
     * @param option        checkpoint string; must be present
     * @param set           partitions of the topic
     * @param kafkaConsumer consumer used to read the end offsets
     * @return total number of messages between the checkpoint and the end offsets
     */
    private Long delayOffsetCalculation(Option<String> option, Set<TopicPartition> set, KafkaConsumer kafkaConsumer) {
        final Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(option.get());
        final Map<TopicPartition, Long> latestOffsets = kafkaConsumer.endOffsets(set);
        long totalDelay = 0L;
        for (Map.Entry<TopicPartition, Long> latest : latestOffsets.entrySet()) {
            final long checkpointed = checkpointOffsets.getOrDefault(latest.getKey(), 0L);
            final long behind = latest.getValue() - checkpointed;
            if (behind > 0) {
                totalDelay += behind;
            }
        }
        return totalDelay;
    }

    /**
     * Translates a timestamp into a {@code topic,partition:offset,...} checkpoint string by
     * asking Kafka for the offsets at that time. A partition for which Kafka returns no offset
     * uses its beginning offset instead.
     *
     * @param kafkaConsumer consumer used for the lookups
     * @param list          partition metadata of the topic; defines which partitions are queried
     * @param set           partitions whose beginning offsets are fetched as fallback
     * @param str           topic name written at the head of the checkpoint string
     * @param l             timestamp passed to {@code offsetsForTimes} for every partition
     * @return the checkpoint string wrapped in an {@link Option}
     */
    private Option<String> getOffsetsByTimestamp(KafkaConsumer kafkaConsumer, List<PartitionInfo> list, Set<TopicPartition> set, String str, Long l) {
        // Query the same timestamp for every partition of the topic.
        final Map<TopicPartition, Long> timestampsToSearch = list.stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toMap(Function.identity(), ignored -> l));
        final Map<TopicPartition, Long> earliestOffsets = kafkaConsumer.beginningOffsets(set);
        final Map<TopicPartition, OffsetAndTimestamp> offsetsAtTimestamp = kafkaConsumer.offsetsForTimes(timestampsToSearch);

        final StringBuilder checkpoint = new StringBuilder(str).append(",");
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> found : offsetsAtTimestamp.entrySet()) {
            final TopicPartition topicPartition = found.getKey();
            final OffsetAndTimestamp match = found.getValue();
            checkpoint.append(topicPartition.partition()).append(":");
            if (match == null) {
                checkpoint.append(earliestOffsets.get(topicPartition));
            } else {
                checkpoint.append(match.offset());
            }
            checkpoint.append(",");
        }
        // Drop the trailing separator.
        checkpoint.setLength(checkpoint.length() - 1);
        return Option.of(checkpoint.toString());
    }

    /**
     * Checks whether the configured topic is among the topics the consumer can list.
     *
     * @param kafkaConsumer consumer used to list the topics
     * @return {@code true} if the listing contains {@code topicName}
     */
    public boolean checkTopicExists(KafkaConsumer kafkaConsumer) {
        final Map<String, List<PartitionInfo>> visibleTopics = kafkaConsumer.listTopics();
        return visibleTopics.containsKey(this.topicName);
    }

    /**
     * Checks that the checkpoint has the {@code something,something:something} shape.
     *
     * @param option checkpoint string; must be present
     * @return {@code true} if the whole string matches the checkpoint pattern
     */
    private boolean checkTopicCheckpoint(Option<String> option) {
        final String checkpoint = option.get();
        return pattern.matcher(checkpoint).matches();
    }

    /** @return the Kafka topic this generator was configured with */
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the consumer properties used by this generator; this is the internal map itself,
     *         not a copy
     */
    public Map<String, Object> getKafkaParams() {
        return kafkaParams;
    }

    /**
     * Builds the consumer properties from the source properties: every key that does not start
     * with {@code "hoodie."} is copied, plus the keys starting with
     * {@link AvroKafkaSource#KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX}.
     *
     * @param typedProperties all source properties
     * @return a new mutable map holding the retained properties
     */
    private Map<String, Object> excludeHoodieConfigs(TypedProperties typedProperties) {
        final Map<String, Object> consumerParams = new HashMap<>();
        for (Object rawKey : typedProperties.keySet()) {
            final String key = rawKey.toString();
            final boolean hoodieOnly = key.startsWith("hoodie.")
                && !key.startsWith(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX);
            if (hoodieOnly) {
                continue;
            }
            consumerParams.put(key, typedProperties.get(key));
        }
        return consumerParams;
    }

    /**
     * Commits the offsets of a checkpoint string to Kafka for the configured consumer group.
     *
     * <p>This is best effort: a {@link CommitFailedException} or {@link TimeoutException} is
     * logged as a warning and not rethrown. The consumer is always closed.
     *
     * @param str checkpoint in the form {@code topic,partition:offset,...}
     */
    public void commitOffsetToKafka(String str) {
        DataSourceUtils.checkRequiredProperties(this.props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(str);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(checkpointOffsets.size());
        // Translate every checkpointed offset into the form commitSync expects; without this the
        // commit would be sent with an empty map and persist nothing.
        checkpointOffsets.forEach((topicPartition, offset) ->
            offsetsToCommit.put(topicPartition, new OffsetAndMetadata(offset)));
        try (KafkaConsumer kafkaConsumer = new KafkaConsumer(this.kafkaParams)) {
            kafkaConsumer.commitSync(offsetsToCommit);
        } catch (CommitFailedException | TimeoutException e) {
            LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
        }
    }

    /**
     * Reads the consumer group's committed offset for every partition. If any partition has no
     * committed offset, a warning is logged and the end offsets of all partitions are returned
     * instead.
     *
     * @param kafkaConsumer consumer used for the lookups
     * @param set           partitions of the topic
     * @return committed offsets per partition, or the end offsets when a commit is missing
     */
    private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer kafkaConsumer, Set<TopicPartition> set) {
        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
        for (TopicPartition topicPartition : set) {
            final OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
            if (committed == null) {
                LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
                return kafkaConsumer.endOffsets(set);
            }
            committedOffsets.put(topicPartition, committed.offset());
        }
        return committedOffsets;
    }
}
