package kafka.javaapi.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.api.FetchRequest;
import kafka.api.OffsetRequest;
import kafka.api.PartitionFetchInfo;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.OriginalOffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.OriginalOffsetCommitResponse;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:kafka/javaapi/consumer/SimpleConsumer.class */
/**
 * Pulsar-backed implementation of the Kafka {@code SimpleConsumer} API.
 *
 * <p>Fetch requests are answered by creating Pulsar {@link Reader}s, offset commits are translated into
 * cumulative acknowledgements on Pulsar {@link Consumer}s (one per topic/group pair, cached in this
 * instance), and committed offsets are looked up through the {@link PulsarAdmin} internal-stats API.
 *
 * <p>Call {@link #close()} to release the cached consumers, the Pulsar client and the admin client.
 */
public class SimpleConsumer extends OriginalSimpleConsumer {
    /** Property key naming the {@link SubscriptionType} used when subscribing for offset commits. */
    public static final String SUBSCRIPTION_TYPE = "pulsar.subscription.type";
    /** Property key for the URL passed to the admin builder; the constructor's host argument is the fallback. */
    public static final String HTTP_SERVICE_URL = "pulsar.http.service.url";

    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    private final String host;
    private final int port;
    private final String clientId;
    private final PulsarClient client;
    private final PulsarAdmin admin;
    /** Consumers created lazily by {@link #getConsumer(String, String)}, keyed by topic and group. */
    private final Map<TopicGroup, Consumer<byte[]>> topicConsumerMap;
    private final SubscriptionType subscriptionType;

    /**
     * Map key identifying a consumer by topic and consumer group.
     */
    public static class TopicGroup {
        protected String topic;
        // The misspelling ("grouoId" instead of "groupId") is kept on purpose: the field is protected,
        // so renaming it could break subclasses compiled against this class.
        protected String grouoId;

