package org.apache.inlong.sort.standalone.sink.pulsar;

import java.io.IOException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flume.Transaction;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.class */
public class PulsarProducerFederation {
    public static final Logger LOG = InlongLoggerFactory.getLogger(PulsarProducerFederation.class);
    private final String workerName;
    private final PulsarFederationSinkContext context;
    private Timer reloadTimer;
    private PulsarNodeConfig nodeConfig;
    private CacheClusterConfig cacheClusterConfig;
    private PulsarProducerCluster cluster;
    private PulsarProducerCluster deleteCluster;

    public PulsarProducerFederation(String str, PulsarFederationSinkContext pulsarFederationSinkContext) {
        this.workerName = str;
        this.context = pulsarFederationSinkContext;
    }

    public void start() {
        try {
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void close() {
        try {
            this.reloadTimer.cancel();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        this.cluster.stop();
    }

    private void setReloadTimer() {
        this.reloadTimer = new Timer(true);
        this.reloadTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sort.standalone.sink.pulsar.PulsarProducerFederation.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                PulsarProducerFederation.this.reload();
            }
        }, new Date(System.currentTimeMillis() + this.context.getReloadInterval()), this.context.getReloadInterval());
    }

    public void reload() {
        try {
            if (this.deleteCluster != null) {
                this.deleteCluster.stop();
                this.deleteCluster = null;
            }
        } catch (Exception e) {
            LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e);
        }
        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
            reloadByNodeConfig();
        } else {
            reloadByCacheClusterConfig();
        }
    }

    private void reloadByNodeConfig() {
        try {
            if (this.nodeConfig == null || this.context.getNodeConfig().getVersion().intValue() > this.nodeConfig.getVersion().intValue()) {
                this.nodeConfig = this.context.getNodeConfig();
                PulsarProducerCluster pulsarProducerCluster = new PulsarProducerCluster(this.workerName, this.cacheClusterConfig, this.nodeConfig, this.context);
                pulsarProducerCluster.start();
                this.deleteCluster = this.cluster;
                this.cluster = pulsarProducerCluster;
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    private void reloadByCacheClusterConfig() {
        try {
            if (this.cacheClusterConfig == null || this.cacheClusterConfig.equals(this.context.getCacheClusterConfig())) {
                this.cacheClusterConfig = this.context.getCacheClusterConfig();
                PulsarProducerCluster pulsarProducerCluster = new PulsarProducerCluster(this.workerName, this.cacheClusterConfig, this.nodeConfig, this.context);
                pulsarProducerCluster.start();
                this.deleteCluster = this.cluster;
                this.cluster = pulsarProducerCluster;
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public boolean send(ProfileEvent profileEvent, Transaction transaction) throws IOException {
        return this.cluster.send(profileEvent, transaction);
    }
}
