package org.apache.crunch.kafka.inputformat;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.Pair;
import org.apache.crunch.io.FormatBundle;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:org/apache/crunch/kafka/inputformat/KafkaInputFormat.class */
public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
    private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic";
    private static final String PARTITIONS = "partitions";
    private static final String START = "start";
    private static final String END = "end";
    private static final String TOPIC_KEY_REGEX = "org.apache.crunch.kafka.offsets.topic\\..*\\.partitions$";
    private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
    private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$");
    private Configuration configuration;

    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
        LinkedList linkedList = new LinkedList();
        for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
            TopicPartition key = entry.getKey();
            if (((Long) entry.getValue().first()).longValue() != ((Long) entry.getValue().second()).longValue()) {
                linkedList.add(new KafkaInputSplit(key.topic(), key.partition(), ((Long) entry.getValue().first()).longValue(), ((Long) entry.getValue().second()).longValue()));
            }
        }
        return linkedList;
    }

    public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new KafkaRecordReader();
    }

    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConf() {
        return this.configuration;
    }

    public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> map, FormatBundle formatBundle) {
        for (Map.Entry<String, String> entry : generateValues(map).entrySet()) {
            formatBundle.set(entry.getKey(), entry.getValue());
        }
    }

    public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> map, Configuration configuration) {
        for (Map.Entry<String, String> entry : generateValues(map).entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
    }

    public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) {
        HashMap hashMap = new HashMap();
        for (String str : configuration.getValByRegex(TOPIC_KEY_REGEX).keySet()) {
            String topicFromKey = getTopicFromKey(str);
            for (int i : configuration.getInts(str)) {
                TopicPartition topicPartition = new TopicPartition(topicFromKey, i);
                long j = configuration.getLong(generatePartitionStartKey(topicFromKey, i), Long.MIN_VALUE);
                long j2 = configuration.getLong(generatePartitionEndKey(topicFromKey, i), Long.MIN_VALUE);
                if (j == Long.MIN_VALUE || j2 == Long.MIN_VALUE) {
                    throw new IllegalStateException("The " + topicPartition + "has an invalid start:" + j + " or end:" + j2 + " offset configured.");
                }
                hashMap.put(topicPartition, Pair.of(Long.valueOf(j), Long.valueOf(j2)));
            }
        }
        return hashMap;
    }

    private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> map) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            String str = key.topic();
            int partition = key.partition();
            String generatePartitionStartKey = generatePartitionStartKey(str, partition);
            String generatePartitionEndKey = generatePartitionEndKey(str, partition);
            hashMap.put(generatePartitionStartKey, Long.toString(((Long) entry.getValue().first()).longValue()));
            hashMap.put(generatePartitionEndKey, Long.toString(((Long) entry.getValue().second()).longValue()));
            Set set = (Set) hashMap2.get(str);
            if (set == null) {
                set = new HashSet();
                hashMap2.put(str, set);
            }
            set.add(Integer.valueOf(partition));
        }
        for (Map.Entry entry2 : hashMap2.entrySet()) {
            hashMap.put("org.apache.crunch.kafka.offsets.topic." + ((String) entry2.getKey()) + "." + PARTITIONS, StringUtils.join((Set) entry2.getValue(), ","));
        }
        return hashMap;
    }

    static String generatePartitionStartKey(String str, int i) {
        return "org.apache.crunch.kafka.offsets.topic." + str + "." + PARTITIONS + "." + i + "." + START;
    }

    static String generatePartitionEndKey(String str, int i) {
        return "org.apache.crunch.kafka.offsets.topic." + str + "." + PARTITIONS + "." + i + "." + END;
    }

    static String generateTopicPartitionsKey(String str) {
        return "org.apache.crunch.kafka.offsets.topic." + str + "." + PARTITIONS;
    }

    static String getTopicFromKey(String str) {
        String substring = str.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
        return substring.substring(0, substring.length() - (PARTITIONS.length() + 1));
    }

    public static void writeConnectionPropertiesToBundle(Properties properties, FormatBundle formatBundle) {
        for (String str : properties.stringPropertyNames()) {
            formatBundle.set(str, properties.getProperty(str));
        }
    }

    static String generateConnectionPropertyKey(String str) {
        return "org.apache.crunch.kafka.connection.properties." + str;
    }

    static String getConnectionPropertyFromKey(String str) {
        return str.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1);
    }

    public static Properties tagExistingKafkaConnectionProperties(Properties properties) {
        Properties properties2 = new Properties();
        for (String str : properties.stringPropertyNames()) {
            properties2.put(generateConnectionPropertyKey(str), properties.getProperty(str));
        }
        return properties2;
    }

    public static Properties filterConnectionProperties(Properties properties) {
        Properties properties2 = new Properties();
        for (String str : properties.stringPropertyNames()) {
            if (CONNECTION_PROPERTY_REGEX.matcher(str).matches()) {
                properties2.put(getConnectionPropertyFromKey(str), properties.getProperty(str));
            }
        }
        return properties2;
    }
}
