package org.apache.inlong.audit.service.consume;

import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.service.InsertData;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/TubeConsume.class */
public class TubeConsume extends BaseConsume {
    private static final Logger LOG = LoggerFactory.getLogger(TubeConsume.class);
    private PullMessageConsumer pullConsumer;
    private TubeMultiSessionFactory sessionFactory;
    private String masterUrl;
    private String topic;
    private int fetchThreadCnt;

    /* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/TubeConsume$Fetcher.class */
    public class Fetcher implements Runnable {
        private final PullMessageConsumer pullMessageConsumer;
        private String topic;

        public Fetcher(PullMessageConsumer pullMessageConsumer, String str) {
            this.pullMessageConsumer = pullMessageConsumer;
            this.topic = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.pullMessageConsumer.isPartitionsReady(5000L) && !this.pullMessageConsumer.isShutdown()) {
            }
            TubeConsume.LOG.warn("tube partition is not ready or consumer is shutdown!");
            while (!this.pullMessageConsumer.isShutdown()) {
                try {
                    ConsumerResult message = this.pullMessageConsumer.getMessage();
                    if (message.isSuccess()) {
                        List<Message> messageList = message.getMessageList();
                        if (CollectionUtils.isNotEmpty(messageList)) {
                            for (Message message2 : messageList) {
                                if (StringUtils.equals(message2.getTopic(), this.topic)) {
                                    TubeConsume.this.handleMessage(new String(message2.getData(), StandardCharsets.UTF_8));
                                }
                            }
                        }
                        this.pullMessageConsumer.confirmConsume(message.getConfirmContext(), true);
                    } else {
                        TubeConsume.LOG.error("receive messages errorCode is {}, error meddage is {}", Integer.valueOf(message.getErrCode()), message.getErrMsg());
                    }
                } catch (Exception e) {
                    TubeConsume.LOG.error("handle audit message error {}", e.getMessage());
                } catch (TubeClientException e2) {
                    TubeConsume.LOG.error("tube consumer getMessage error {}", e2.getMessage());
                }
            }
            TubeConsume.LOG.warn("consumer is shutdown!");
        }
    }

    public TubeConsume(List<InsertData> list, StoreConfig storeConfig, MessageQueueConfig messageQueueConfig) {
        super(list, storeConfig, messageQueueConfig);
        this.fetchThreadCnt = 4;
    }

    @Override // org.apache.inlong.audit.service.consume.BaseConsume
    public void start() {
        this.masterUrl = this.mqConfig.getTubeMasterList();
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.masterUrl), "no tube masterUrlList specified");
        this.topic = this.mqConfig.getTubeTopic();
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.topic), "no tube topic specified");
        this.fetchThreadCnt = this.mqConfig.getTubeThreadNum();
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getTubeConsumerGroupName()), "no tube consumer groupName specified");
        initConsumer();
        Thread[] threadArr = new Thread[this.fetchThreadCnt];
        for (int i = 0; i < this.fetchThreadCnt; i++) {
            threadArr[i] = new Thread(new Fetcher(this.pullConsumer, this.topic), "TubeConsume_Fetcher_Thread_" + i);
            threadArr[i].start();
        }
    }

    private void initConsumer() {
        LOG.info("init tube consumer, topic:{}, masterList:{}", this.topic, this.masterUrl);
        ConsumerConfig consumerConfig = new ConsumerConfig(this.masterUrl, this.mqConfig.getTubeConsumerGroupName());
        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
        try {
            this.sessionFactory = new TubeMultiSessionFactory(consumerConfig);
            this.pullConsumer = this.sessionFactory.createPullConsumer(consumerConfig);
            this.pullConsumer.subscribe(this.topic, (TreeSet) null);
            this.pullConsumer.completeSubscribe();
        } catch (TubeClientException e) {
            LOG.error("init tube consumer error {}", e.getMessage());
        }
    }
}
