package org.apache.inlong.dataproxy.sink;

import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.SetUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.shaded.guava.RateLimiter;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder;
import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/TubeSink.class */
public class TubeSink extends AbstractSink implements Configurable {
    private ConfigManager configManager;
    private Map<String, String> topicProperties;
    private MQClusterConfig tubeConfig;
    private Set<String> masterHostAndPortLists;
    private MonitorIndex monitorIndex;
    private MonitorIndexExt monitorIndexExt;
    private static final String KEY_SINK_DROPPED = "TUBE_SINK_DROPPED";
    private static final String KEY_SINK_SUCCESS = "TUBE_SINK_SUCCESS";
    private static final String KEY_SINK_FAILURE = "TUBE_SINK_FAILURE";
    private static final String KEY_SINK_EXP = "TUBE_SINK_EXP";
    private RateLimiter diskRateLimiter;
    private Thread[] sinkThreadPool;
    private DataProxyMetricItemSet metricItemSet;
    private LinkedBlockingQueue<Event> eventQueue;
    private LinkedBlockingQueue<EventStat> resendQueue;
    private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
    private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler();
    private static final LogCounter LOG_SINK_TASK_PRINTER = new LogCounter(10, 100000, 60000);
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread"));
    private TubeProducerHolder producerHolder = null;
    private volatile boolean canSend = false;
    private volatile boolean isOverFlow = false;
    private String usedMasterAddr = null;
    private int maxMonitorCnt = 300000;
    private int statIntervalSec = 60;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicLong cachedMsgCnt = new AtomicLong(0);
    private final AtomicLong takenMsgCnt = new AtomicLong(0);
    private final AtomicLong resendMsgCnt = new AtomicLong(0);
    private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0);
    private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0);
    private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0);
    private final AtomicLong inflightMsgCnt = new AtomicLong(0);
    private final AtomicLong successMsgCnt = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/TubeSink$MyCallback.class */
    public class MyCallback implements MessageSentCallback {
        private EventStat myEventStat;
        private long sendTime = System.currentTimeMillis();

        public MyCallback(EventStat eventStat) {
            this.myEventStat = eventStat;
        }

        public void onMessageSent(MessageSentResult messageSentResult) {
            if (messageSentResult.isSuccess()) {
                TubeSink.this.successMsgCnt.incrementAndGet();
                TubeSink.this.inflightMsgCnt.decrementAndGet();
                TubeSink.this.takenMsgCnt.decrementAndGet();
                addStatistics(this.myEventStat.getEvent(), true, false, this.sendTime);
                if (MessageUtils.isSinkRspType(this.myEventStat.getEvent())) {
                    MessageUtils.sinkReturnRspPackage((SinkRspEvent) this.myEventStat.getEvent(), DataProxyErrCode.SUCCESS, "");
                    return;
                }
                return;
            }
            addStatistics(this.myEventStat.getEvent(), false, false, 0L);
            if (messageSentResult.getErrCode() == 403) {
                TubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(TubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
                return;
            }
            if (messageSentResult.getErrCode() != 419 && TubeSink.LOG_SINK_TASK_PRINTER.shouldPrint()) {
                TubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(TubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
            }
            TubeSink.this.resendEvent(this.myEventStat, true, DataProxyErrCode.MQ_RETURN_ERROR, messageSentResult.getErrCode() + AttrConstants.SEP_HASHTAG + messageSentResult.getErrMsg());
        }

        public void onException(Throwable th) {
            addStatistics(this.myEventStat.getEvent(), false, true, 0L);
            TubeSink.this.resendEvent(this.myEventStat, true, DataProxyErrCode.UNKNOWN_ERROR, th.getMessage());
        }

        private void addStatistics(Event event, boolean z, boolean z2, long j) {
            if (event == null) {
                return;
            }
            TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(event, j, z, event.getBody().length);
            if (z) {
                AuditUtils.add(6, event);
            }
            if (TubeSink.this.statIntervalSec <= 0) {
                return;
            }
            String str = (String) event.getHeaders().get("topic");
            String str2 = (String) event.getHeaders().get("streamId");
            String str3 = (String) event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
            int parseInt = Integer.parseInt((String) event.getHeaders().get("msgcnt"));
            StringBuilder append = new StringBuilder(512).append(TubeSink.this.getName()).append(AttrConstants.SEP_HASHTAG).append(str).append(AttrConstants.SEP_HASHTAG).append(str2).append(AttrConstants.SEP_HASHTAG).append(str3).append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp()).append(AttrConstants.SEP_HASHTAG).append((String) MessageUtils.getEventProcType("", "").getRight()).append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(Long.parseLong((String) event.getHeaders().get("dt"))));
            if (z) {
                TubeSink.this.monitorIndex.addAndGet(append.toString(), parseInt, 1, event.getBody().length, 0);
                TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_SUCCESS);
                return;
            }
            TubeSink.this.monitorIndex.addAndGet(append.toString(), 0, 0, 0L, parseInt);
            TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_FAILURE);
            if (z2) {
                TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_EXP);
            }
        }
    }

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/TubeSink$TubeSinkTask.class */
    private class TubeSinkTask implements Runnable {
        public TubeSinkTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            String message;
            Event event;
            EventStat eventStat = null;
            boolean z = false;
            TubeSink.logger.info("sink task {} started.", Thread.currentThread().getName());
            while (TubeSink.this.canSend) {
                try {
                } catch (InterruptedException e) {
                    TubeSink.logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable th) {
                    TubeSink.this.resendEvent(eventStat, z, DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, th.getMessage());
                    if ((th instanceof TubeClientException) && (message = th.getMessage()) != null && (message.contains("No available queue for topic") || message.contains("The brokers of topic are all forbidden"))) {
                        TubeSink.this.isOverFlow = true;
                    }
                    if (TubeSink.LOG_SINK_TASK_PRINTER.shouldPrint()) {
                        TubeSink.logger.error("Sink task fail to send the message, finished = {}, sink.name = {},event.headers= {}", new Object[]{Boolean.valueOf(z), Thread.currentThread().getName(), eventStat.getEvent().getHeaders(), th});
                    }
                }
                if (!TubeSink.this.started.get() && TubeSink.this.cachedMsgCnt.get() <= 0) {
                    TubeSink.logger.info("Found started is false and taken message count is zero, braek!");
                    break;
                }
                if (TubeSink.this.isOverFlow) {
                    TubeSink.this.isOverFlow = false;
                    Thread.sleep(30L);
                }
                if (TubeSink.this.resendQueue.isEmpty()) {
                    event = (Event) TubeSink.this.eventQueue.poll(DispatchManager.DEFAULT_DISPATCH_TIMEOUT, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        TubeSink.this.cachedMsgCnt.decrementAndGet();
                        TubeSink.this.takenMsgCnt.incrementAndGet();
                        eventStat = new EventStat(event);
                    }
                } else {
                    eventStat = (EventStat) TubeSink.this.resendQueue.poll();
                    if (eventStat != null) {
                        TubeSink.this.resendMsgCnt.decrementAndGet();
                        event = eventStat.getEvent();
                    }
                }
                String str = (String) event.getHeaders().get("topic");
                if (StringUtils.isBlank(str)) {
                    TubeSink.this.blankTopicDiscardMsgCnt.incrementAndGet();
                    TubeSink.this.takenMsgCnt.decrementAndGet();
                    if (TubeSink.this.statIntervalSec > 0) {
                        TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_DROPPED);
                    }
                    if (TubeSink.LOG_SINK_TASK_PRINTER.shouldPrint()) {
                        TubeSink.logger.error("No topic specified, just discard the event, event header is " + event.getHeaders().toString());
                    }
                } else {
                    z = sendMessage(eventStat, str);
                }
            }
            TubeSink.logger.info("sink task {} stopped!", Thread.currentThread().getName());
        }

        private boolean sendMessage(EventStat eventStat, String str) throws Exception {
            Event event = eventStat.getEvent();
            MessageProducer producer = TubeSink.this.producerHolder.getProducer(str);
            if (producer == null) {
                TubeSink.this.frozenTopicDiscardMsgCnt.incrementAndGet();
                TubeSink.this.takenMsgCnt.decrementAndGet();
                if (TubeSink.this.statIntervalSec > 0) {
                    TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_DROPPED);
                }
                String str2 = "Get producer failed for " + str;
                if (MessageUtils.isSinkRspType(event)) {
                    MessageUtils.sinkReturnRspPackage((SinkRspEvent) event, DataProxyErrCode.PRODUCER_IS_NULL, str2);
                }
                if (!TubeSink.LOG_SINK_TASK_PRINTER.shouldPrint()) {
                    return false;
                }
                TubeSink.logger.error(str2);
                return false;
            }
            if (!TubeSink.MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId((String) event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) {
                producer.sendMessage(TubeUtils.buildMessage(str, event), new MyCallback(eventStat));
                TubeSink.this.inflightMsgCnt.incrementAndGet();
                return true;
            }
            TubeSink.this.dupDiscardMsgCnt.incrementAndGet();
            TubeSink.this.takenMsgCnt.decrementAndGet();
            if (TubeSink.this.statIntervalSec > 0) {
                TubeSink.this.monitorIndexExt.incrementAndGet(TubeSink.KEY_SINK_DROPPED);
            }
            String str3 = "Duplicated message discard, by uuid = " + ((String) event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
            if (MessageUtils.isSinkRspType(event)) {
                MessageUtils.sinkReturnRspPackage((SinkRspEvent) event, DataProxyErrCode.DUPLICATED_MESSAGE, str3);
            }
            TubeSink.logger.info("{} agent package {} existed,just discard.", Thread.currentThread().getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID));
            return false;
        }
    }

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/TubeSink$TubeStatsTask.class */
    private class TubeStatsTask implements Runnable {
        private TubeStatsTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (TubeSink.this.started.get()) {
                TubeSink.logger.info(TubeSink.this.getName() + "[TubeSink Stats] cachedMsgCnt=" + TubeSink.this.cachedMsgCnt.get() + ", takenMsgCnt=" + TubeSink.this.takenMsgCnt.get() + ", resendMsgCnt=" + TubeSink.this.resendMsgCnt.get() + ", blankTopicDiscardMsgCnt=" + TubeSink.this.blankTopicDiscardMsgCnt.get() + ", frozenTopicDiscardMsgCnt=" + TubeSink.this.frozenTopicDiscardMsgCnt.get() + ", dupDiscardMsgCnt=" + TubeSink.this.dupDiscardMsgCnt.get() + ", inflightMsgCnt=" + TubeSink.this.inflightMsgCnt.get() + ", successMsgCnt=" + TubeSink.this.successMsgCnt.get());
            }
        }
    }

    public TubeSink() {
        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L, 60L, TimeUnit.SECONDS);
        logger.info("success to start performance statistic task!");
    }

    public void configure(Context context) {
        logger.info(getName() + " configure from context: {}", context);
        this.configManager = ConfigManager.getInstance();
        this.tubeConfig = this.configManager.getMqClusterConfig();
        this.topicProperties = this.configManager.getTopicProperties();
        this.masterHostAndPortLists = this.configManager.getMqClusterUrl2Token().keySet();
        this.usedMasterAddr = getFirstClusterAddr(this.masterHostAndPortLists);
        if (this.usedMasterAddr != null) {
            this.producerHolder = new TubeProducerHolder(getName(), this.usedMasterAddr, this.configManager.getMqClusterConfig());
        }
        MSG_DEDUP_HANDLER.start(this.tubeConfig.getClientIdCache(), this.tubeConfig.getMaxSurvivedTime(), this.tubeConfig.getMaxSurvivedSize());
        this.maxMonitorCnt = context.getInteger(ConfigConstants.MAX_MONITOR_CNT, 300000).intValue();
        this.statIntervalSec = this.tubeConfig.getStatIntervalSec();
        Preconditions.checkArgument(this.statIntervalSec >= 0, "statIntervalSec must be >= 0");
        int badEventQueueSize = this.tubeConfig.getBadEventQueueSize();
        Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0");
        this.resendQueue = new LinkedBlockingQueue<>(badEventQueueSize);
        int threadNum = this.tubeConfig.getThreadNum();
        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
        this.sinkThreadPool = new Thread[threadNum];
        int eventQueueSize = this.tubeConfig.getEventQueueSize();
        Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
        this.eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
        if (this.tubeConfig.getDiskIoRatePerSec() != 0) {
            this.diskRateLimiter = RateLimiter.create(this.tubeConfig.getDiskIoRatePerSec());
        }
        this.configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() { // from class: org.apache.inlong.dataproxy.sink.TubeSink.1
            @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
            public void update() {
                TubeSink.this.diffSetPublish(new HashSet(TubeSink.this.topicProperties.values()), new HashSet(TubeSink.this.configManager.getTopicProperties().values()));
            }
        });
        this.configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() { // from class: org.apache.inlong.dataproxy.sink.TubeSink.2
            @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
            public void update() {
                TubeSink.this.diffUpdateTubeClient(TubeSink.this.masterHostAndPortLists, TubeSink.this.configManager.getMqClusterUrl2Token().keySet());
            }
        });
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            logger.info("Duplicated call, " + getName() + " has started!");
            return;
        }
        if (this.statIntervalSec > 0) {
            this.monitorIndex = new MonitorIndex("Tube_Sink", this.statIntervalSec, this.maxMonitorCnt);
            this.monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + getName(), this.statIntervalSec, this.maxMonitorCnt);
        }
        this.metricItemSet = new DataProxyMetricItemSet(ConfigManager.getInstance().getCommonProperties().getOrDefault("proxy.cluster.name", ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME), getName());
        MetricRegister.register(this.metricItemSet);
        if (this.producerHolder != null) {
            try {
                this.producerHolder.start(new HashSet(this.topicProperties.values()));
                ConfigManager.getInstance().updMqClusterStatus(true);
                logger.info("[{}] MQ Cluster service status ready!", getName());
            } catch (FlumeException e) {
                logger.error("Unable to start TubeMQ client. Exception follows.", e);
                super.stop();
                return;
            }
        }
        super.start();
        this.canSend = true;
        for (int i = 0; i < this.sinkThreadPool.length; i++) {
            this.sinkThreadPool[i] = new Thread(new TubeSinkTask(), getName() + "_tube_sink_sender-" + i);
            this.sinkThreadPool[i].start();
        }
        logger.info(getName() + " started!");
    }

    public void stop() {
        if (!this.started.compareAndSet(true, false)) {
            logger.info("Duplicated call, " + getName() + " has stopped!");
            return;
        }
        int i = 0;
        while (this.takenMsgCnt.get() > 0) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                logger.info("Stop thread has been interrupt!");
            }
        }
        if (this.sinkThreadPool != null) {
            for (Thread thread : this.sinkThreadPool) {
                if (thread != null) {
                    thread.interrupt();
                }
            }
        }
        if (this.producerHolder != null) {
            this.producerHolder.stop();
        }
        SCHEDULED_EXECUTOR_SERVICE.shutdown();
        if (this.statIntervalSec > 0) {
            try {
                this.monitorIndex.shutDown();
            } catch (Exception e2) {
                logger.warn("Stats runner interrupted");
            }
        }
        super.stop();
        logger.info(getName() + " stopped!");
    }

    public Sink.Status process() throws EventDeliveryException {
        if (!this.started.get()) {
            return Sink.Status.BACKOFF;
        }
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take != null) {
                    if (this.diskRateLimiter != null) {
                        this.diskRateLimiter.acquire(take.getBody().length);
                    }
                    if (this.eventQueue.offer(take, 3000L, TimeUnit.MILLISECONDS)) {
                        transaction.commit();
                        this.cachedMsgCnt.incrementAndGet();
                    } else {
                        transaction.rollback();
                    }
                } else {
                    status = Sink.Status.BACKOFF;
                    transaction.commit();
                }
                transaction.close();
            } catch (Throwable th) {
                logger.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                } catch (Throwable th2) {
                    logger.error("meta sink transaction rollback exception", th2);
                }
                transaction.close();
            }
            return status;
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resendEvent(EventStat eventStat, boolean z, DataProxyErrCode dataProxyErrCode, String str) {
        if (z) {
            try {
                this.inflightMsgCnt.decrementAndGet();
            } catch (Throwable th) {
                this.takenMsgCnt.decrementAndGet();
                if (this.statIntervalSec > 0) {
                    this.monitorIndexExt.incrementAndGet(KEY_SINK_DROPPED);
                }
                if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
                    logger.error(getName() + " Discard msg because put events to both of queue and fileChannel fail,current resendQueue.size = " + this.resendQueue.size(), th);
                    return;
                }
                return;
            }
        }
        if (eventStat == null || eventStat.getEvent() == null) {
            this.takenMsgCnt.decrementAndGet();
            return;
        }
        MSG_DEDUP_HANDLER.invalidMsgSeqId((String) eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID));
        if (MessageUtils.isSinkRspType(eventStat.getEvent())) {
            MessageUtils.sinkReturnRspPackage((SinkRspEvent) eventStat.getEvent(), dataProxyErrCode, str);
        } else if (this.resendQueue.offer(eventStat)) {
            this.resendMsgCnt.incrementAndGet();
        } else {
            FailoverChannelProcessorHolder.getChannelProcessor().processEvent(eventStat.getEvent());
            this.takenMsgCnt.decrementAndGet();
            if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
                logger.error(Thread.currentThread().getName() + " Channel --> Tube --> ResendQueue(full) -->FailOverChannelProcessor(current code point), Resend queue is full,Check if Tube server or network is ok.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void diffSetPublish(Set<String> set, Set<String> set2) {
        if (!this.started.get()) {
            logger.info(getName() + " not started, ignore this change!");
        }
        if (SetUtils.isEqualSet(set, set2)) {
            return;
        }
        HashSet hashSet = new HashSet();
        for (String str : set2) {
            if (!StringUtils.isBlank(str) && !set.contains(str)) {
                hashSet.add(str);
            }
        }
        if (hashSet.isEmpty()) {
            return;
        }
        if (this.producerHolder != null) {
            try {
                this.producerHolder.createProducersByTopicSet(hashSet);
            } catch (Throwable th) {
                logger.info(getName() + "'s publish new topic set fail.", th);
            }
        }
        logger.info(getName() + "'s topics set has changed, trigger diff publish for {}", hashSet);
        this.topicProperties = this.configManager.getTopicProperties();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void diffUpdateTubeClient(Set<String> set, Set<String> set2) {
        String firstClusterAddr;
        if (!this.started.get()) {
            logger.info(getName() + " not started, ignore this change!");
        }
        if (set2 == null || set2.isEmpty() || SetUtils.isEqualSet(set, set2) || set2.contains(this.usedMasterAddr) || (firstClusterAddr = getFirstClusterAddr(set2)) == null) {
            return;
        }
        TubeProducerHolder tubeProducerHolder = new TubeProducerHolder(getName(), firstClusterAddr, this.configManager.getMqClusterConfig());
        try {
            tubeProducerHolder.start(new HashSet(this.configManager.getTopicProperties().values()));
            String str = this.usedMasterAddr;
            TubeProducerHolder tubeProducerHolder2 = this.producerHolder;
            this.producerHolder = tubeProducerHolder;
            this.usedMasterAddr = firstClusterAddr;
            if (tubeProducerHolder2 == null) {
                diffSetPublish(new HashSet(), new HashSet(this.configManager.getTopicProperties().values()));
            } else {
                tubeProducerHolder2.stop();
            }
            if (!ConfigManager.getInstance().isMqClusterReady()) {
                ConfigManager.getInstance().updMqClusterStatus(true);
                logger.info("[{}] MQ Cluster service status ready!", getName());
            }
            logger.info(getName() + " switch cluster from " + str + " to " + this.usedMasterAddr);
        } catch (Throwable th) {
            logger.error(getName() + " create new producer holder for " + firstClusterAddr + " failure, throw exception is  {}", th.getMessage());
        }
    }

    private String getFirstClusterAddr(Set<String> set) {
        String str = null;
        Iterator<String> it = set.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String next = it.next();
            if (!StringUtils.isBlank(next)) {
                str = next;
                break;
            }
        }
        return str;
    }
}
