package org.apache.inlong.sdk.sort.impl;

import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/PulsarMessageListener.class */
public class PulsarMessageListener implements MessageListener<byte[]> {
    private final Logger logger = LoggerFactory.getLogger(PulsarMessageListener.class);
    private final ClientContext clientContext;
    private final InLongTopic inLongTopic;
    private final InLongPulsarFetcherImpl inLongTopicInFetcher;
    private final ConcurrentHashMap<String, MessageId> offsetCache;

    public PulsarMessageListener(InLongPulsarFetcherImpl inLongPulsarFetcherImpl, ClientContext clientContext, InLongTopic inLongTopic, ConcurrentHashMap<String, MessageId> concurrentHashMap) {
        this.inLongTopicInFetcher = inLongPulsarFetcherImpl;
        this.clientContext = clientContext;
        this.inLongTopic = inLongTopic;
        this.offsetCache = concurrentHashMap;
    }

    public void handleMsg(MessageRecord messageRecord) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.clientContext.getConfig().getCallback().onFinished(messageRecord);
            this.clientContext.getStatManager().getStatistics(this.clientContext.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addCallbackTimeCost(System.currentTimeMillis() - currentTimeMillis).addCallbackDoneTimes(1);
        } catch (Exception e) {
            this.clientContext.getStatManager().getStatistics(this.clientContext.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addCallbackErrorTimes(1);
            throw e;
        }
    }

    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        try {
            this.inLongTopicInFetcher.isValidState();
            this.clientContext.getStatManager().getStatistics(this.clientContext.getConfig().getSortTaskId(), this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic()).addConsumeSize(message.getData().length).addCallbackTimes(1).addMsgCount(1);
            String offset = getOffset(message.getMessageId());
            this.offsetCache.put(offset, message.getMessageId());
            callbackMessageRecord(message, offset);
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
    }

    private void callbackMessageRecord(Message<byte[]> message, String str) {
        handleMsg(new MessageRecord(this.inLongTopic.getTopicKey(), message.getData(), message.getProperties(), str, System.currentTimeMillis()));
    }

    private String getOffset(MessageId messageId) {
        return Base64.getEncoder().encodeToString(messageId.toByteArray());
    }
}
