package org.apache.inlong.sort.standalone.sink.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.class */
/**
 * Wraps a single {@link KafkaProducer} used by one sink worker.
 *
 * <p>The producer is created in {@link #start()} either from the {@link KafkaNodeConfig}
 * (when {@code CommonPropertiesHolder.useUnifiedConfiguration()} is true) or from the
 * {@link CacheClusterConfig}. If creation fails the error is logged and the producer stays
 * {@code null}; {@link #send(ProfileEvent, Transaction)} then rolls back and returns
 * {@code false}, and {@link #stop()} is a no-op apart from the state change.
 */
public class KafkaProducerCluster implements LifecycleAware {
    public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
    private final String workerName;
    protected final KafkaNodeConfig nodeConfig;
    protected final CacheClusterConfig cacheClusterConfig;
    private final KafkaFederationSinkContext sinkContext;
    private LifecycleState state = LifecycleState.IDLE;
    private final IEvent2KafkaRecordHandler handler;
    private KafkaProducer<String, byte[]> producer;

    /**
     * Creates the cluster wrapper; no Kafka connection is opened until {@link #start()}.
     *
     * @param workerName         name of the owning worker, appended to the Kafka {@code client.id}; must not be null
     * @param cacheClusterConfig cache-cluster based configuration, may be null
     * @param kafkaNodeConfig    node based configuration, may be null
     * @param sinkContext        sink context used for metrics and handler creation; must not be null
     */
    public KafkaProducerCluster(String workerName, CacheClusterConfig cacheClusterConfig,
            KafkaNodeConfig kafkaNodeConfig, KafkaFederationSinkContext sinkContext) {
        this.workerName = (String) Preconditions.checkNotNull(workerName);
        this.nodeConfig = kafkaNodeConfig;
        this.cacheClusterConfig = cacheClusterConfig;
        this.sinkContext = (KafkaFederationSinkContext) Preconditions.checkNotNull(sinkContext);
        this.handler = this.sinkContext.createEventHandler();
    }

