package org.apache.kafka.clients.consumer;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumer.class */
public class KafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
    private static final long serialVersionUID = 1;
    private final PulsarClient client;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final String groupId;
    private final boolean isAutoCommit;
    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers;
    private final Map<TopicPartition, Long> lastReceivedOffset;
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset;
    private final BlockingQueue<QueueItem> receivedMessages;

    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumer$QueueItem.class */
    private static class QueueItem {
        final org.apache.pulsar.client.api.Consumer consumer;
        final Message message;

        QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
            this.consumer = consumer;
            this.message = message;
        }
    }

    public KafkaConsumer(Map<String, Object> map) {
        this(map, (Deserializer) null, (Deserializer) null);
    }

    public KafkaConsumer(Map<String, Object> map, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(map, (Deserializer<?>) deserializer, (Deserializer<?>) deserializer2)), deserializer, deserializer2);
    }

    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer) null, (Deserializer) null);
    }

    public KafkaConsumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, (Deserializer<?>) deserializer, (Deserializer<?>) deserializer2)), deserializer, deserializer2);
    }

    private KafkaConsumer(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.consumers = new ConcurrentHashMap();
        this.lastReceivedOffset = new ConcurrentHashMap();
        this.lastCommittedOffset = new ConcurrentHashMap();
        this.receivedMessages = new ArrayBlockingQueue(1000);
        if (deserializer == null) {
            this.keyDeserializer = (Deserializer) consumerConfig.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(consumerConfig.originals(), true);
        } else {
            this.keyDeserializer = deserializer;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (deserializer2 == null) {
            this.valueDeserializer = (Deserializer) consumerConfig.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(consumerConfig.originals(), true);
        } else {
            this.valueDeserializer = deserializer2;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        this.groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
        this.isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).booleanValue();
        String str = consumerConfig.getList("bootstrap.servers").get(0);
        Properties properties = new Properties();
        consumerConfig.originals().forEach((str2, obj) -> {
            properties.put(str2, obj);
        });
        try {
            this.client = PulsarClient.create(str, PulsarKafkaConfig.getClientConfiguration(properties));
        } catch (PulsarClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void received(org.apache.pulsar.client.api.Consumer consumer, Message message) {
        try {
            this.receivedMessages.put(new QueueItem(consumer, message));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> assignment() {
        throw new UnsupportedOperationException("Cannot access the partitions assignements");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<String> subscription() {
        return (Set) this.consumers.keySet().stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toSet());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection) {
        ArrayList arrayList = new ArrayList();
        try {
            for (String str : collection) {
                PartitionedTopicMetadata partitionedTopicMetadata = (PartitionedTopicMetadata) this.client.getPartitionedTopicMetadata(str).get();
                ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
                consumerConfiguration.setSubscriptionType(SubscriptionType.Failover);
                consumerConfiguration.setMessageListener(this);
                if (partitionedTopicMetadata.partitions > 1) {
                    consumerConfiguration.setConsumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
                        CompletableFuture subscribeAsync = this.client.subscribeAsync(DestinationName.get(str).getPartition(i).toString(), this.groupId, consumerConfiguration);
                        int i2 = i;
                        subscribeAsync.thenAccept(consumer -> {
                            this.consumers.putIfAbsent(new TopicPartition(str, i2), consumer);
                        });
                        arrayList.add(subscribeAsync);
                    }
                } else {
                    CompletableFuture subscribeAsync2 = this.client.subscribeAsync(str, this.groupId, consumerConfiguration);
                    subscribeAsync2.thenAccept(consumer2 -> {
                        this.consumers.putIfAbsent(new TopicPartition(str, 0), consumer2);
                    });
                    arrayList.add(subscribeAsync2);
                }
            }
            arrayList.forEach((v0) -> {
                v0.join();
            });
        } catch (Exception e) {
            arrayList.forEach(completableFuture -> {
                try {
                    ((org.apache.pulsar.client.api.Consumer) completableFuture.get()).close();
                } catch (Exception e2) {
                }
            });
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        throw new UnsupportedOperationException("ConsumerRebalanceListener is not supported");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void assign(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException("Cannot manually assign partitions");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void unsubscribe() {
        this.consumers.values().forEach(consumer -> {
            try {
                consumer.unsubscribe();
            } catch (PulsarClientException e) {
                throw new RuntimeException((Throwable) e);
            }
        });
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public ConsumerRecords<K, V> poll(long j) {
        try {
            QueueItem poll = this.receivedMessages.poll(j, TimeUnit.MILLISECONDS);
            if (poll == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }
            if (this.isAutoCommit) {
                commitAsync();
            }
            DestinationName destinationName = DestinationName.get(poll.consumer.getTopic());
            String partitionedTopicName = destinationName.getPartitionedTopicName();
            int partitionIndex = destinationName.isPartitioned() ? destinationName.getPartitionIndex() : 0;
            Message message = poll.message;
            long offset = MessageIdUtils.getOffset(message.getMessageId());
            TopicPartition topicPartition = new TopicPartition(partitionedTopicName, partitionIndex);
            K key = getKey(partitionedTopicName, message);
            V deserialize = this.valueDeserializer.deserialize(partitionedTopicName, message.getData());
            TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
            long publishTime = message.getPublishTime();
            if (message.getEventTime() > 0) {
                publishTime = message.getEventTime();
                timestampType = TimestampType.CREATE_TIME;
            }
            ConsumerRecord consumerRecord = new ConsumerRecord(partitionedTopicName, partitionIndex, offset, publishTime, timestampType, -1L, message.hasKey() ? message.getKey().length() : 0, message.getData().length, key, deserialize);
            HashMap hashMap = new HashMap();
            hashMap.put(topicPartition, Lists.newArrayList(new ConsumerRecord[]{consumerRecord}));
            this.lastReceivedOffset.put(topicPartition, Long.valueOf(offset));
            return new ConsumerRecords<>(hashMap);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private K getKey(String str, Message message) {
        if (!message.hasKey()) {
            return null;
        }
        if (this.keyDeserializer instanceof StringDeserializer) {
            return (K) message.getKey();
        }
        return this.keyDeserializer.deserialize(str, Base64.getDecoder().decode(message.getKey()));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync() {
        try {
            doCommitOffsets(getCurrentOffsetsMap()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            doCommitOffsets(map).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync() {
        doCommitOffsets(getCurrentOffsetsMap());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        Map<TopicPartition, OffsetAndMetadata> currentOffsetsMap = getCurrentOffsetsMap();
        doCommitOffsets(currentOffsetsMap).handle((r8, th) -> {
            offsetCommitCallback.onComplete(currentOffsetsMap, th != null ? new Exception(th) : null);
            return null;
        });
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        doCommitOffsets(map).handle((r8, th) -> {
            offsetCommitCallback.onComplete(map, th != null ? new Exception(th) : null);
            return null;
        });
    }

    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        ArrayList arrayList = new ArrayList();
        map.forEach((topicPartition, offsetAndMetadata) -> {
            org.apache.pulsar.client.api.Consumer consumer = this.consumers.get(topicPartition);
            this.lastCommittedOffset.put(topicPartition, offsetAndMetadata);
            arrayList.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
        });
        return FutureUtil.waitForAll(arrayList);
    }

    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
        HashMap hashMap = new HashMap();
        this.lastReceivedOffset.forEach((topicPartition, l) -> {
            hashMap.put(topicPartition, new OffsetAndMetadata(l.longValue()));
        });
        return hashMap;
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seek(TopicPartition topicPartition, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToBeginning(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToEnd(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public long position(TopicPartition topicPartition) {
        return this.lastReceivedOffset.get(topicPartition).longValue();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return this.lastCommittedOffset.get(topicPartition);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<MetricName, ? extends Metric> metrics() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public List<PartitionInfo> partitionsFor(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<String, List<PartitionInfo>> listTopics() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> paused() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void pause(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void resume(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void close(long j, TimeUnit timeUnit) {
        try {
            if (this.isAutoCommit) {
                commitAsync();
            }
            this.client.closeAsync().get(j, timeUnit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void wakeup() {
        throw new UnsupportedOperationException();
    }
}
