package org.apache.inlong.dataproxy.sink.mq.kafka;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.class */
public class KafkaHandler implements MessageQueueHandler {
    private static final Logger logger = LoggerFactory.getLogger(KafkaHandler.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private CacheClusterConfig config;
    private String clusterName;
    private MessageQueueZoneSinkContext sinkContext;
    private KafkaProducer<String, byte[]> producer;
    private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void init(CacheClusterConfig cacheClusterConfig, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.config = cacheClusterConfig;
        this.clusterName = cacheClusterConfig.getClusterName();
        this.sinkContext = messageQueueZoneSinkContext;
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void start() {
        try {
            Properties properties = new Properties();
            properties.putAll(this.sinkContext.getProducerContext().getParameters());
            properties.putAll(this.config.getParams());
            logger.info("try to create kafka client:{}", properties);
            this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
            logger.info("create new producer success:{}", this.producer);
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void publishTopic(Set<String> set) {
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void stop() {
        this.producer.close();
        logger.info("kafka handler stopped");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public boolean send(PackProfile packProfile) {
        String topicName;
        try {
            IdTopicConfig sinkIdTopicConfig = ConfigManager.getInstance().getSinkIdTopicConfig(packProfile.getInlongGroupId(), packProfile.getInlongStreamId());
            if (sinkIdTopicConfig != null) {
                topicName = sinkIdTopicConfig.getTopicName();
            } else {
                if (!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
                    this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, packProfile.getUid());
                    this.sinkContext.addSendResultMetric(packProfile, this.clusterName, packProfile.getUid(), false, 0L);
                    this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(packProfile);
                    packProfile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                    return false;
                }
                topicName = CommonConfigHolder.getInstance().getRandDefTopics();
                if (StringUtils.isEmpty(topicName)) {
                    this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, packProfile.getUid());
                    this.sinkContext.addSendResultMetric(packProfile, this.clusterName, packProfile.getUid(), false, 0L);
                    this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(packProfile);
                    packProfile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                    return false;
                }
                this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED, packProfile.getUid());
            }
            if (this.producer == null) {
                this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_PRODUCER_NULL, topicName);
                this.sinkContext.processSendFail(packProfile, this.clusterName, topicName, 0L, DataProxyErrCode.PRODUCER_IS_NULL, "");
                return false;
            }
            if (packProfile instanceof SimplePackProfile) {
                sendSimplePackProfile((SimplePackProfile) packProfile, sinkIdTopicConfig, topicName);
                return true;
            }
            sendBatchPackProfile((BatchPackProfile) packProfile, sinkIdTopicConfig, topicName);
            return true;
        } catch (Exception e) {
            this.sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION, null);
            this.sinkContext.processSendFail(packProfile, this.clusterName, packProfile.getUid(), 0L, DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, e.getMessage());
            if (!logCounter.shouldPrint()) {
                return false;
            }
            logger.error("Send Message to Kafka failure", e);
            return false;
        }
    }

    private void sendBatchPackProfile(final BatchPackProfile batchPackProfile, IdTopicConfig idTopicConfig, final String str) throws Exception {
        EventHandler eventHandler = this.handlerLocal.get();
        if (eventHandler == null) {
            eventHandler = this.sinkContext.createEventHandler();
            this.handlerLocal.set(eventHandler);
        }
        Map<String, String> parseHeader = eventHandler.parseHeader(idTopicConfig, batchPackProfile, this.sinkContext.getNodeId(), this.sinkContext.getCompressType());
        byte[] parseBody = eventHandler.parseBody(idTopicConfig, batchPackProfile, this.sinkContext.getCompressType());
        this.sinkContext.addSendMetric(batchPackProfile, this.clusterName, str, parseBody.length);
        final long currentTimeMillis = System.currentTimeMillis();
        ProducerRecord producerRecord = new ProducerRecord(str, parseBody);
        parseHeader.forEach((str2, str3) -> {
            producerRecord.headers().add(str2, str3.getBytes());
        });
        this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    KafkaHandler.this.sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                    KafkaHandler.this.sinkContext.addSendResultMetric(batchPackProfile, KafkaHandler.this.clusterName, str, true, currentTimeMillis);
                    KafkaHandler.this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchPackProfile);
                    batchPackProfile.ack();
                    return;
                }
                KafkaHandler.this.sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
                KafkaHandler.this.sinkContext.processSendFail(batchPackProfile, KafkaHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, exc.getMessage());
                if (KafkaHandler.logCounter.shouldPrint()) {
                    KafkaHandler.logger.error("Send BatchPackProfile to Kafka failure", exc);
                }
            }
        });
    }

    private void sendSimplePackProfile(final SimplePackProfile simplePackProfile, IdTopicConfig idTopicConfig, final String str) throws Exception {
        this.sinkContext.addSendMetric(simplePackProfile, this.clusterName, str, simplePackProfile.getEvent().getBody().length);
        ProducerRecord producerRecord = new ProducerRecord(str, simplePackProfile.getEvent().getBody());
        final long currentTimeMillis = System.currentTimeMillis();
        simplePackProfile.getPropsToMQ(currentTimeMillis).forEach((str2, str3) -> {
            producerRecord.headers().add(str2, str3.getBytes());
        });
        this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler.2
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    KafkaHandler.this.sinkContext.fileMetricAddSuccStats(simplePackProfile, str, recordMetadata == null ? "" : String.valueOf(recordMetadata.partition()));
                    KafkaHandler.this.sinkContext.addSendResultMetric(simplePackProfile, KafkaHandler.this.clusterName, str, true, currentTimeMillis);
                    KafkaHandler.this.sinkContext.getMqZoneSink().releaseAcquiredSizePermit(simplePackProfile);
                    simplePackProfile.ack();
                    return;
                }
                KafkaHandler.this.sinkContext.fileMetricAddFailStats(simplePackProfile, str, recordMetadata == null ? "" : String.valueOf(recordMetadata.partition()), str);
                KafkaHandler.this.sinkContext.processSendFail(simplePackProfile, KafkaHandler.this.clusterName, str, currentTimeMillis, DataProxyErrCode.MQ_RETURN_ERROR, exc.getMessage());
                if (KafkaHandler.logCounter.shouldPrint()) {
                    KafkaHandler.logger.error("Send SimplePackProfile to Kafka failure", exc);
                }
            }
        });
    }
}
