package org.apache.inlong.audit.service;

import com.google.gson.Gson;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.audit.config.JdbcConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.file.RemoteConfigJson;
import org.apache.inlong.audit.service.consume.BaseConsume;
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/AuditMsgConsumerServer.class */
public class AuditMsgConsumerServer implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(AuditMsgConsumerServer.class);

    @Autowired
    private MessageQueueConfig mqConfig;

    @Autowired
    private StoreConfig storeConfig;

    @Autowired
    private JdbcConfig jdbcConfig;
    private JdbcService jdbcService;
    private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
    private static final int INTERVAL_MS = 5000;
    private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
    private final Gson gson = new Gson();

    public void afterPropertiesSet() {
        List<MQInfo> clusterFromManager = getClusterFromManager();
        BaseConsume baseConsume = null;
        List<InsertData> insertServiceList = getInsertServiceList();
        Iterator<MQInfo> it = clusterFromManager.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            MQInfo next = it.next();
            if (!this.mqConfig.isPulsar() || !"PULSAR".equals(next.getMqType())) {
                if (!this.mqConfig.isTube() || !"TUBEMQ".equals(next.getMqType())) {
                    if (this.mqConfig.isKafka() && "KAFKA".equals(next.getMqType())) {
                        this.mqConfig.setKafkaServerUrl(next.getUrl());
                        baseConsume = new KafkaConsume(insertServiceList, this.storeConfig, this.mqConfig);
                        break;
                    }
                } else {
                    this.mqConfig.setTubeMasterList(next.getUrl());
                    baseConsume = new TubeConsume(insertServiceList, this.storeConfig, this.mqConfig);
                    break;
                }
            } else {
                this.mqConfig.setPulsarServerUrl(next.getUrl());
                baseConsume = new PulsarConsume(insertServiceList, this.storeConfig, this.mqConfig);
                break;
            }
        }
        if (baseConsume == null) {
            LOG.error("Unknown MessageQueue {}", this.mqConfig.getMqType());
        }
        if (this.storeConfig.isJdbc()) {
            this.jdbcService.start();
        }
        baseConsume.start();
    }

    private List<InsertData> getInsertServiceList() {
        ArrayList arrayList = new ArrayList();
        if (this.storeConfig.isJdbc()) {
            this.jdbcService = new JdbcService(this.jdbcConfig);
            arrayList.add(this.jdbcService);
        }
        return arrayList;
    }

    private List<MQInfo> getClusterFromManager() {
        List<MQInfo> mQConfig;
        Properties properties = new Properties();
        try {
            InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES);
            Throwable th = null;
            try {
                try {
                    properties.load(resourceAsStream);
                    String property = properties.getProperty("manager.hosts");
                    String property2 = properties.getProperty("default.mq.cluster.tag");
                    String[] split = StringUtils.split(property, ",");
                    if (0 >= split.length) {
                        if (resourceAsStream != null) {
                            if (0 != 0) {
                                try {
                                    resourceAsStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                resourceAsStream.close();
                            }
                        }
                        return null;
                    }
                    String str = split[0];
                    while (true) {
                        mQConfig = getMQConfig(str, property2);
                        if (ObjectUtils.isNotEmpty(mQConfig)) {
                            break;
                        }
                        LOG.info("MQ config may not be registered yet, wait for 5s and try again");
                        Thread.sleep(5000L);
                    }
                    if (resourceAsStream != null) {
                        if (0 != 0) {
                            try {
                                resourceAsStream.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            resourceAsStream.close();
                        }
                    }
                    return mQConfig;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        throw new RuntimeException(e);
    }

    private List<MQInfo> getMQConfig(String str, String str2) {
        List<MQInfo> mqInfoList;
        HttpPost httpPost = null;
        try {
            try {
                String str3 = "http://" + str + "/inlong/manager/openapi/audit/getConfig";
                LOG.info("start to request {} to get config info", str3);
                httpPost = new HttpPost(str3);
                httpPost.addHeader("Connection", "close");
                AuditConfigRequest auditConfigRequest = new AuditConfigRequest();
                auditConfigRequest.setClusterTag(str2);
                StringEntity stringEntity = new StringEntity(this.gson.toJson(auditConfigRequest));
                stringEntity.setContentType("application/json");
                httpPost.setEntity(stringEntity);
                LOG.info("start to request {} to get config info with params {}", str3, auditConfigRequest);
                RemoteConfigJson remoteConfigJson = (RemoteConfigJson) this.gson.fromJson(EntityUtils.toString(this.httpClient.execute(httpPost).getEntity()), RemoteConfigJson.class);
                if (remoteConfigJson.isSuccess() && remoteConfigJson.getData() != null && (mqInfoList = remoteConfigJson.getData().getMqInfoList()) != null) {
                    if (!mqInfoList.isEmpty()) {
                        if (httpPost != null) {
                            httpPost.releaseConnection();
                        }
                        return mqInfoList;
                    }
                }
                if (httpPost == null) {
                    return null;
                }
                httpPost.releaseConnection();
                return null;
            } catch (Exception e) {
                LOG.error("Failed to get MQ config from manager, please check it", e);
                if (httpPost != null) {
                    httpPost.releaseConnection();
                }
                return null;
            }
        } catch (Throwable th) {
            if (httpPost != null) {
                httpPost.releaseConnection();
            }
            throw th;
        }
    }
}
