package org.apache.crunch.kafka.record;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.crunch.Pair;
import org.apache.crunch.ReadableData;
import org.apache.crunch.Source;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/record/KafkaSource.class */
/**
 * A Crunch {@link Source} that reads Kafka records as
 * {@code ConsumerRecord<BytesWritable, BytesWritable>} for a fixed set of topic partitions.
 *
 * <p>The source can be consumed either through a MapReduce job (see
 * {@link #configureSource(Job, int)}, which wires in {@link KafkaInputFormat}) or directly in the
 * current JVM (see {@link #read(Configuration)} and {@link #asReadable()}).
 *
 * <p>Keys and values are always deserialized with {@link BytesDeserializer} and consumer
 * auto-commit is always disabled, regardless of what the supplied properties request.
 */
public class KafkaSource implements Source<ConsumerRecord<BytesWritable, BytesWritable>>, ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> {

    /**
     * Configuration key for the consumer poll timeout. It is only declared here; this class does
     * not read it.
     */
    public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";

    /**
     * Default value associated with {@link #CONSUMER_POLL_TIMEOUT_KEY}.
     * NOTE(review): presumably milliseconds - confirm against the code that reads the key.
     */
    public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);

    /**
     * Fixed placeholder returned by {@link #getSize(Configuration)}; the topic is never inspected
     * to compute a real size.
     */
    private static final long PLACEHOLDER_SIZE = 1000000000L;

    private final FormatBundle inputBundle;
    private final Properties props;
    private final Map<TopicPartition, Pair<Long, Long>> offsets;

    /**
     * Kafka {@link Deserializer} that wraps the raw record bytes in a {@link BytesWritable}.
     */
    public static class BytesDeserializer implements Deserializer<BytesWritable> {

        /** No-op: this deserializer has nothing to configure. */
        @Override
        public void configure(Map<String, ?> configProperties, boolean isKey) {
        }

        /**
         * Wraps the given payload in a {@link BytesWritable}.
         *
         * @param topic topic the record came from (unused)
         * @param data  raw key or value bytes
         * @return a {@code BytesWritable} built from {@code data}
         */
        @Override
        public BytesWritable deserialize(String topic, byte[] data) {
            return new BytesWritable(data);
        }

        /**
         * Decompiler-generated alias of {@link #deserialize(String, byte[])}, kept only so that
         * any existing caller of this name keeps working.
         *
         * @deprecated use {@link #deserialize(String, byte[])}
         */
        @Deprecated
        public BytesWritable m15deserialize(String str, byte[] bArr) {
            return deserialize(str, bArr);
        }

        /** No-op: this deserializer holds no resources. */
        @Override
        public void close() {
        }
    }

    /**
     * Creates a source over the given partitions.
     *
     * @param kafkaConnectionProperties consumer properties; they are copied, so later changes to
     *                                  the argument do not affect this source
     * @param offsets                   one offset pair per partition to read; the map is copied
     *                                  and exposed internally as unmodifiable
     *                                  (NOTE(review): the pair is presumably start/end offsets -
     *                                  confirm against {@link KafkaInputFormat})
     */
    public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
        this.props = copyAndSetProperties(kafkaConnectionProperties);
        this.inputBundle = createFormatBundle(this.props, offsets);
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
    }

    /**
     * Sets an extra key/value pair on this source's input {@link FormatBundle}.
     *
     * @return this source, for chaining
     */
    @Override
    public Source<ConsumerRecord<BytesWritable, BytesWritable>> inputConf(String key, String value) {
        this.inputBundle.set(key, value);
        return this;
    }

    /**
     * Ignores the supplied file system; this source keeps no file-system state.
     *
     * @return this source, unchanged
     */
    @Override
    public Source<ConsumerRecord<BytesWritable, BytesWritable>> fileSystem(FileSystem fileSystem) {
        return this;
    }

    /**
     * @return always {@code null}; this source has no associated file system
     */
    @Override
    public FileSystem getFileSystem() {
        return null;
    }

    /**
     * @return the shared {@code PType} for consumer records defined by {@link ConsumerRecordHelper}
     */
    @Override
    public PType<ConsumerRecord<BytesWritable, BytesWritable>> getType() {
        return ConsumerRecordHelper.CONSUMER_RECORD_P_TYPE;
    }

    /**
     * @return a new {@link KafkaSourceConverter} on every call
     */
    @Override
    public Converter<?, ?, ?, ?> getConverter() {
        return new KafkaSourceConverter();
    }

    /**
     * @return a fixed placeholder value; the configuration is not consulted
     */
    @Override
    public long getSize(Configuration configuration) {
        return PLACEHOLDER_SIZE;
    }

    /**
     * @return {@code "KafkaSource(<bootstrap.servers>)"} using the configured bootstrap servers
     */
    @Override
    public String toString() {
        return "KafkaSource(" + this.props.getProperty("bootstrap.servers") + ")";
    }

    /**
     * Logs a warning and reports the last-modified time as unknown.
     *
     * @return always {@code -1}
     */
    @Override
    public long getLastModifiedAt(Configuration configuration) {
        // Pass the object itself so the string is only built when WARN is enabled.
        LOG.warn("Cannot determine last modified time for source: {}", this);
        return -1L;
    }

    /**
     * Builds the input bundle for {@link KafkaInputFormat} and stores the offsets and connection
     * properties in it.
     */
    private static FormatBundle createFormatBundle(Properties connectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
        FormatBundle bundle = FormatBundle.forInput(KafkaInputFormat.class);
        KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
        KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, bundle);
        return bundle;
    }

    /**
     * Copies the caller's properties and applies the settings this source depends on.
     *
     * <p>Statement order matters: {@code auto.offset.reset} is written before the copy so the
     * caller may override it, while the deserializers and {@code enable.auto.commit} are written
     * after the copy so they always win.
     */
    private static Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
        String deserializerName = BytesDeserializer.class.getName();
        Properties copy = new Properties();
        copy.setProperty("auto.offset.reset", "earliest");
        copy.putAll(kafkaConnectionProperties);
        copy.setProperty("key.deserializer", deserializerName);
        copy.setProperty("value.deserializer", deserializerName);
        copy.setProperty("enable.auto.commit", Boolean.toString(false));
        return copy;
    }

    /**
     * Reads the configured partitions in the current JVM.
     *
     * <p>A new {@link KafkaConsumer} is created on every call and handed to the returned iterable.
     * NOTE(review): nothing in this class closes the consumer - confirm that
     * {@code KafkaRecordsIterable} takes ownership of it.
     *
     * @param configuration unused
     * @return an iterable over the records of the configured partitions
     */
    @Override
    public Iterable<ConsumerRecord<BytesWritable, BytesWritable>> read(Configuration configuration) throws IOException {
        KafkaConsumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(this.props);
        return new KafkaRecordsIterable<>(consumer, this.offsets, this.props);
    }

    /**
     * Wires this source into a MapReduce job.
     *
     * @param job     job to configure
     * @param inputId {@code -1} configures the job directly with this source's mapper, input
     *                format and bundle settings; any other value registers the bundle through
     *                {@link CrunchInputs} under the path {@code /kafka/<inputId>}
     */
    @Override
    public void configureSource(Job job, int inputId) throws IOException {
        Configuration conf = job.getConfiguration();
        if (inputId == -1) {
            job.setMapperClass(CrunchMapper.class);
            job.setInputFormatClass(this.inputBundle.getFormatClass());
            this.inputBundle.configure(conf);
        } else {
            Path inputPath = new Path("/kafka/" + inputId);
            CrunchInputs.addInputPath(job, inputPath, this.inputBundle, inputId);
        }
    }

    /**
     * @return the input bundle built at construction time (package-private accessor)
     */
    FormatBundle getInputBundle() {
        return this.inputBundle;
    }

    /**
     * @return a new {@link KafkaData} built from this source's properties and offsets
     */
    @Override
    public ReadableData<ConsumerRecord<BytesWritable, BytesWritable>> asReadable() {
        return new KafkaData<>(this.props, this.offsets);
    }
}
