package org.apache.inlong.audit.service.consume;

import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.service.InsertData;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/PulsarConsume.class */
/**
 * Pulsar-backed consumer of audit messages.
 *
 * <p>On {@link #start()} it validates the Pulsar settings held in {@code mqConfig}, builds a
 * {@link PulsarClient} and subscribes the configured number of {@code Shared} consumers to the
 * configured topic. Every received payload is decoded as UTF-8 and passed to
 * {@code handleMessage} of the base class.
 */
public class PulsarConsume extends BaseConsume {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsume.class);

    /** Delay, in seconds, passed to {@code reconsumeLater} when handling a message fails. */
    private static final long RECONSUME_DELAY_SECONDS = 10L;

    /** Consumers created by this instance, keyed by topic name. */
    private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap;

    public PulsarConsume(List<InsertData> list, StoreConfig storeConfig, MessageQueueConfig messageQueueConfig) {
        super(list, storeConfig, messageQueueConfig);
        this.topicConsumerMap = new ConcurrentHashMap<>();
    }

    /**
     * Validates the Pulsar configuration, creates the client and brings the number of consumers
     * in line with the configured concurrency.
     *
     * @throws IllegalArgumentException if the server url, topic or subscription name is empty
     */
    @Override // org.apache.inlong.audit.service.consume.BaseConsume
    public void start() {
        String pulsarServerUrl = this.mqConfig.getPulsarServerUrl();
        Preconditions.checkArgument(StringUtils.isNotEmpty(pulsarServerUrl), "no pulsar server url specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getPulsarTopic()), "no pulsar topic specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getPulsarConsumerSubName()), "no pulsar consumeSubName specified");
        updateConcurrentConsumer(getOrCreatePulsarClient(pulsarServerUrl));
    }

    /**
     * Builds a Pulsar client for the given service url, attaching token authentication when it is
     * enabled and a non-empty token is configured.
     *
     * @param serviceUrl Pulsar service url
     * @return the client, or {@code null} if building it failed (the failure is logged)
     */
    private PulsarClient getOrCreatePulsarClient(String serviceUrl) {
        LOG.info("start consumer pulsarServerUrl = {}", serviceUrl);
        PulsarClient pulsarClient = null;
        ClientBuilder builder = PulsarClient.builder();
        try {
            if (this.mqConfig.isPulsarEnableAuth() && StringUtils.isNotEmpty(this.mqConfig.getPulsarToken())) {
                builder.authentication(AuthenticationFactory.token(this.mqConfig.getPulsarToken()));
            }
            pulsarClient = builder
                    .serviceUrl(serviceUrl)
                    .connectionTimeout(this.mqConfig.getClientOperationTimeoutSecond(), TimeUnit.SECONDS)
                    .build();
        } catch (PulsarClientException e) {
            LOG.error("getOrCreatePulsarClient has pulsar {} err {}", serviceUrl, e);
        }
        return pulsarClient;
    }

    /**
     * Grows or shrinks the consumer list of the configured topic until it matches the configured
     * concurrency.
     *
     * <p>When consumers are missing, only the missing number is created (a consumer that fails to
     * subscribe is skipped). When there are too many, the surplus is removed from the tail of the
     * list and closed asynchronously.
     *
     * @param pulsarClient client used to create new consumers; may be {@code null}, in which case
     *                     no consumer is created
     */
    protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
        String topic = this.mqConfig.getPulsarTopic();
        List<Consumer<byte[]>> consumers = this.topicConsumerMap.computeIfAbsent(topic, key -> new ArrayList<>());
        // A negative configured value is treated as zero so the shrink loop can never run past
        // the start of the list.
        int targetNum = Math.max(this.mqConfig.getConcurrentConsumerNum(), 0);
        int missing = targetNum - consumers.size();
        if (missing > 0) {
            // Create only the shortfall, not the full configured number.
            for (int i = 0; i < missing; i++) {
                Consumer<byte[]> created = createConsumer(pulsarClient, topic);
                if (created != null) {
                    consumers.add(created);
                }
            }
            return;
        }
        for (int surplus = -missing; surplus > 0; surplus--) {
            consumers.remove(consumers.size() - 1).closeAsync();
        }
    }

    /**
     * Subscribes a new {@code Shared} consumer to the given topic, using the subscription name,
     * receiver queue size and retry flag from {@code mqConfig}.
     *
     * @param pulsarClient client to create the consumer with
     * @param str          topic name
     * @return the subscribed consumer, or {@code null} if the client is {@code null}, the topic
     *         is empty, or subscribing failed (the failure is logged)
     */
    protected Consumer<byte[]> createConsumer(PulsarClient pulsarClient, final String str) {
        if (pulsarClient == null || StringUtils.isEmpty(str)) {
            return null;
        }
        LOG.info("createConsumer has topic {}, subName {}", str, this.mqConfig.getPulsarConsumerSubName());
        try {
            return pulsarClient.newConsumer()
                    .subscriptionName(this.mqConfig.getPulsarConsumerSubName())
                    .subscriptionType(SubscriptionType.Shared)
                    .topic(str)
                    .receiverQueueSize(this.mqConfig.getConsumerReceiveQueueSize())
                    .enableRetry(this.mqConfig.isPulsarConsumerEnableRetry())
                    .messageListener((consumer, message) -> onMessage(str, consumer, message))
                    .subscribe();
        } catch (PulsarClientException e) {
            LOG.error("createConsumer has topic {}, subName {}, err {}", str, this.mqConfig.getPulsarConsumerSubName(), e);
            return null;
        }
    }

    /**
     * Listener body: decodes the payload as UTF-8 and passes it to {@code handleMessage}.
     *
     * <p>If handling throws, the message is negatively acknowledged when consumer retry is
     * disabled, otherwise it is scheduled for re-consumption after
     * {@link #RECONSUME_DELAY_SECONDS} seconds.
     */
    private void onMessage(String topic, Consumer<byte[]> consumer, Message<byte[]> message) {
        try {
            handleMessage(new String(message.getData(), StandardCharsets.UTF_8), consumer, message.getMessageId());
        } catch (Exception e) {
            LOG.error("Consumer has exception topic {}, subName {}, ex {}", topic, this.mqConfig.getPulsarConsumerSubName(), e);
            if (!this.mqConfig.isPulsarConsumerEnableRetry()) {
                consumer.negativeAcknowledge(message);
                return;
            }
            try {
                consumer.reconsumeLater(message, RECONSUME_DELAY_SECONDS, TimeUnit.SECONDS);
            } catch (PulsarClientException e2) {
                LOG.error("Consumer reconsumeLater has exception topic {}, subName {}, ex {}", topic, this.mqConfig.getPulsarConsumerSubName(), e2);
            }
        }
    }
}
