package org.apache.gobblin.kafka.client;

import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/kafka/client/Kafka09ConsumerClient.class */
public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {
    private static final String KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
    private static final String KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
    public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = "source.kafka.key.deserializer";
    public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = "source.kafka.value.deserializer";
    private final Consumer<K, V> consumer;
    private static final Logger log = LoggerFactory.getLogger(Kafka09ConsumerClient.class);
    private static final String KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
    private static final String KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
    private static final String KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
    public static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String KAFKA_09_CLIENT_GROUP_ID = "group.id";
    private static final String KAFKA_09_DEFAULT_GROUP_ID = "kafka09";
    private static final Config FALLBACK = ConfigFactory.parseMap(ImmutableMap.builder().put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT).put(KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER).put(KAFKA_09_CLIENT_GROUP_ID, KAFKA_09_DEFAULT_GROUP_ID).build());
    private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION = new Function<PartitionInfo, KafkaPartition>() { // from class: org.apache.gobblin.kafka.client.Kafka09ConsumerClient.4
        public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
            return new KafkaPartition.Builder().withId(partitionInfo.partition()).withTopicName(partitionInfo.topic()).withLeaderId(partitionInfo.leader().id()).withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port()).build();
        }
    };

    /* loaded from: input_file:org/apache/gobblin/kafka/client/Kafka09ConsumerClient$Factory.class */
    public static class Factory implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {
        public GobblinKafkaConsumerClient create(Config config) {
            return new Kafka09ConsumerClient(config);
        }
    }

    /* loaded from: input_file:org/apache/gobblin/kafka/client/Kafka09ConsumerClient$Kafka09ConsumerRecord.class */
    public static class Kafka09ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements DecodeableKafkaRecord<K, V> {
        private final ConsumerRecord<K, V> consumerRecord;

        public Kafka09ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
            super(consumerRecord.offset(), -1L, consumerRecord.topic(), consumerRecord.partition());
            this.consumerRecord = consumerRecord;
        }

        public K getKey() {
            return (K) this.consumerRecord.key();
        }

        public V getValue() {
            return (V) this.consumerRecord.value();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Kafka09ConsumerRecord)) {
                return false;
            }
            Kafka09ConsumerRecord kafka09ConsumerRecord = (Kafka09ConsumerRecord) obj;
            if (!kafka09ConsumerRecord.canEqual(this) || !super.equals(obj)) {
                return false;
            }
            ConsumerRecord<K, V> consumerRecord = this.consumerRecord;
            ConsumerRecord<K, V> consumerRecord2 = kafka09ConsumerRecord.consumerRecord;
            return consumerRecord == null ? consumerRecord2 == null : consumerRecord.equals(consumerRecord2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof Kafka09ConsumerRecord;
        }

        public int hashCode() {
            int hashCode = super.hashCode();
            ConsumerRecord<K, V> consumerRecord = this.consumerRecord;
            return (hashCode * 59) + (consumerRecord == null ? 43 : consumerRecord.hashCode());
        }

        public String toString() {
            return "Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord=" + this.consumerRecord + ")";
        }
    }

    private Kafka09ConsumerClient(Config config) {
        super(config);
        Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), "Missing required property source.kafka.value.deserializer");
        Properties properties = new Properties();
        properties.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(((AbstractBaseKafkaConsumerClient) this).brokers));
        properties.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, Integer.valueOf(((AbstractBaseKafkaConsumerClient) this).socketTimeoutMillis));
        Config withFallback = ConfigUtils.getConfigOrEmpty(config, "source.kafka").withFallback(FALLBACK);
        properties.putAll(ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(withFallback, "consumerConfig").withFallback(ConfigUtils.getConfigOrEmpty(config, "gobblin.kafka.sharedConfig")).withFallback(withFallback.withoutPath("consumerConfig"))));
        this.consumer = new KafkaConsumer(properties);
    }

    public Kafka09ConsumerClient(Config config, Consumer<K, V> consumer) {
        super(config);
        this.consumer = consumer;
    }

    public List<KafkaTopic> getTopics() {
        return FluentIterable.from(this.consumer.listTopics().entrySet()).transform(new Function<Map.Entry<String, List<PartitionInfo>>, KafkaTopic>() { // from class: org.apache.gobblin.kafka.client.Kafka09ConsumerClient.1
            public KafkaTopic apply(Map.Entry<String, List<PartitionInfo>> entry) {
                return new KafkaTopic(entry.getKey(), Lists.transform(entry.getValue(), Kafka09ConsumerClient.PARTITION_INFO_TO_KAFKA_PARTITION));
            }
        }).toList();
    }

    public long getEarliestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
        TopicPartition topicPartition = new TopicPartition(kafkaPartition.getTopicName(), kafkaPartition.getId());
        this.consumer.assign(Collections.singletonList(topicPartition));
        long position = this.consumer.position(topicPartition);
        this.consumer.seekToBeginning(new TopicPartition[]{topicPartition});
        long position2 = this.consumer.position(topicPartition);
        this.consumer.seek(topicPartition, position);
        return position2;
    }

    public long getLatestOffset(KafkaPartition kafkaPartition) throws KafkaOffsetRetrievalFailureException {
        TopicPartition topicPartition = new TopicPartition(kafkaPartition.getTopicName(), kafkaPartition.getId());
        this.consumer.assign(Collections.singletonList(topicPartition));
        long position = this.consumer.position(topicPartition);
        this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
        long position2 = this.consumer.position(topicPartition);
        this.consumer.seek(topicPartition, position);
        return position2;
    }

    public void assignAndSeek(List<KafkaPartition> list, Map<KafkaPartition, LongWatermark> map) {
        HashSet hashSet = new HashSet(list);
        map.entrySet().stream().filter(entry -> {
            return hashSet.contains(entry.getKey());
        }).forEach(entry2 -> {
            TopicPartition topicPartition = new TopicPartition(((KafkaPartition) entry2.getKey()).getTopicName(), ((KafkaPartition) entry2.getKey()).getId());
            this.consumer.assign(Collections.singletonList(topicPartition));
            this.consumer.seek(topicPartition, ((LongWatermark) entry2.getValue()).getValue());
        });
    }

    public Iterator<KafkaConsumerRecord> consume(KafkaPartition kafkaPartition, long j, long j2) {
        if (j > j2) {
            return null;
        }
        this.consumer.assign(Lists.newArrayList(new TopicPartition[]{new TopicPartition(kafkaPartition.getTopicName(), kafkaPartition.getId())}));
        this.consumer.seek(new TopicPartition(kafkaPartition.getTopicName(), kafkaPartition.getId()), j);
        return consume();
    }

    public Iterator<KafkaConsumerRecord> consume() {
        try {
            return Iterators.transform(this.consumer.poll(((AbstractBaseKafkaConsumerClient) this).fetchTimeoutMillis).iterator(), consumerRecord -> {
                try {
                    return new Kafka09ConsumerRecord(consumerRecord);
                } catch (Throwable th) {
                    throw Throwables.propagate(th);
                }
            });
        } catch (Exception e) {
            log.error("Exception on polling records", e);
            throw new RuntimeException(e);
        }
    }

    public void subscribe(String str) {
        this.consumer.subscribe(Lists.newArrayList(new String[]{str}), new NoOpConsumerRebalanceListener());
    }

    public void subscribe(String str, final GobblinConsumerRebalanceListener gobblinConsumerRebalanceListener) {
        this.consumer.subscribe(Lists.newArrayList(new String[]{str}), new ConsumerRebalanceListener() { // from class: org.apache.gobblin.kafka.client.Kafka09ConsumerClient.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                gobblinConsumerRebalanceListener.onPartitionsRevoked((Collection) collection.stream().map(topicPartition -> {
                    return new KafkaPartition.Builder().withTopicName(topicPartition.topic()).withId(topicPartition.partition()).build();
                }).collect(Collectors.toList()));
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                gobblinConsumerRebalanceListener.onPartitionsAssigned((Collection) collection.stream().map(topicPartition -> {
                    return new KafkaPartition.Builder().withTopicName(topicPartition.topic()).withId(topicPartition.partition()).build();
                }).collect(Collectors.toList()));
            }
        });
    }

    public Map<String, Metric> getMetrics() {
        Map metrics = this.consumer.metrics();
        HashMap hashMap = new HashMap();
        metrics.forEach((metricName, kafkaMetric) -> {
        });
        return hashMap;
    }

    public void commitOffsetsAsync(Map<KafkaPartition, Long> map) {
        this.consumer.commitAsync((Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
            return new TopicPartition(((KafkaPartition) entry.getKey()).getTopicName(), ((KafkaPartition) entry.getKey()).getId());
        }, entry2 -> {
            return new OffsetAndMetadata(((Long) entry2.getValue()).longValue());
        })), new OffsetCommitCallback() { // from class: org.apache.gobblin.kafka.client.Kafka09ConsumerClient.3
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (exc != null) {
                    Kafka09ConsumerClient.log.error("Exception while committing offsets " + map2, exc);
                }
            }
        });
    }

    public void commitOffsetsSync(Map<KafkaPartition, Long> map) {
        this.consumer.commitSync((Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
            return new TopicPartition(((KafkaPartition) entry.getKey()).getTopicName(), ((KafkaPartition) entry.getKey()).getId());
        }, entry2 -> {
            return new OffsetAndMetadata(((Long) entry2.getValue()).longValue());
        })));
    }

    public long committed(KafkaPartition kafkaPartition) {
        OffsetAndMetadata committed = this.consumer.committed(new TopicPartition(kafkaPartition.getTopicName(), kafkaPartition.getId()));
        if (committed != null) {
            return committed.offset();
        }
        return -1L;
    }

    private Metric kafkaToCodaHaleMetric(KafkaMetric kafkaMetric) {
        if (log.isDebugEnabled()) {
            log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
        }
        return () -> {
            return Double.valueOf(kafkaMetric.value());
        };
    }

    private String canonicalMetricName(KafkaMetric kafkaMetric) {
        MetricName metricName = kafkaMetric.metricName();
        return canonicalMetricName(metricName.group(), metricName.tags().values(), metricName.name());
    }

    public void close() throws IOException {
        this.consumer.close();
    }
}
