package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.tukaani.xz.common.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumer.class */
public class KafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private static final long serialVersionUID = 1;
    // Pulsar client built by the private constructor from the first "bootstrap.servers" entry.
    private final PulsarClient client;
    // Deserializers are either supplied by the caller or instantiated from the consumer configuration.
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Kafka group id; subscribe() uses it as the Pulsar subscription name.
    private final String groupId;
    // When true, poll() and close() trigger commitAsync().
    private final boolean isAutoCommit;
    // One Pulsar consumer per topic partition, registered by subscribe().
    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers;
    // Offset of the last message handed out by poll(), per partition.
    private final Map<TopicPartition, Long> lastReceivedOffset;
    // Last offset passed to doCommitOffsets(), per partition.
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset;
    // Partitions added by subscribe() for which poll() has not yet returned a message.
    private final Set<TopicPartition> unpolledPartitions;
    // Initial position derived from the AUTO_OFFSET_RESET_CONFIG setting; used by resetOffsets().
    private final SubscriptionInitialPosition strategy;
    // Set by close(long, TimeUnit); read by received() when its queue put is interrupted.
    private volatile boolean closed;
    // Copy of the original consumer configuration, passed to the Pulsar client/consumer builders.
    private final Properties properties;
    // Bounded hand-off queue: filled by received() (the message listener), drained by poll().
    private final BlockingQueue<QueueItem> receivedMessages;
    // Intended cap on the number of records returned by a single poll() call.
    private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;

    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumer$QueueItem.class */
    /** Immutable pairing of a received message with the Pulsar consumer that delivered it. */
    private static final class QueueItem {
        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
        final Message<byte[]> message;

        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> source, Message<byte[]> payload) {
            consumer = source;
            message = payload;
        }
    }

    /**
     * Creates a consumer from a configuration map without explicit deserializers.
     *
     * @param map consumer configuration entries
     */
    public KafkaConsumer(Map<String, Object> map) {
        this(map, (Deserializer<K>) null, (Deserializer<V>) null);
    }

    /**
     * Creates a consumer from a configuration map and optional explicit deserializers.
     *
     * @param map consumer configuration entries
     * @param deserializer key deserializer, or {@code null}
     * @param deserializer2 value deserializer, or {@code null}
     */
    public KafkaConsumer(Map<String, Object> map, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(
                new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(map, deserializer, deserializer2)),
                deserializer,
                deserializer2);
    }

    /**
     * Creates a consumer from {@link Properties} without explicit deserializers.
     *
     * @param properties consumer configuration
     */
    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer<K>) null, (Deserializer<V>) null);
    }

    /**
     * Creates a consumer from {@link Properties} and optional explicit deserializers.
     *
     * @param properties consumer configuration
     * @param deserializer key deserializer, or {@code null}
     * @param deserializer2 value deserializer, or {@code null}
     */
    public KafkaConsumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(
                new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, deserializer, deserializer2)),
                deserializer,
                deserializer2);
    }

    /**
     * Shared constructor: initialises bookkeeping state, resolves the deserializers, reads the group id,
     * auto-commit flag and offset-reset strategy, and builds the Pulsar client against the first
     * "bootstrap.servers" entry.
     *
     * @param consumerConfig parsed consumer configuration
     * @param deserializer key deserializer, or {@code null} to instantiate the configured class
     * @param deserializer2 value deserializer, or {@code null} to instantiate the configured class
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the client cannot be built
     */
    @SuppressWarnings("unchecked")
    private KafkaConsumer(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.consumers = new ConcurrentHashMap<>();
        this.lastReceivedOffset = new ConcurrentHashMap<>();
        this.lastCommittedOffset = new ConcurrentHashMap<>();
        this.unpolledPartitions = new HashSet<>();
        this.closed = false;
        this.receivedMessages = new ArrayBlockingQueue<>(1000);

        if (deserializer != null) {
            this.keyDeserializer = deserializer;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        } else {
            this.keyDeserializer =
                    consumerConfig.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(consumerConfig.originals(), true);
        }

        if (deserializer2 != null) {
            this.valueDeserializer = deserializer2;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        } else {
            this.valueDeserializer =
                    consumerConfig.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(consumerConfig.originals(), true);
        }

        this.groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
        this.isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        this.strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        log.info("Offset reset strategy has been assigned value {}", this.strategy);

        // Only the first bootstrap server is used as the Pulsar service URL.
        String serviceUrl = consumerConfig.getList("bootstrap.servers").get(0);

        this.properties = new Properties();
        this.properties.putAll(consumerConfig.originals());

        ClientBuilder builder = PulsarClientKafkaConfig.getClientBuilder(this.properties);
        builder.enableTcpNoDelay(false);
        try {
            this.client = builder.serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps the Kafka offset-reset setting onto a Pulsar initial position.
     *
     * @param str configured reset value; compared case-sensitively
     * @return {@code Earliest} when the value is exactly {@code "earliest"}, otherwise {@code Latest}
     */
    private SubscriptionInitialPosition getStrategy(String str) {
        // Constant-first equals() keeps a null setting from throwing; it falls through to Latest.
        if ("earliest".equals(str)) {
            return SubscriptionInitialPosition.Earliest;
        }
        return SubscriptionInitialPosition.Latest;
    }

    /**
     * Message-listener callback: queues the message for a later {@link #poll(long)}, blocking while the
     * queue is full.
     *
     * @param consumer Pulsar consumer that delivered the message
     * @param message the delivered message
     * @throws RuntimeException if the thread is interrupted while this consumer is not closed
     */
    @Override
    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
        QueueItem item = new QueueItem(consumer, message);
        try {
            this.receivedMessages.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (this.closed) {
                // Interruption during shutdown is expected; drop the message quietly.
                return;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<TopicPartition> assignment() {
        throw new UnsupportedOperationException("Cannot access the partitions assignements");
    }

    /**
     * Returns the distinct topic names of all partitions that currently have a registered consumer.
     *
     * @return a new set of topic names
     */
    @Override
    public Set<String> subscription() {
        Set<String> topics = new HashSet<>();
        for (TopicPartition partition : this.consumers.keySet()) {
            topics.add(partition.topic());
        }
        return topics;
    }

    /**
     * Subscribes to the given topics without a rebalance listener.
     *
     * @param collection topic names to subscribe to
     */
    @Override
    public void subscribe(Collection<String> collection) {
        final ConsumerRebalanceListener noListener = null;
        subscribe(collection, noListener);
    }

    /**
     * Subscribes to every partition of the given topics using a Failover subscription named after the
     * group id, and waits until all subscriptions are established.
     *
     * <p>Topics reporting more than one partition get one Pulsar consumer per partition; any other topic
     * gets a single consumer registered as partition 0. On failure, every consumer created by this call is
     * closed and unregistered before the error is rethrown.
     *
     * @param collection topic names to subscribe to
     * @param consumerRebalanceListener notified with the assigned partitions on success; may be {@code null}
     * @throws RuntimeException wrapping whatever caused the subscription to fail
     */
    @Override
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        // The two lists are index-aligned: pending.get(i) is the subscription for partitions.get(i).
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> pending = new ArrayList<>();
        List<TopicPartition> partitions = new ArrayList<>();
        try {
            for (String topic : collection) {
                int partitionCount = ((PulsarClientImpl) this.client).getNumberOfPartitions(topic).get();
                ConsumerBuilder<byte[]> builder =
                        PulsarConsumerKafkaConfig.getConsumerBuilder(this.client, this.properties);
                builder.subscriptionType(SubscriptionType.Failover);
                builder.messageListener(this);
                builder.subscriptionName(this.groupId);
                if (partitionCount > 1) {
                    // All per-partition consumers of one topic share the same random consumer name.
                    builder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < partitionCount; i++) {
                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> subscription =
                                builder.clone().topic(TopicName.get(topic).getPartition(i).toString()).subscribeAsync();
                        TopicPartition partition =
                                new TopicPartition(TopicName.get(topic).getPartitionedTopicName(), i);
                        pending.add(registerWhenSubscribed(subscription, partition));
                        partitions.add(partition);
                    }
                } else {
                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> subscription =
                            builder.topic(topic).subscribeAsync();
                    TopicPartition partition =
                            new TopicPartition(TopicName.get(topic).getPartitionedTopicName(), 0);
                    pending.add(registerWhenSubscribed(subscription, partition));
                    partitions.add(partition);
                }
            }
            for (CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> subscription : pending) {
                subscription.join();
            }
            // Mark partitions as "not yet polled" only once every subscription has succeeded.
            this.unpolledPartitions.addAll(partitions);
            if (consumerRebalanceListener != null) {
                consumerRebalanceListener.onPartitionsAssigned(partitions);
            }
        } catch (Exception e) {
            boolean interrupted = e instanceof InterruptedException;
            for (int i = 0; i < pending.size(); i++) {
                TopicPartition partition = partitions.get(i);
                try {
                    org.apache.pulsar.client.api.Consumer<byte[]> consumer = pending.get(i).get();
                    // Only undo registrations made by this call; an older consumer for the partition stays.
                    if (this.consumers.remove(partition, consumer)) {
                        this.unpolledPartitions.remove(partition);
                    }
                    consumer.close();
                } catch (InterruptedException cleanupInterrupted) {
                    interrupted = true;
                } catch (Exception cleanupFailure) {
                    // Best-effort rollback: the subscription itself may have failed, or close() may fail.
                    log.debug("Could not close consumer for partition {} while rolling back subscribe",
                            partition, cleanupFailure);
                }
            }
            if (interrupted) {
                // Restore the flag only after the rollback so the blocking get() calls above can run.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /** Registers the consumer under {@code partition} once {@code subscription} completes successfully. */
    private CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> registerWhenSubscribed(
            CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> subscription, TopicPartition partition) {
        return subscription.thenApply(consumer -> {
            log.info("Add consumer {} for partition {}", consumer, partition);
            this.consumers.putIfAbsent(partition, consumer);
            return consumer;
        });
    }

    /**
     * Manual partition assignment is not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void assign(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException("Cannot manually assign partitions");
    }

    /**
     * Pattern-based subscription is not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
    }

    /**
     * Calls {@code unsubscribe()} on every registered Pulsar consumer. Entries are not removed from the
     * internal consumer map by this method.
     *
     * @throws RuntimeException wrapping the first {@link PulsarClientException} raised by a consumer
     */
    @Override
    public void unsubscribe() {
        for (org.apache.pulsar.client.api.Consumer<byte[]> partitionConsumer : this.consumers.values()) {
            try {
                partitionConsumer.unsubscribe();
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Waits up to {@code j} milliseconds for a first message, then drains whatever else is already queued
     * (up to {@link #MAX_RECORDS_IN_SINGLE_POLL} records) and returns them grouped by partition.
     *
     * <p>The record timestamp is the message event time when it is positive, otherwise the publish time.
     * When auto-commit is enabled and at least one record is returned, an asynchronous commit is started.
     *
     * @param j maximum time to wait for the first message, in milliseconds
     * @return the polled records, or the shared empty instance if nothing arrived in time
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is restored
     */
    @Override
    @SuppressWarnings("unchecked")
    public ConsumerRecords<K, V> poll(long j) {
        try {
            QueueItem item = this.receivedMessages.poll(j, TimeUnit.MILLISECONDS);
            if (item == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
            int drained = 0;
            while (item != null) {
                TopicName topicName = TopicName.get(item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> message = item.message;
                long offset = MessageIdUtils.getOffset((MessageIdImpl) message.getMessageId());
                TopicPartition topicPartition = new TopicPartition(topic, partition);

                // No recorded offset on a partition that has been polled before: offsets were invalidated.
                if (this.lastReceivedOffset.get(topicPartition) == null
                        && !this.unpolledPartitions.contains(topicPartition)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", topicPartition);
                    resetOffsets(topicPartition);
                }

                K key = getKey(topic, message);
                byte[] payload = message.getData();
                V value = this.valueDeserializer.deserialize(topic, payload);

                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
                long timestamp = message.getPublishTime();
                if (message.getEventTime() > 0) {
                    timestamp = message.getEventTime();
                    timestampType = TimestampType.CREATE_TIME;
                }

                int keySize = message.hasKey() ? message.getKey().length() : 0;
                records.computeIfAbsent(topicPartition, ignored -> new ArrayList<>())
                        .add(new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType, -1L,
                                keySize, payload.length, key, value));

                this.lastReceivedOffset.put(topicPartition, offset);
                this.unpolledPartitions.remove(topicPartition);

                drained++;
                if (drained >= MAX_RECORDS_IN_SINGLE_POLL) {
                    break;
                }
                // Zero timeout: take only what is already queued.
                item = this.receivedMessages.poll(0L, TimeUnit.MILLISECONDS);
            }
            if (this.isAutoCommit && !records.isEmpty()) {
                commitAsync();
            }
            return new ConsumerRecords<>(records);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Extracts the record key from a Pulsar message.
     *
     * @param str topic name passed to the key deserializer
     * @param message message to read the key from
     * @return {@code null} if the message has no key; the raw key string when the key deserializer is a
     *     {@link StringDeserializer}; otherwise the Base64-decoded key bytes run through the deserializer
     */
    @SuppressWarnings("unchecked")
    private K getKey(String str, Message<byte[]> message) {
        if (message.hasKey()) {
            if (this.keyDeserializer instanceof StringDeserializer) {
                return (K) message.getKey();
            }
            byte[] rawKey = Base64.getDecoder().decode(message.getKey());
            return this.keyDeserializer.deserialize(str, rawKey);
        }
        return null;
    }

    /**
     * Commits the last received offset of every partition and blocks until the commit completes.
     *
     * @throws RuntimeException if the commit fails or the thread is interrupted; on interruption the
     *     interrupt flag is restored
     */
    @Override
    public void commitSync() {
        try {
            doCommitOffsets(getCurrentOffsetsMap()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Commits the given offsets and blocks until the commit completes.
     *
     * @param map offsets to commit, keyed by partition
     * @throws RuntimeException if the commit fails or the thread is interrupted; on interruption the
     *     interrupt flag is restored
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            doCommitOffsets(map).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts committing the last received offset of every partition without waiting for the result.
     * A failed commit is logged instead of being dropped silently.
     */
    @Override
    public void commitAsync() {
        doCommitOffsets(getCurrentOffsetsMap()).exceptionally(failure -> {
            log.warn("Asynchronous offset commit failed", failure);
            return null;
        });
    }

    /**
     * Starts committing the last received offset of every partition and reports the outcome to the
     * callback.
     *
     * @param offsetCommitCallback invoked with the committed offsets and, on failure, an {@link Exception}
     *     wrapping the cause ({@code null} on success)
     */
    @Override
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = getCurrentOffsetsMap();
        doCommitOffsets(offsets).handle((ignored, failure) -> {
            Exception error = null;
            if (failure != null) {
                error = new Exception(failure);
            }
            offsetCommitCallback.onComplete(offsets, error);
            return null;
        });
    }

    /**
     * Starts committing the given offsets and reports the outcome to the callback.
     *
     * @param map offsets to commit, keyed by partition
     * @param offsetCommitCallback invoked with {@code map} and, on failure, an {@link Exception} wrapping
     *     the cause ({@code null} on success)
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        doCommitOffsets(map).handle((ignored, failure) -> {
            Exception error = null;
            if (failure != null) {
                error = new Exception(failure);
            }
            offsetCommitCallback.onComplete(map, error);
            return null;
        });
    }

    /**
     * Cumulatively acknowledges, on each partition's consumer, the message id derived from the given
     * offset, and records the offset as last committed.
     *
     * <p>A partition without a registered consumer contributes a failed future instead of raising a
     * {@link NullPointerException}, mirroring how the seek methods report unknown partitions.
     *
     * @param map offsets to commit, keyed by partition
     * @return a future combining all acknowledgements
     */
    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        List<CompletableFuture<Void>> acknowledgements = new ArrayList<>(map.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                acknowledgements.add(FutureUtil.failedFuture(new IllegalArgumentException(
                        "Cannot commit offsets on a partition where we are not subscribed: " + topicPartition)));
                continue;
            }
            this.lastCommittedOffset.put(topicPartition, offsetAndMetadata);
            acknowledgements.add(
                    consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
        }
        return FutureUtil.waitForAll(acknowledgements);
    }

    /**
     * Builds a snapshot of the last received offset of every partition, wrapped as
     * {@link OffsetAndMetadata}.
     *
     * @return a new map of partition to last received offset
     */
    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
        Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> received : this.lastReceivedOffset.entrySet()) {
            snapshot.put(received.getKey(), new OffsetAndMetadata(received.getValue()));
        }
        return snapshot;
    }

    /**
     * Moves the partition's consumer to the message id derived from the given offset.
     *
     * @param topicPartition partition to seek on
     * @param j target offset
     * @throws IllegalArgumentException if no consumer is registered for the partition
     * @throws RuntimeException wrapping a {@link PulsarClientException} raised by the seek
     */
    @Override
    public void seek(TopicPartition topicPartition, long j) {
        final MessageId target = MessageIdUtils.getMessageId(j);
        final org.apache.pulsar.client.api.Consumer<byte[]> partitionConsumer = this.consumers.get(topicPartition);
        if (partitionConsumer != null) {
            try {
                partitionConsumer.seek(target);
                return;
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }
        throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
    }

    /**
     * Seeks the given partitions to the earliest message and waits for all seeks to finish. An empty
     * collection means every subscribed partition.
     *
     * <p>Only the offsets tracked for the targeted partitions are discarded. Clearing the bookkeeping of
     * untouched partitions would make the next {@link #poll(long)} treat them as invalid and reset them.
     *
     * @param collection partitions to seek; empty for all subscribed partitions
     * @throws java.util.concurrent.CompletionException if any seek fails, including an
     *     {@link IllegalArgumentException} cause for a partition that is not subscribed
     */
    @Override
    public void seekToBeginning(Collection<TopicPartition> collection) {
        boolean seekAll = collection.isEmpty();
        Collection<TopicPartition> targets = seekAll ? this.consumers.keySet() : collection;
        if (seekAll) {
            this.lastCommittedOffset.clear();
            this.lastReceivedOffset.clear();
        } else {
            for (TopicPartition topicPartition : targets) {
                this.lastCommittedOffset.remove(topicPartition);
                this.lastReceivedOffset.remove(topicPartition);
            }
        }
        List<CompletableFuture<Void>> seeks = new ArrayList<>();
        for (TopicPartition topicPartition : targets) {
            org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                seeks.add(FutureUtil.failedFuture(
                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
            } else {
                seeks.add(consumer.seekAsync(MessageId.earliest));
            }
        }
        FutureUtil.waitForAll(seeks).join();
    }

    /**
     * Seeks the given partitions to the latest message and waits for all seeks to finish. An empty
     * collection means every subscribed partition.
     *
     * <p>Only the offsets tracked for the targeted partitions are discarded. Clearing the bookkeeping of
     * untouched partitions would make the next {@link #poll(long)} treat them as invalid and reset them.
     *
     * @param collection partitions to seek; empty for all subscribed partitions
     * @throws java.util.concurrent.CompletionException if any seek fails, including an
     *     {@link IllegalArgumentException} cause for a partition that is not subscribed
     */
    @Override
    public void seekToEnd(Collection<TopicPartition> collection) {
        boolean seekAll = collection.isEmpty();
        Collection<TopicPartition> targets = seekAll ? this.consumers.keySet() : collection;
        if (seekAll) {
            this.lastCommittedOffset.clear();
            this.lastReceivedOffset.clear();
        } else {
            for (TopicPartition topicPartition : targets) {
                this.lastCommittedOffset.remove(topicPartition);
                this.lastReceivedOffset.remove(topicPartition);
            }
        }
        List<CompletableFuture<Void>> seeks = new ArrayList<>();
        for (TopicPartition topicPartition : targets) {
            org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                seeks.add(FutureUtil.failedFuture(
                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
            } else {
                seeks.add(consumer.seekAsync(MessageId.latest));
            }
        }
        FutureUtil.waitForAll(seeks).join();
    }

    /**
     * Reports the position of a partition.
     *
     * @param topicPartition partition to query
     * @return {@code 0} while the partition has not been polled yet; otherwise the last received offset;
     *     if no offset is recorded, the partition is reset and the numeric value of the reset strategy is
     *     returned
     */
    @Override
    public long position(TopicPartition topicPartition) {
        if (this.unpolledPartitions.contains(topicPartition)) {
            return 0L;
        }
        Long lastReceived = this.lastReceivedOffset.get(topicPartition);
        if (lastReceived != null) {
            return lastReceived;
        }
        return resetOffsets(topicPartition).getValue();
    }

    /**
     * Seeks a single partition to the beginning or the end, depending on the configured reset strategy.
     *
     * @param topicPartition partition to reset
     * @return the strategy that was applied
     */
    private SubscriptionInitialPosition resetOffsets(TopicPartition topicPartition) {
        log.info("Resetting partition {} and seeking to {} position", topicPartition, this.strategy);
        final Set<TopicPartition> target = Collections.singleton(topicPartition);
        final boolean fromStart = this.strategy == SubscriptionInitialPosition.Earliest;
        if (!fromStart) {
            seekToEnd(target);
        } else {
            seekToBeginning(target);
        }
        return this.strategy;
    }

    /**
     * Returns the offset most recently recorded as committed for a partition by this instance.
     *
     * @param topicPartition partition to query
     * @return the recorded offset, or {@code null} if none is recorded
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        final OffsetAndMetadata recorded = this.lastCommittedOffset.get(topicPartition);
        return recorded;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<PartitionInfo> partitionsFor(String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<TopicPartition> paused() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void pause(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void resume(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    /**
     * Closes this consumer, waiting without a practical time limit for the underlying client to shut
     * down.
     */
    @Override
    public void close() {
        // Long.MAX_VALUE is the intended "wait forever" timeout; no need to borrow a constant from a
        // shaded compression library for it.
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Marks this consumer as closed, starts a final asynchronous commit when auto-commit is enabled, and
     * waits up to the given timeout for the Pulsar client to close.
     *
     * @param j maximum time to wait for the client to close
     * @param timeUnit unit of {@code j}
     * @throws RuntimeException if closing fails, times out, or the thread is interrupted; on interruption
     *     the interrupt flag is restored
     */
    @Override
    public void close(long j, TimeUnit timeUnit) {
        try {
            this.closed = true;
            if (this.isAutoCommit) {
                commitAsync();
            }
            this.client.closeAsync().get(j, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void wakeup() {
        throw new UnsupportedOperationException();
    }
}