        /**
         * @param topic   topic name
         * @param groupId consumer group (used as the Pulsar subscription name by the enclosing class)
         */
        public TopicGroup(String topic, String groupId) {
            this.topic = topic;
            this.grouoId = groupId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topic, this.grouoId);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TopicGroup)) {
                return false;
            }
            TopicGroup other = (TopicGroup) obj;
            return Objects.equals(this.topic, other.topic) && Objects.equals(this.grouoId, other.grouoId);
        }

        /** Readable form so that log statements taking a {@code TopicGroup} argument are meaningful. */
        @Override
        public String toString() {
            return "TopicGroup{topic=" + this.topic + ", groupId=" + this.grouoId + '}';
        }
    }

    /**
     * Creates a consumer with an empty configuration.
     *
     * @see #SimpleConsumer(String, int, int, int, String, Properties)
     */
    public SimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
        this(host, port, soTimeout, bufferSize, clientId, new Properties());
    }

    /**
     * Creates the Pulsar client and admin client backing this consumer.
     *
     * @param host       used as the Pulsar service URL, and as the admin URL unless
     *                   {@link #HTTP_SERVICE_URL} is set in {@code properties}
     * @param port       stored and reported in topic-metadata responses
     * @param soTimeout  only forwarded to the superclass constructor (name assumed from the Kafka
     *                   {@code SimpleConsumer} signature; not used by this class)
     * @param bufferSize only forwarded to the superclass constructor (name assumed from the Kafka
     *                   {@code SimpleConsumer} signature; not used by this class)
     * @param clientId   used as the reader name for fetch requests
     * @param properties client/admin configuration; {@code null} is treated as empty
     * @throws RuntimeException if the Pulsar client or the admin client cannot be created
     */
    public SimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId,
            Properties properties) {
        super(host, port, soTimeout, bufferSize, clientId);
        this.host = host;
        this.port = port;
        this.clientId = clientId;
        final Properties config = properties != null ? properties : new Properties();

        final PulsarClient pulsarClient;
        try {
            pulsarClient = PulsarClientKafkaConfig.getClientBuilder(config).serviceUrl(host).build();
        } catch (PulsarClientException e) {
            log.warn("Failed to create pulsar client for {} and properties {}", host, config);
            throw new RuntimeException("Failed to create pulsar client " + host, e);
        }

        final PulsarAdmin pulsarAdmin;
        try {
            pulsarAdmin = PulsarClientKafkaConfig
                    .getAdminBuilder(config.getProperty(HTTP_SERVICE_URL, host), config)
                    .build();
        } catch (PulsarClientException e) {
            // The client was already created; release it so a failed construction does not leak it.
            closeQuietly(pulsarClient);
            log.warn("Failed to create pulsar admin for {} and properties {}", host, config);
            throw new RuntimeException("Failed to create pulsar admin " + host, e);
        } catch (RuntimeException e) {
            closeQuietly(pulsarClient);
            throw e;
        }

        this.client = pulsarClient;
        this.admin = pulsarAdmin;
        this.topicConsumerMap = new ConcurrentHashMap<>(8, 0.75f, 1);
        this.subscriptionType = getSubscriptionType(config);
    }

    /**
     * Serves a fetch request by creating one reader per requested topic.
     *
     * @return a response wrapping the created readers, or a response built with a {@code null} reader
     *         map and the second constructor argument set to {@code true} if anything fails
     */
    @Override // kafka.javaapi.consumer.OriginalSimpleConsumer
    public FetchResponse fetch(FetchRequest fetchRequest) {
        try {
            return new FetchResponse(createTopicReaders(fetchRequest), false);
        } catch (Exception e) {
            log.warn("Failed to process fetch request {}", fetchRequest, e);
            return new FetchResponse(null, true);
        }
    }

    /**
     * Creates a reader for every topic in the request, positioned at the requested offset.
     *
     * <p>Only the first partition entry of each topic is used. If any reader cannot be created, the
     * readers created so far are closed before the failure is propagated.
     *
     * @return readers keyed by the topic name used in the request
     * @throws RuntimeException if a reader cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the Scala-to-Java map conversions need raw casts
    private Map<String, Reader<byte[]>> createTopicReaders(FetchRequest fetchRequest) {
        Map<String, Reader<byte[]>> readers = new HashMap<>();
        try {
            Map<String, scala.collection.Map<TopicAndPartition, PartitionFetchInfo>> requestsByTopic =
                    (Map) JavaConverters.mapAsJavaMapConverter(fetchRequest.requestInfoGroupedByTopic()).asJava();
            for (Map.Entry<String, scala.collection.Map<TopicAndPartition, PartitionFetchInfo>> topicEntry
                    : requestsByTopic.entrySet()) {
                Map<TopicAndPartition, PartitionFetchInfo> partitions =
                        (Map) JavaConverters.mapAsJavaMapConverter(topicEntry.getValue()).asJava();
                if (partitions == null || partitions.isEmpty()) {
                    continue;
                }
                Map.Entry<TopicAndPartition, PartitionFetchInfo> first = partitions.entrySet().iterator().next();
                String topicName = getTopicName(first.getKey());
                MessageId startId = getMessageId(first.getValue().offset());
                try {
                    Reader<byte[]> reader = this.client.newReader()
                            .readerName(this.clientId)
                            .topic(topicName)
                            .startMessageId(startId)
                            .create();
                    log.info("Successfully created reader for {} at msg-id {}", topicName, startId);
                    readers.put(topicEntry.getKey(), reader);
                } catch (PulsarClientException e) {
                    log.warn("Failed to create reader for topic {}", topicName, e);
                    throw new RuntimeException("Failed to create reader for " + topicName, e);
                }
            }
        } catch (RuntimeException e) {
            // Do not leak the readers that were opened before the failure.
            closeReaders(readers);
            throw e;
        }
        return readers;
    }

    /** Closes every reader in the map on a best-effort basis and empties the map. */
    private static void closeReaders(Map<String, Reader<byte[]>> readers) {
        for (Map.Entry<String, Reader<byte[]>> entry : readers.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception closeError) {
                log.warn("Failed to close reader for topic {}", entry.getKey(), closeError);
            }
        }
        readers.clear();
    }

    /** Closes the client, logging (not propagating) a failure. */
    private static void closeQuietly(PulsarClient pulsarClient) {
        try {
            pulsarClient.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to close pulsar-client ", e);
        }
    }

    /**
     * Maps a Kafka offset to a Pulsar message id: the earliest/latest sentinel values map to
     * {@link MessageId#earliest}/{@link MessageId#latest}, anything else is decoded by
     * {@link MessageIdUtils#getMessageId(long)}.
     */
    private static MessageId getMessageId(long offset) {
        if (offset == OffsetRequest.EarliestTime()) {
            return MessageId.earliest;
        }
        if (offset == OffsetRequest.LatestTime()) {
            return MessageId.latest;
        }
        return MessageIdUtils.getMessageId(offset);
    }

    /**
     * Builds a topic-metadata response from the admin client, this consumer's host and port, and the
     * topics named in the request.
     */
    @Override // kafka.javaapi.consumer.OriginalSimpleConsumer
    public TopicMetadataResponse send(TopicMetadataRequest topicMetadataRequest) {
        return new TopicMetadataResponse(this.admin, this.host, this.port, topicMetadataRequest.topics());
    }

    /**
     * Echoes the requested time back as the offset for every partition in the request.
     *
     * @throws IllegalArgumentException if a requested time is neither the earliest nor the latest sentinel
     */
    public OffsetResponse getOffsetsBefore(kafka.javaapi.OffsetRequest offsetRequest) {
        Map<TopicAndPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry
                : offsetRequest.getRequestInfo().entrySet()) {
            long time = entry.getValue().time();
            if (time != OffsetRequest.EarliestTime() && time != OffsetRequest.LatestTime()) {
                throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
            }
            offsets.put(entry.getKey(), time);
        }
        return new OffsetResponse(offsets);
    }

    /**
     * Commits offsets by cumulatively acknowledging each topic's message id on the consumer for the
     * request's group. A failure for one topic is logged and recorded in the response under
     * partition 0 of that topic; the remaining topics are still processed.
     */
    public OriginalOffsetCommitResponse commitOffsets(OffsetCommitRequest offsetCommitRequest) {
        OffsetCommitResponse response = new OffsetCommitResponse(null);
        for (Map.Entry<String, MessageId> entry : offsetCommitRequest.getTopicOffsetMap().entrySet()) {
            String topic = entry.getKey();
            try {
                getConsumer(topic, offsetCommitRequest.getGroupId()).acknowledgeCumulative(entry.getValue());
            } catch (Exception e) {
                log.warn("Failed to ack message for topic {}-{}", topic, entry.getValue(), e);
                response.hasError = true;
                response.errors.computeIfAbsent(new TopicAndPartition(topic, 0),
                        ignored -> ErrorMapping.UnknownCode());
            }
        }
        return response;
    }

    /**
     * Looks up the read position of the group's cursor on each requested topic and reports it as the
     * committed offset.
     *
     * <p>Topics without a cursor for the group are omitted from the response. If the lookup fails,
     * the topic is reported with offset 0 and the unknown-error code.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // value type expected by OffsetFetchResponse is not visible here
    public OffsetFetchResponse fetchOffsets(OffsetFetchRequest offsetFetchRequest) {
        String groupId = offsetFetchRequest.groupId;
        Map offsets = new HashMap();
        for (TopicAndPartition topicAndPartition : offsetFetchRequest.requestInfo) {
            String topicName = getTopicName(topicAndPartition);
            try {
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topicName);
                PersistentTopicInternalStats.CursorStats cursor =
                        stats.cursors != null ? stats.cursors.get(groupId) : null;
                if (cursor == null) {
                    continue;
                }
                MessageId position = parseReadPosition(cursor.readPosition, topicName, groupId);
                offsets.put(topicAndPartition, new OriginalOffsetMetadataAndError(
                        MessageIdUtils.getOffset(position), null, ErrorMapping.NoError()));
            } catch (Exception e) {
                log.warn("Failed to fetch offset for {}-{}", topicName, groupId, e);
                offsets.put(topicAndPartition,
                        new OriginalOffsetMetadataAndError(0L, null, ErrorMapping.UnknownCode()));
            }
        }
        return new OffsetFetchResponse(offsets);
    }

    /**
     * Parses a cursor read position of the form {@code "<long>:<long>"} into a message id whose third
     * component is -1. Returns {@link MessageId#earliest} when the position is missing or malformed.
     */
    private static MessageId parseReadPosition(String readPosition, String topicName, String groupId) {
        if (readPosition != null && readPosition.contains(":")) {
            try {
                String[] parts = readPosition.split(":");
                return new MessageIdImpl(Long.parseLong(parts[0]), Long.parseLong(parts[1]), -1);
            } catch (RuntimeException e) {
                log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId, e);
            }
        }
        return MessageId.earliest;
    }

    /**
     * Returns the Pulsar topic name for a Kafka topic/partition pair: the partition-specific name when
     * the partition index is non-negative, otherwise the plain topic name.
     */
    public static String getTopicName(TopicAndPartition topicAndPartition) {
        int partition = topicAndPartition.partition();
        if (partition > -1) {
            return TopicName.get(topicAndPartition.topic()).getPartition(partition).toString();
        }
        return topicAndPartition.topic();
    }

    /**
     * Closes all cached consumers, then the Pulsar client, then the admin client. Failures are logged
     * and do not prevent the remaining resources from being closed.
     */
    @Override // kafka.javaapi.consumer.OriginalSimpleConsumer
    public void close() {
        // The fields are final and assigned by the constructor; the null checks are purely defensive.
        if (this.topicConsumerMap != null) {
            this.topicConsumerMap.forEach((topicGroup, consumer) -> {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    log.warn("Failed to close consumer for topic {}", topicGroup, e);
                }
            });
            this.topicConsumerMap.clear();
        }
        if (this.client != null) {
            closeQuietly(this.client);
        }
        if (this.admin != null) {
            try {
                this.admin.close();
            } catch (Exception e) {
                log.warn("Failed to close pulsar-admin ", e);
            }
        }
    }

    /**
     * Returns the cached consumer for the topic/group pair, subscribing on first use with the group id
     * as subscription name and the configured subscription type.
     *
     * @throws RuntimeException if the subscription cannot be created
     */
    private Consumer<byte[]> getConsumer(String topic, String groupId) {
        return this.topicConsumerMap.computeIfAbsent(new TopicGroup(topic, groupId), key -> {
            try {
                return this.client.newConsumer()
                        .topic(topic)
                        .subscriptionName(groupId)
                        .subscriptionType(this.subscriptionType)
                        .subscribe();
            } catch (PulsarClientException e) {
                log.error("Failed to create consumer for topic {}", topic, e);
                throw new RuntimeException("Failed to create consumer for topic " + topic, e);
            }
        });
    }

    /**
     * Resolves the subscription type from the {@link #SUBSCRIPTION_TYPE} property.
     *
     * <p>The property is read with {@code getProperty}; {@code Properties.contains} (inherited from
     * {@code Hashtable}) tests values rather than keys and therefore must not be used to detect it.
     *
     * @return the configured type, or {@link SubscriptionType#Failover} when the property is absent
     *         or does not name a {@link SubscriptionType} constant
     */
    private static SubscriptionType getSubscriptionType(Properties properties) {
        String configured = properties == null ? null : properties.getProperty(SUBSCRIPTION_TYPE);
        if (configured == null) {
            return SubscriptionType.Failover;
        }
        try {
            return SubscriptionType.valueOf(configured.trim());
        } catch (IllegalArgumentException e) {
            log.warn("Unknown subscription type {}, falling back to {}", configured, SubscriptionType.Failover);
            return SubscriptionType.Failover;
        }
    }
}
