package org.apache.inlong.tubemq.manager.service;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.tubemq.manager.repository.TopicRepository;
import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/inlong/tubemq/manager/service/TopicBackendWorker.class */
public class TopicBackendWorker implements DisposableBean, Runnable {
    private static final Logger log = LoggerFactory.getLogger(TopicBackendWorker.class);
    private final NodeService nodeService;

    @Autowired
    private TopicRepository topicRepository;

    @Value("${manager.topic.queue.warning.size:100}")
    private int queueWarningSize;

    @Value("${manager.topic.queue.thread.interval:10}")
    private int queueThreadInterval;

    @Value("${manager.topic.queue.max.wait:3}")
    private int queueMaxWait;

    @Value("${manager.topic.queue.max.running.size:20}")
    private int queueMaxRunningSize;
    private final AtomicBoolean runFlag = new AtomicBoolean(false);
    private final ConcurrentHashMap<Integer, BlockingQueue<TopicFuture>> pendingTopics = new ConcurrentHashMap<>();
    private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
    private final ScheduledExecutorService workerExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("tubemq-manager-topic-backend-worker").build());

    TopicBackendWorker() {
        this.workerExecutor.schedule(this, this.queueThreadInterval, TimeUnit.SECONDS);
        this.nodeService = new NodeServiceImpl(this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.util.concurrent.BlockingQueue] */
    public void addTopicFuture(TopicFuture topicFuture) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue2 = (BlockingQueue) this.pendingTopics.putIfAbsent(Integer.valueOf(topicFuture.getEntry().getClusterId()), linkedBlockingQueue);
        if (linkedBlockingQueue2 == null) {
            linkedBlockingQueue2 = linkedBlockingQueue;
        }
        linkedBlockingQueue2.add(topicFuture);
        if (linkedBlockingQueue2.size() > this.queueWarningSize) {
            log.warn("queue size exceed {}, please check it", Integer.valueOf(this.queueWarningSize));
        }
    }

    private void batchAddTopic() {
        this.pendingTopics.forEach((num, blockingQueue) -> {
            HashMap hashMap = new HashMap(32);
            if (this.notSatisfiedCount.get() > this.queueMaxWait || blockingQueue.size() > this.queueMaxRunningSize) {
                this.notSatisfiedCount.set(0);
                ArrayList<TopicFuture> arrayList = new ArrayList();
                blockingQueue.drainTo(arrayList, this.queueMaxRunningSize);
                for (TopicFuture topicFuture : arrayList) {
                    hashMap.put(topicFuture.getEntry().getTopic(), topicFuture);
                }
            } else {
                this.notSatisfiedCount.incrementAndGet();
            }
            this.nodeService.updateBrokerStatus(num.intValue(), hashMap);
        });
    }

    private void checkTopicFromDB() {
    }

    @Override // java.lang.Runnable
    public void run() {
        log.info("TopicBackendWorker has started");
        if (this.runFlag.get()) {
            try {
                batchAddTopic();
                checkTopicFromDB();
            } catch (Exception e) {
                log.warn("exception caught", e);
            }
        }
    }

    public void destroy() throws Exception {
        this.runFlag.set(false);
        this.nodeService.close();
        this.workerExecutor.shutdown();
    }
}
