package org.apache.inlong.sort.standalone.sink.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Transaction;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.class */
public class KafkaProducerFederation implements Runnable {
    private static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerFederation.class);
    private static final int CORE_POOL_SIZE = 1;
    private final String workerName;
    private final KafkaFederationSinkContext context;
    private ScheduledExecutorService pool;
    private long reloadInterval;
    private KafkaNodeConfig nodeConfig;
    private KafkaProducerCluster cluster;
    private KafkaProducerCluster deleteCluster;
    private CacheClusterConfig cacheClusterConfig;

    public KafkaProducerFederation(String str, KafkaFederationSinkContext kafkaFederationSinkContext) {
        this.workerName = (String) Preconditions.checkNotNull(str);
        this.context = (KafkaFederationSinkContext) Preconditions.checkNotNull(kafkaFederationSinkContext);
        this.reloadInterval = kafkaFederationSinkContext.getReloadInterval();
    }

    public void close() {
        try {
            this.pool.shutdownNow();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        this.cluster.stop();
    }

    public void start() {
        try {
            reload();
            initReloadExecutor();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        reload();
    }

    private void reload() {
        try {
            if (this.deleteCluster != null) {
                this.deleteCluster.stop();
                this.deleteCluster = null;
            }
        } catch (Exception e) {
            LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e);
        }
        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
            reloadByNodeConfig();
        } else {
            reloadByCacheClusterConfig();
        }
    }

    private void reloadByCacheClusterConfig() {
        try {
            if (this.cacheClusterConfig == null || this.cacheClusterConfig.equals(this.context.getCacheClusterConfig())) {
                this.cacheClusterConfig = this.context.getCacheClusterConfig();
                KafkaProducerCluster kafkaProducerCluster = new KafkaProducerCluster(this.workerName, this.cacheClusterConfig, this.nodeConfig, this.context);
                kafkaProducerCluster.start();
                this.deleteCluster = this.cluster;
                this.cluster = kafkaProducerCluster;
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private void reloadByNodeConfig() {
        try {
            if (this.nodeConfig == null || this.context.getNodeConfig().getVersion().intValue() > this.nodeConfig.getVersion().intValue()) {
                this.nodeConfig = this.context.getNodeConfig();
                KafkaProducerCluster kafkaProducerCluster = new KafkaProducerCluster(this.workerName, this.cacheClusterConfig, this.nodeConfig, this.context);
                kafkaProducerCluster.start();
                this.deleteCluster = this.cluster;
                this.cluster = kafkaProducerCluster;
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        return this.cluster.send(profileEvent, transaction);
    }

    private void initReloadExecutor() {
        this.pool = Executors.newScheduledThreadPool(1);
        this.pool.scheduleAtFixedRate(this, this.reloadInterval, this.reloadInterval, TimeUnit.SECONDS);
    }
}
