package org.apache.hudi.utilities.callback.kafka;

import java.util.Properties;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * {@link HoodieWriteCommitCallback} implementation that publishes the write-commit callback
 * message, serialized as a JSON string, to a Kafka topic.
 *
 * <p>The bootstrap servers and the topic are read from the write config in the constructor and
 * validated to be non-empty. A new producer is created for every {@link #call} invocation and is
 * closed before the method returns.
 */
public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback {
    private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitKafkaCallback.class);

    // All three fields are assigned exactly once, in the constructor.
    private final HoodieConfig hoodieConfig;
    private final String bootstrapServers;
    private final String topic;

    /**
     * Completion callback handed to the producer so the outcome of the asynchronous send is
     * logged: the failure cause when the send failed, the record coordinates otherwise.
     */
    private static class ProducerSendCallback implements Callback {
        private ProducerSendCallback() {
        }

        /**
         * Logs the result of a send.
         *
         * @param recordMetadata metadata of the sent record; may be {@code null} when the send failed
         * @param exc            the failure cause, or {@code null} when the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                // Asynchronous delivery failures only surface here (or on the returned Future).
                LOG.error("Send kafka callback msg failed : ", exc);
                return;
            }
            if (recordMetadata == null) {
                // Nothing to report; avoid throwing on the producer's I/O thread.
                return;
            }
            LOG.info(String.format("message offset=%s partition=%s timestamp=%s topic=%s",
                    recordMetadata.offset(), recordMetadata.partition(), recordMetadata.timestamp(), recordMetadata.topic()));
        }
    }

    /**
     * Reads the Kafka bootstrap servers and topic from the given write config and validates them.
     *
     * @param hoodieWriteConfig write config holding the Kafka callback settings
     */
    public HoodieWriteCommitKafkaCallback(HoodieWriteConfig hoodieWriteConfig) {
        this.hoodieConfig = hoodieWriteConfig;
        this.bootstrapServers = hoodieWriteConfig.getString(HoodieWriteCommitKafkaCallbackConfig.BOOTSTRAP_SERVERS);
        this.topic = hoodieWriteConfig.getString(HoodieWriteCommitKafkaCallbackConfig.TOPIC);
        validateKafkaConfig();
    }

    /**
     * Serializes the callback message to JSON and sends it to the configured Kafka topic.
     *
     * <p>This is best-effort: any {@link Exception} raised while creating the producer, building
     * the record, sending or closing is logged and not rethrown.
     *
     * @param hoodieWriteCommitCallbackMessage the commit callback message to publish
     */
    @Override // org.apache.hudi.callback.HoodieWriteCommitCallback
    public void call(HoodieWriteCommitCallbackMessage hoodieWriteCommitCallbackMessage) {
        String callbackJson = HoodieWriteCommitCallbackUtil.convertToJsonString(hoodieWriteCommitCallbackMessage);
        // try-with-resources closes the producer on every path, including when send() throws.
        try (KafkaProducer<String, String> producer = createProducer(this.hoodieConfig)) {
            ProducerRecord<String, String> record = buildProducerRecord(this.hoodieConfig, callbackJson);
            // send() is asynchronous; the callback reports the actual delivery outcome.
            producer.send(record, new ProducerSendCallback());
            LOG.info("Send callback message succeed");
        } catch (Exception e) {
            LOG.error("Send kafka callback msg failed : ", e);
        }
    }

    /**
     * Creates a String/String Kafka producer from the configured bootstrap servers plus the
     * {@code acks} and {@code retries} values read from the given config.
     *
     * @param hoodieConfig config supplying the {@code acks} and {@code retries} settings
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<String, String> createProducer(HoodieConfig hoodieConfig) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        properties.setProperty("acks", hoodieConfig.getString(HoodieWriteCommitKafkaCallbackConfig.ACKS));
        properties.setProperty("retries", hoodieConfig.getString(HoodieWriteCommitKafkaCallbackConfig.RETRIES));
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Skip the JSON serialization of the properties entirely when debug logging is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Callback kafka producer init with configs: " + HoodieWriteCommitCallbackUtil.convertToJsonString(properties));
        }
        return new KafkaProducer<>(properties);
    }

    /**
     * Builds the record to send: the table name is the record key and the JSON message the value.
     * When a partition is configured the record is pinned to it; otherwise no partition is set.
     *
     * @param hoodieConfig config supplying the optional partition and the table name
     * @param str          the JSON-serialized callback message
     * @return the record targeting the configured topic
     * @throws NumberFormatException if the configured partition is not a valid integer
     */
    private ProducerRecord<String, String> buildProducerRecord(HoodieConfig hoodieConfig, String str) {
        String partition = hoodieConfig.getString(HoodieWriteCommitKafkaCallbackConfig.PARTITION);
        String tableName = hoodieConfig.getString(HoodieWriteConfig.TBL_NAME);
        if (partition != null) {
            return new ProducerRecord<>(this.topic, Integer.valueOf(partition), tableName, str);
        }
        return new ProducerRecord<>(this.topic, tableName, str);
    }

    /**
     * Checks, via {@code ValidationUtils.checkArgument}, that both the bootstrap servers and the
     * topic are neither null nor empty.
     */
    private void validateKafkaConfig() {
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.bootstrapServers), String.format("Config %s can not be null or empty", HoodieWriteCommitKafkaCallbackConfig.BOOTSTRAP_SERVERS.key()));
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.topic), String.format("Config %s can not be null or empty", HoodieWriteCommitKafkaCallbackConfig.TOPIC.key()));
    }
}
