package org.apache.inlong.sort.standalone.sink.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Properties;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.class */
public class KafkaProducerCluster implements LifecycleAware {
    public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
    private final String workerName;
    protected final CacheClusterConfig config;
    private final KafkaFederationSinkContext sinkContext;
    private final Context context;
    private final String cacheClusterName;
    private LifecycleState state = LifecycleState.IDLE;
    private IEvent2KafkaRecordHandler handler;
    private KafkaProducer<String, byte[]> producer;

    public KafkaProducerCluster(String str, CacheClusterConfig cacheClusterConfig, KafkaFederationSinkContext kafkaFederationSinkContext) {
        this.workerName = (String) Preconditions.checkNotNull(str);
        this.config = (CacheClusterConfig) Preconditions.checkNotNull(cacheClusterConfig);
        this.sinkContext = (KafkaFederationSinkContext) Preconditions.checkNotNull(kafkaFederationSinkContext);
        this.context = (Context) Preconditions.checkNotNull(kafkaFederationSinkContext.getProducerContext());
        this.cacheClusterName = (String) Preconditions.checkNotNull(cacheClusterConfig.getClusterName());
        this.handler = this.sinkContext.createEventHandler();
    }

    public void start() {
        this.state = LifecycleState.START;
        try {
            Properties properties = new Properties();
            properties.putAll(this.context.getParameters());
            properties.put("partitioner.class", this.context.getString("partitioner.class", PartitionerSelector.class.getName()));
            properties.put("bootstrap.servers", this.context.getString("bootstrap.servers"));
            properties.put("client.id", this.context.getString("client.id", this.cacheClusterName) + "-" + this.workerName);
            LOG.info("init kafka client info: " + properties);
            this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
            Preconditions.checkNotNull(this.producer);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void stop() {
        this.state = LifecycleState.STOP;
        try {
            LOG.info("stop kafka producer");
            this.producer.close();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public LifecycleState getLifecycleState() {
        return this.state;
    }

    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        String str = (String) profileEvent.getHeaders().get("topic");
        ProducerRecord<String, byte[]> parse = this.handler.parse(this.sinkContext, profileEvent);
        long currentTimeMillis = System.currentTimeMillis();
        if (parse == null) {
            transaction.commit();
            profileEvent.ack();
            transaction.close();
            this.sinkContext.addSendResultMetric(profileEvent, str, false, currentTimeMillis);
            return true;
        }
        try {
            this.producer.send(parse, (recordMetadata, exc) -> {
                if (exc == null) {
                    transaction.commit();
                    this.sinkContext.addSendResultMetric(profileEvent, str, true, currentTimeMillis);
                    profileEvent.ack();
                } else {
                    LOG.error(String.format("send failed, topic is %s, partition is %s", recordMetadata.topic(), Integer.valueOf(recordMetadata.partition())), exc);
                    transaction.rollback();
                    this.sinkContext.addSendResultMetric(profileEvent, str, false, currentTimeMillis);
                }
                transaction.close();
            });
            return true;
        } catch (Exception e) {
            transaction.rollback();
            transaction.close();
            LOG.error(e.getMessage(), e);
            this.sinkContext.addSendResultMetric(profileEvent, str, false, currentTimeMillis);
            return false;
        }
    }

    public String getCacheClusterName() {
        return this.cacheClusterName;
    }
}
