package org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone;

import java.util.Map;
import java.util.Properties;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mqzone/impl/kafkazone/KafkaClusterProducer.class */
public class KafkaClusterProducer extends AbstractZoneClusterProducer {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaClusterProducer.class);
    private KafkaProducer<String, byte[]> producer;

    public KafkaClusterProducer(String str, CacheClusterConfig cacheClusterConfig, KafkaZoneSinkContext kafkaZoneSinkContext) {
        super(str, cacheClusterConfig, kafkaZoneSinkContext);
    }

    public void start() {
        this.state = LifecycleState.START;
        try {
            Properties properties = new Properties();
            properties.putAll(this.producerContext.getParameters());
            properties.putAll(this.config.getParams());
            LOG.info("try to create kafka client:{}", properties);
            this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
            LOG.info("create new producer success:{}", this.producer);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public void stop() {
        this.state = LifecycleState.STOP;
        this.producer.close();
    }

    @Override // org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneClusterProducer
    public boolean send(final DispatchProfile dispatchProfile) {
        try {
            final String topic = this.sinkContext.getIdTopicHolder().getTopic(dispatchProfile.getUid());
            if (topic == null) {
                this.sinkContext.addSendResultMetric(dispatchProfile, dispatchProfile.getUid(), false, 0L);
                return false;
            }
            if (this.producer == null) {
                this.sinkContext.processSendFail(dispatchProfile, topic, 0L);
                return false;
            }
            Map<String, String> encodeCacheMessageHeaders = encodeCacheMessageHeaders(dispatchProfile);
            byte[] encodeCacheMessageBody = EventUtils.encodeCacheMessageBody(this.sinkContext.getCompressType(), dispatchProfile.getEvents());
            final long currentTimeMillis = System.currentTimeMillis();
            ProducerRecord producerRecord = new ProducerRecord(topic, encodeCacheMessageBody);
            encodeCacheMessageHeaders.forEach((str, str2) -> {
                producerRecord.headers().add(str, str2.getBytes());
            });
            this.producer.send(producerRecord, new Callback() { // from class: org.apache.inlong.dataproxy.sink.mqzone.impl.kafkazone.KafkaClusterProducer.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc == null) {
                        KafkaClusterProducer.this.sinkContext.addSendResultMetric(dispatchProfile, topic, true, currentTimeMillis);
                        dispatchProfile.ack();
                        return;
                    }
                    KafkaClusterProducer.LOG.error("Send fail:{}", exc.getMessage());
                    KafkaClusterProducer.LOG.error(exc.getMessage(), exc);
                    if (dispatchProfile.isResend()) {
                        KafkaClusterProducer.this.sinkContext.processSendFail(dispatchProfile, topic, currentTimeMillis);
                    } else {
                        dispatchProfile.fail();
                    }
                }
            });
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            this.sinkContext.processSendFail(dispatchProfile, dispatchProfile.getUid(), 0L);
            return false;
        }
    }
}
