package org.apache.inlong.dataproxy.sink.common;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.class */
/**
 * Owns the TubeMQ session factory of one sink and hands out the
 * {@link MessageProducer} responsible for each topic.
 *
 * <p>A single producer publishes at most
 * {@code clusterConfig.getMaxTopicsEachProducerHold()} topics; once that limit is
 * reached a new producer is created from the session factory. Topics whose
 * publish or send failed are "frozen" for a while, during which
 * {@link #getProducer(String)} returns {@code null} for them.
 *
 * <p>{@code lastProducer} and {@code lastPubTopicCnt} are only mutated while
 * holding the monitor of {@code lastPubTopicCnt}.
 */
public class TubeProducerHolder {
    private static final Logger logger = LoggerFactory.getLogger(TubeProducerHolder.class);
    /** Freeze duration (ms) applied by {@link #needFrozenSent(String, Throwable)}. */
    private static final long SEND_FAILURE_WAIT = 30000;
    /** Freeze duration (ms) applied when publishing a topic fails in {@link #getProducer(String)}. */
    private static final long PUBLISH_FAILURE_WAIT = 60000;
    /** topic -> timestamp (ms) until which the topic is frozen; shared by all holder instances. */
    private static final ConcurrentHashMap<String, AtomicLong> FROZEN_TOPIC_MAP = new ConcurrentHashMap<>();

