package org.apache.inlong.dataproxy.sink.mq.kafka;

import java.util.Map;
import java.util.Properties;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0;
import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.class */
public class KafkaHandler implements MessageQueueHandler {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaHandler.class);
    public static final String KEY_NAMESPACE = "namespace";
    private CacheClusterConfig config;
    private MessageQueueZoneSinkContext sinkContext;
    private KafkaProducer<String, byte[]> producer;
    private EventHandler handler;

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void init(CacheClusterConfig cacheClusterConfig, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.config = cacheClusterConfig;
        this.sinkContext = messageQueueZoneSinkContext;
        this.handler = this.sinkContext.createEventHandler();
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void start() {
        try {
            Properties properties = new Properties();
            properties.putAll(this.sinkContext.getProducerContext().getParameters());
            properties.putAll(this.config.getParams());
            LOG.info("try to create kafka client:{}", properties);
            this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
            LOG.info("create new producer success:{}", this.producer);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public void stop() {
        this.producer.close();
        LOG.info("kafka handler stopped");
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler
    public boolean send(BatchPackProfile batchPackProfile) {
        try {
            IdTopicConfig idConfig = this.sinkContext.getIdTopicHolder().getIdConfig(batchPackProfile.getUid());
            if (idConfig == null) {
                this.sinkContext.addSendResultMetric(batchPackProfile, batchPackProfile.getUid(), false, 0L);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                return false;
            }
            String topicName = idConfig.getTopicName();
            if (topicName == null) {
                this.sinkContext.addSendResultMetric(batchPackProfile, batchPackProfile.getUid(), false, 0L);
                this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                return false;
            }
            String producerTopic = getProducerTopic(topicName, idConfig);
            this.sinkContext.addSendMetric(batchPackProfile, producerTopic);
            if (this.producer == null) {
                this.sinkContext.processSendFail(batchPackProfile, producerTopic, 0L);
                return false;
            }
            if (batchPackProfile instanceof SimpleBatchPackProfileV0) {
                sendSimpleProfileV0((SimpleBatchPackProfileV0) batchPackProfile, idConfig, producerTopic);
                return true;
            }
            if (batchPackProfile instanceof OrderBatchPackProfileV0) {
                sendOrderProfileV0((OrderBatchPackProfileV0) batchPackProfile, idConfig, producerTopic);
                return true;
            }
            sendProfileV1(batchPackProfile, idConfig, producerTopic);
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            this.sinkContext.processSendFail(batchPackProfile, batchPackProfile.getUid(), 0L);
            return false;
        }
    }

    private String getProducerTopic(String str, IdTopicConfig idTopicConfig) {
        String str2 = idTopicConfig.getParams().get("namespace");
        return StringUtils.isNotEmpty(str2) ? String.format("%s.%s", str2, str) : str;
    }

    private void sendProfileV1(final BatchPackProfile batchPackProfile, IdTopicConfig idTopicConfig, final String str) throws Exception {
        Map<String, String> parseHeader = this.handler.parseHeader(idTopicConfig, batchPackProfile, this.sinkContext.getNodeId(), this.sinkContext.getCompressType());
        byte[] parseBody = this.handler.parseBody(idTopicConfig, batchPackProfile, this.sinkContext.getCompressType());
        final long currentTimeMillis = System.currentTimeMillis();
        ProducerRecord producerRecord = new ProducerRecord(str, parseBody);
        parseHeader.forEach((str2, str3) -> {
            producerRecord.headers().add(str2, str3.getBytes());
        });
        this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    KafkaHandler.this.sinkContext.addSendResultMetric(batchPackProfile, str, true, currentTimeMillis);
                    KafkaHandler.this.sinkContext.getDispatchQueue().release(batchPackProfile.getSize());
                    batchPackProfile.ack();
                } else {
                    KafkaHandler.LOG.error("Send fail:{}", exc.getMessage());
                    KafkaHandler.LOG.error(exc.getMessage(), exc);
                    if (batchPackProfile.isResend()) {
                        KafkaHandler.this.sinkContext.processSendFail(batchPackProfile, str, currentTimeMillis);
                    } else {
                        batchPackProfile.fail();
                    }
                }
            }
        });
    }

    private void sendSimpleProfileV0(final SimpleBatchPackProfileV0 simpleBatchPackProfileV0, IdTopicConfig idTopicConfig, final String str) throws Exception {
        Map<String, String> properties = simpleBatchPackProfileV0.getProperties();
        if (MapUtils.isEmpty(properties)) {
            properties = simpleBatchPackProfileV0.getSimpleProfile().getHeaders();
        }
        byte[] body = simpleBatchPackProfileV0.getSimpleProfile().getBody();
        final long currentTimeMillis = System.currentTimeMillis();
        ProducerRecord producerRecord = new ProducerRecord(str, body);
        properties.forEach((str2, str3) -> {
            producerRecord.headers().add(str2, str3.getBytes());
        });
        this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler.2
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    KafkaHandler.this.sinkContext.addSendResultMetric(simpleBatchPackProfileV0, str, true, currentTimeMillis);
                    KafkaHandler.this.sinkContext.getDispatchQueue().release(simpleBatchPackProfileV0.getSize());
                    simpleBatchPackProfileV0.ack();
                } else {
                    KafkaHandler.LOG.error("Send fail:{}", exc.getMessage());
                    KafkaHandler.LOG.error(exc.getMessage(), exc);
                    if (simpleBatchPackProfileV0.isResend()) {
                        KafkaHandler.this.sinkContext.processSendFail(simpleBatchPackProfileV0, str, currentTimeMillis);
                    } else {
                        simpleBatchPackProfileV0.fail();
                    }
                }
            }
        });
    }

    private void sendOrderProfileV0(final OrderBatchPackProfileV0 orderBatchPackProfileV0, IdTopicConfig idTopicConfig, final String str) throws Exception {
        Map<String, String> headers = orderBatchPackProfileV0.getOrderProfile().getHeaders();
        byte[] body = orderBatchPackProfileV0.getOrderProfile().getBody();
        final long currentTimeMillis = System.currentTimeMillis();
        ProducerRecord producerRecord = new ProducerRecord(str, body);
        headers.forEach((str2, str3) -> {
            producerRecord.headers().add(str2, str3.getBytes());
        });
        this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler.3
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    KafkaHandler.this.sinkContext.addSendResultMetric(orderBatchPackProfileV0, str, true, currentTimeMillis);
                    KafkaHandler.this.sinkContext.getDispatchQueue().release(orderBatchPackProfileV0.getSize());
                    orderBatchPackProfileV0.ack();
                } else {
                    KafkaHandler.LOG.error("Send fail:{}", exc.getMessage());
                    KafkaHandler.LOG.error(exc.getMessage(), exc);
                    if (orderBatchPackProfileV0.isResend()) {
                        KafkaHandler.this.sinkContext.processSendFail(orderBatchPackProfileV0, str, currentTimeMillis);
                    } else {
                        orderBatchPackProfileV0.fail();
                    }
                }
            }
        });
    }
}