    /**
     * Starts the producer from whichever configuration source is active.
     */
    public void start() {
        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
            startByNodeConfig();
        } else {
            startByCacheCluster();
        }
    }

    /**
     * Builds the producer from the cache cluster parameters. Failures are logged, not thrown.
     */
    private void startByCacheCluster() {
        this.state = LifecycleState.START;
        if (this.cacheClusterConfig == null) {
            LOG.error("start kafka producer cluster failed, cacheClusterConfig config is null");
            return;
        }
        try {
            Properties props = defaultKafkaProperties();
            props.putAll(this.cacheClusterConfig.getParams());
            applyMandatoryOverrides(
                    props,
                    this.cacheClusterConfig.getParams().getOrDefault("acks", "all"),
                    this.cacheClusterConfig.getParams().get("bootstrap.servers"),
                    ((String) this.cacheClusterConfig.getParams().get("client.id")) + "-" + this.workerName);
            LOG.info("init kafka client by cache cluster info: {}", props);
            this.producer = newProducer(props);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the producer from the node configuration. Failures are logged, not thrown.
     */
    private void startByNodeConfig() {
        this.state = LifecycleState.START;
        if (this.nodeConfig == null) {
            LOG.error("start kafka producer cluster failed, node config is null");
            return;
        }
        try {
            Properties props = defaultKafkaProperties();
            // User supplied properties are optional on the node config.
            if (this.nodeConfig.getProperties() != null) {
                props.putAll(this.nodeConfig.getProperties());
            }
            applyMandatoryOverrides(
                    props,
                    this.nodeConfig.getAcks(),
                    this.nodeConfig.getBootstrapServers(),
                    this.nodeConfig.getClientId() + "-" + this.workerName);
            LOG.info("init kafka client by node config info: {}", props);
            this.producer = newProducer(props);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Writes the settings that always win over user supplied values.
     * A null value makes {@link Properties#put} throw, which the callers catch and log.
     */
    private void applyMandatoryOverrides(Properties props, Object acks, Object bootstrapServers, String clientId) {
        props.put("partitioner.class", PartitionerSelector.class.getName());
        props.put("acks", acks);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
    }

    /**
     * Creates a producer with String keys and byte[] values from the given properties.
     */
    private KafkaProducer<String, byte[]> newProducer(Properties props) {
        return new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
    }

    /**
     * Returns a fresh {@link Properties} instance holding the built-in producer defaults.
     * Callers layer their own configuration on top of it.
     *
     * @return a new, mutable set of default producer properties
     */
    public Properties defaultKafkaProperties() {
        Properties properties = new Properties();
        properties.put("enable.idempotence", "false");
        properties.put("batch.size", "122880");
        properties.put("buffer.memory", "44740000");
        properties.put("compression.type", "gzip");
        properties.put("delivery.timeout.ms", "86400000");
        properties.put("linger.ms", "500");
        properties.put("max.in.flight.requests.per.connection", "5");
        properties.put("max.request.size", "8388608");
        properties.put("metadata.max.age.ms", "300000");
        properties.put("receive.buffer.bytes", "32768");
        properties.put("request.timeout.ms", "30000");
        properties.put("retries", "100000");
        properties.put("send.buffer.bytes", "524288");
        properties.put("mute.partition.error.max.times", "20");
        properties.put("mute.partition.max.percentage", "20");
        properties.put("rpc.timeout.ms", "30000");
        properties.put("topic.expiry.ms", "86400000");
        properties.put("unmute.partition.interval.ms", "600000");
        properties.put("metadata.retry.backoff.ms", "500");
        properties.put("metadata.fetch.timeout.ms", "1000");
        properties.put(SinkContext.KEY_MAX_THREADS, "2");
        properties.put("enable.replace.partition.for.can.retry", "true");
        properties.put("enable.replace.partition.for.not.leader", "true");
        properties.put("enable.topic.partition.circuit.breaker", "true");
        return properties;
    }

    /**
     * Marks the cluster as stopped and closes the producer if one was created.
     * Errors raised while closing are logged, not thrown.
     */
    public void stop() {
        this.state = LifecycleState.STOP;
        LOG.info("stop kafka producer");
        if (this.producer == null) {
            // start() never succeeded (or was never called); nothing to close.
            return;
        }
        try {
            this.producer.close();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * @return the current lifecycle state of this cluster
     */
    public LifecycleState getLifecycleState() {
        return this.state;
    }

    /**
     * Converts the event into a Kafka record and sends it asynchronously.
     *
     * <p>When the handler yields no record, the transaction is committed, the event is acked and
     * a failed-send metric is recorded. Otherwise the transaction is committed or rolled back
     * from the producer callback, and is always closed there.
     *
     * @param profileEvent event to send
     * @param transaction  channel transaction that took the event; completed by this method or its callback
     * @return {@code true} if the event was dropped or handed to the producer,
     *         {@code false} if handing it to the producer threw (the transaction is rolled back)
     * @throws IOException if the handler fails to parse the event
     */
    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        String topic = (String) profileEvent.getHeaders().get("topic");
        ProducerRecord<String, byte[]> record = this.handler.parse(this.sinkContext, profileEvent);
        long sendTime = System.currentTimeMillis();
        if (record == null) {
            transaction.commit();
            profileEvent.ack();
            transaction.close();
            this.sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
            return true;
        }
        try {
            // A null producer (failed start) raises an NPE here and is handled by the catch below.
            this.producer.send(record, (metadata, exception) -> {
                try {
                    if (exception == null) {
                        transaction.commit();
                        this.sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
                        profileEvent.ack();
                    } else {
                        // Metadata may be null on failure with some client versions; fall back to
                        // the record so that logging can never prevent the rollback.
                        String failedTopic = metadata != null ? metadata.topic() : record.topic();
                        Integer failedPartition = metadata != null
                                ? Integer.valueOf(metadata.partition())
                                : record.partition();
                        LOG.error(String.format("send failed, topic is %s, partition is %s",
                                failedTopic, failedPartition), exception);
                        transaction.rollback();
                        this.sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
                    }
                } finally {
                    // Always release the transaction, even if commit/rollback/ack throws.
                    transaction.close();
                }
            });
            return true;
        } catch (Exception e) {
            transaction.rollback();
            transaction.close();
            LOG.error(e.getMessage(), e);
            this.sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
            return false;
        }
    }
}