    private final String sinkName;
    private final String clusterAddr;
    private final MQClusterConfig clusterConfig;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private TubeMultiSessionFactory sessionFactory = null;
    /** topic -> producer that published it. */
    private final Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
    /** Most recently created producer; new topics are attached to it until it is full. */
    private MessageProducer lastProducer = null;
    /** Number of topics attached to {@link #lastProducer}; also used as the lock object. */
    private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0);

    /**
     * Creates a holder; no connection is made until {@link #start(Set)} is called.
     *
     * @param sinkName      name of the owning sink, used in log messages
     * @param clusterAddr   TubeMQ cluster address list, must not be blank
     * @param clusterConfig cluster configuration used to build the client config
     * @throws IllegalStateException if {@code clusterAddr} is blank
     */
    public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig clusterConfig) {
        Preconditions.checkState(StringUtils.isNotBlank(clusterAddr), "No TubeMQ's cluster address list specified");
        this.sinkName = sinkName;
        this.clusterAddr = clusterAddr;
        this.clusterConfig = clusterConfig;
    }

    /**
     * Builds the session factory and pre-creates producers for the given topics.
     * Calling it on an already started holder only logs and returns.
     *
     * @param topicSet topics to publish up front; may be {@code null} or empty
     * @throws FlumeException if the session factory or the producers cannot be
     *                        created; the holder is stopped again before throwing
     */
    public void start(Set<String> topicSet) {
        if (!this.started.compareAndSet(false, true)) {
            logger.info("ProducerHolder for {} has started!", this.sinkName);
            return;
        }
        logger.info("ProducerHolder for {} begin to start!", this.sinkName);
        try {
            this.sessionFactory =
                    new TubeMultiSessionFactory(TubeUtils.buildClientConfig(this.clusterAddr, this.clusterConfig));
            createProducersByTopicSet(topicSet);
            logger.info("ProducerHolder for {} started!", this.sinkName);
        } catch (Throwable th) {
            // Roll back whatever was created so far and reset the started flag.
            stop();
            String msg = "Build session factory  to " + this.clusterAddr
                    + " for " + this.sinkName + " failure, please re-check";
            logger.error(msg, th);
            // Keep the root cause attached for callers that inspect the exception chain.
            throw new FlumeException(msg, th);
        }
    }

    /**
     * Shuts down all producers and the session factory and resets internal state.
     * Calling it on a holder that is not started only logs and returns.
     * Shutdown failures are logged and otherwise ignored (best effort).
     */
    public void stop() {
        // The CAS is the only guard needed: exactly one caller flips true -> false
        // and performs the shutdown.
        if (!this.started.compareAndSet(true, false)) {
            logger.info("ProducerHolder for {} has stopped!", this.sinkName);
            return;
        }
        logger.info("ProducerHolder for {} begin to stop!", this.sinkName);
        for (Map.Entry<String, MessageProducer> entry : this.producerMap.entrySet()) {
            MessageProducer producer = entry.getValue();
            if (producer == null) {
                continue;
            }
            try {
                producer.shutdown();
            } catch (Throwable th) {
                logger.warn("ProducerHolder for " + this.sinkName
                        + " failed to shutdown producer of topic=" + entry.getKey(), th);
            }
        }
        this.producerMap.clear();
        this.lastProducer = null;
        this.lastPubTopicCnt.set(0);
        // NOTE(review): the frozen-topic map is static, so this also clears entries
        // recorded by other holder instances - confirm that is intended.
        FROZEN_TOPIC_MAP.clear();
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (Throwable th) {
                logger.warn("ProducerHolder for " + this.sinkName + " failed to shutdown session factory", th);
            }
            this.sessionFactory = null;
        }
        logger.info("ProducerHolder for {} finished stop!", this.sinkName);
    }

    /**
     * Returns the producer for a topic, publishing the topic on demand.
     *
     * @param topic topic name
     * @return the producer, or {@code null} if the topic is currently frozen or
     *         publishing it just failed (which freezes it for
     *         {@link #PUBLISH_FAILURE_WAIT} ms)
     * @throws TubeClientException if a new producer cannot be created
     */
    public MessageProducer getProducer(String topic) throws TubeClientException {
        AtomicLong frozenUntil = FROZEN_TOPIC_MAP.get(topic);
        if (isStillFrozen(frozenUntil)) {
            return null;
        }
        MessageProducer producer = this.producerMap.get(topic);
        if (producer != null) {
            if (frozenUntil != null) {
                // Freeze period has expired; drop the stale entry.
                FROZEN_TOPIC_MAP.remove(topic);
            }
            return producer;
        }
        synchronized (this.lastPubTopicCnt) {
            if (isStillFrozen(FROZEN_TOPIC_MAP.get(topic))) {
                return null;
            }
            // Re-check under the lock: another thread may have published this topic
            // while we were waiting, and publishing twice would double-count it.
            producer = this.producerMap.get(topic);
            if (producer != null) {
                return producer;
            }
            if (this.lastProducer == null
                    || this.lastPubTopicCnt.get() >= this.clusterConfig.getMaxTopicsEachProducerHold()) {
                this.lastProducer = this.sessionFactory.createProducer();
                this.lastPubTopicCnt.set(0);
            }
            try {
                this.lastProducer.publish(topic);
                this.producerMap.put(topic, this.lastProducer);
                this.lastPubTopicCnt.incrementAndGet();
                return this.lastProducer;
            } catch (Throwable th) {
                freezeTopic(topic, PUBLISH_FAILURE_WAIT);
                logger.warn("Throw exception while publish topic=" + topic
                        + ", exception is " + th.getMessage());
                return null;
            }
        }
    }

    /**
     * Decides whether a send failure should freeze the topic and, if so, freezes
     * it for {@link #SEND_FAILURE_WAIT} ms.
     *
     * @param topic topic the failed message was sent to
     * @param th    failure reported by the send
     * @return {@code true} if {@code th} is a {@link TubeClientException} whose
     *         message reports no available partition or all brokers forbidden
     *         (the topic is then frozen); {@code false} otherwise
     */
    public boolean needFrozenSent(String topic, Throwable th) {
        if (!(th instanceof TubeClientException)) {
            return false;
        }
        String message = th.getMessage();
        if (message == null) {
            return false;
        }
        boolean topicUnavailable = message.contains("No available partition for topic")
                || message.contains("The brokers of topic are all forbidden");
        if (!topicUnavailable) {
            return false;
        }
        freezeTopic(topic, SEND_FAILURE_WAIT);
        return true;
    }

    /**
     * Publishes every not-yet-known, non-blank topic of the set, filling the
     * current producer first and creating further producers as each one reaches
     * the per-producer topic limit.
     *
     * <p>A failed batch publish is only logged; the topics of that batch are still
     * mapped to the producer.
     *
     * @param topicSet topics to publish; {@code null} or empty is a no-op
     * @throws Exception if a new producer cannot be created
     */
    public synchronized void createProducersByTopicSet(Set<String> topicSet) throws Exception {
        if (topicSet == null || topicSet.isEmpty()) {
            return;
        }
        ArrayList<String> pendingTopics = new ArrayList<>(topicSet.size());
        for (String topic : topicSet) {
            if (StringUtils.isNotBlank(topic) && this.producerMap.get(topic) == null) {
                pendingTopics.add(topic);
            }
        }
        if (pendingTopics.isEmpty()) {
            return;
        }
        Collections.sort(pendingTopics);
        long startTime = System.currentTimeMillis();
        // A non-positive limit would otherwise produce empty batches forever;
        // treat it as "one topic per producer", matching getProducer's behaviour.
        int maxTopicsPerProducer = Math.max(1, this.clusterConfig.getMaxTopicsEachProducerHold());
        // Same lock as getProducer, since both mutate lastProducer/lastPubTopicCnt.
        synchronized (this.lastPubTopicCnt) {
            int offset = 0;
            while (offset < pendingTopics.size()) {
                if (this.lastProducer == null || this.lastPubTopicCnt.get() >= maxTopicsPerProducer) {
                    this.lastProducer = this.sessionFactory.createProducer();
                    this.lastPubTopicCnt.set(0);
                }
                // Remaining capacity is always >= 1 here, so every pass makes progress.
                int capacity = maxTopicsPerProducer - Math.max(0, this.lastPubTopicCnt.get());
                int batchSize = Math.min(pendingTopics.size() - offset, capacity);
                Set<String> batch = new HashSet<>(pendingTopics.subList(offset, offset + batchSize));
                offset += batchSize;
                try {
                    this.lastProducer.publish(batch);
                } catch (Throwable th) {
                    logger.info(this.sinkName + " meta sink publish fail.", th);
                }
                this.lastPubTopicCnt.addAndGet(batch.size());
                for (String topic : batch) {
                    this.producerMap.put(topic, this.lastProducer);
                }
            }
        }
        logger.info(this.sinkName + " initializes producers for topics:" + this.producerMap.keySet()
                + ", cost: " + (System.currentTimeMillis() - startTime) + "ms");
    }

    /** Returns whether the given freeze deadline exists and lies in the future. */
    private static boolean isStillFrozen(AtomicLong frozenUntil) {
        return frozenUntil != null && frozenUntil.get() > System.currentTimeMillis();
    }

    /** Marks the topic as frozen until {@code waitMs} milliseconds from now. */
    private static void freezeTopic(String topic, long waitMs) {
        AtomicLong frozenUntil = FROZEN_TOPIC_MAP.get(topic);
        if (frozenUntil == null) {
            AtomicLong created = new AtomicLong(0L);
            frozenUntil = FROZEN_TOPIC_MAP.putIfAbsent(topic, created);
            if (frozenUntil == null) {
                frozenUntil = created;
            }
        }
        frozenUntil.set(System.currentTimeMillis() + waitMs);
    }
}
