package org.apache.inlong.sort.standalone.sink.kafka;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Transaction;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.class */
public class KafkaProducerFederation implements Runnable {
    private static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerFederation.class);
    private static final int CORE_POOL_SIZE = 1;
    private final String workerName;
    private final KafkaFederationSinkContext context;
    private ScheduledExecutorService pool;
    private long reloadInterval;
    private List<KafkaProducerCluster> clusterList = new ArrayList();
    private List<KafkaProducerCluster> deletingClusterList = new ArrayList();
    private AtomicInteger clusterIndex = new AtomicInteger(0);

    public KafkaProducerFederation(String str, KafkaFederationSinkContext kafkaFederationSinkContext) {
        this.workerName = (String) Preconditions.checkNotNull(str);
        this.context = (KafkaFederationSinkContext) Preconditions.checkNotNull(kafkaFederationSinkContext);
        this.reloadInterval = kafkaFederationSinkContext.getReloadInterval();
    }

    public void close() {
        try {
            this.pool.shutdownNow();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        Iterator<KafkaProducerCluster> it = this.clusterList.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    public void start() {
        try {
            reload();
            initReloadExecutor();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        reload();
    }

    private void reload() {
        try {
            LOG.info("stop deleting clusters, size is {}", Integer.valueOf(this.deletingClusterList.size()));
            this.deletingClusterList.forEach((v0) -> {
                v0.stop();
            });
            this.deletingClusterList.clear();
            LOG.info("update cluster list");
            List<CacheClusterConfig> clusterConfigList = this.context.getClusterConfigList();
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            clusterConfigList.forEach(cacheClusterConfig -> {
                hashSet.add(cacheClusterConfig.getClusterName());
            });
            this.clusterList.forEach(kafkaProducerCluster -> {
                hashSet2.add(kafkaProducerCluster.getCacheClusterName());
            });
            ArrayList arrayList = new ArrayList(clusterConfigList.size());
            clusterConfigList.forEach(cacheClusterConfig2 -> {
                if (hashSet2.contains(cacheClusterConfig2.getClusterName())) {
                    return;
                }
                KafkaProducerCluster kafkaProducerCluster2 = new KafkaProducerCluster(this.workerName, cacheClusterConfig2, this.context);
                kafkaProducerCluster2.start();
                arrayList.add(kafkaProducerCluster2);
            });
            this.clusterList.forEach(kafkaProducerCluster2 -> {
                if (hashSet.contains(kafkaProducerCluster2.getCacheClusterName())) {
                    arrayList.add(kafkaProducerCluster2);
                } else {
                    this.deletingClusterList.add(kafkaProducerCluster2);
                }
            });
            LOG.info("the modified cluster list size is {}", Integer.valueOf(arrayList.size()));
            this.clusterList = arrayList;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        int andIncrement = this.clusterIndex.getAndIncrement();
        if (andIncrement > 1073741823) {
            this.clusterIndex.set(0);
        }
        List<KafkaProducerCluster> list = this.clusterList;
        return list.get(andIncrement % list.size()).send(profileEvent, transaction);
    }

    private void initReloadExecutor() {
        this.pool = Executors.newScheduledThreadPool(1);
        this.pool.scheduleAtFixedRate(this, this.reloadInterval, this.reloadInterval, TimeUnit.SECONDS);
    }
}
