package ml.zhangxujie.konfig;

import cn.hutool.core.lang.Singleton;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSONObject;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import ml.zhangxujie.konfig.common.ConfigHelper;
import ml.zhangxujie.konfig.common.Const;
import ml.zhangxujie.konfig.common.KonfigUtil;
import ml.zhangxujie.konfig.dto.KonfigDataStatus;
import ml.zhangxujie.konfig.dto.MqPollData;
import ml.zhangxujie.konfig.dto.konfig.Konfig;
import ml.zhangxujie.konfig.dto.konfig.KonfigCollection;
import ml.zhangxujie.konfig.event.KonfigEventListener;
import ml.zhangxujie.konfig.event.KonfigEventObject;
import ml.zhangxujie.konfig.event.KonfigEventObjectSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/zhangxujie/konfig/KonfigClient.class */
public class KonfigClient {
    private static final Logger log = LoggerFactory.getLogger(KonfigClient.class);
    private String url;
    private Properties props = new Properties();
    private static volatile KonfigClient konfigClient;

    private KonfigClient(String str) {
        String conf = ConfigHelper.getConf("server.host", "127.0.0.1");
        String conf2 = ConfigHelper.getConf("server.port", "8301");
        this.props.put("bootstrap.servers", ConfigHelper.getConf("kafka.consumer.bootstrap.servers", ""));
        this.props.put("auto.commit.interval.ms", ConfigHelper.getConf("kafka.consumer.auto.commit.interval.ms", ""));
        this.props.put("group.id", str);
        this.props.put("enable.auto.commit", ConfigHelper.getConf("kafka.consumer.enable.auto.commit", ""));
        this.props.put("session.timeout.ms", ConfigHelper.getConf("kafka.consumer.session.timeout.ms", ""));
        this.props.put("key.deserializer", ConfigHelper.getConf("kafka.consumer.key.deserializer", ""));
        this.props.put("value.deserializer", ConfigHelper.getConf("kafka.consumer.value.deserializer", ""));
        this.url = conf + ":" + conf2;
    }

    public static KonfigClient getKonfigClient(String str) {
        if (konfigClient == null) {
            synchronized (Singleton.class) {
                if (konfigClient == null) {
                    konfigClient = new KonfigClient(str);
                }
            }
        }
        return konfigClient;
    }

    public KonfigCollection getConfig(Integer num) {
        KonfigCollection konfigCollection = null;
        try {
            konfigCollection = (KonfigCollection) JSONObject.parseObject(HttpRequest.get(this.url + Const.API_PATH_CONFIG + "/" + num).execute().body(), KonfigCollection.class);
        } catch (Exception e) {
            log.error("failed to parse json konfig");
        }
        return konfigCollection == null ? new KonfigCollection() : konfigCollection;
    }

    public Map<String, Konfig> convertKonfigListToMap(List<Konfig> list) {
        return KonfigUtil.parseKonfigMap(list);
    }

    public void addEventListener(Integer num, KonfigEventListener konfigEventListener) {
        log.info("Add event listener: collectionId={}", num);
        new Thread(() -> {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.props);
            kafkaConsumer.subscribe(Arrays.asList(Const.MQ_TOPIC_CONFIG + num));
            while (true) {
                Iterator it = kafkaConsumer.poll(Duration.ofMillis(100L)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    log.info("消费者消费的数据为：{}", consumerRecord.value());
                    MqPollData mqPollData = new MqPollData();
                    try {
                        mqPollData = (MqPollData) JSONObject.parseObject((String) consumerRecord.value(), MqPollData.class);
                    } catch (Exception e) {
                        log.error("failed to parse json from kafka");
                    }
                    KonfigCollection config = getConfig(num);
                    KonfigEventObjectSource konfigEventObjectSource = new KonfigEventObjectSource();
                    if (mqPollData.getStatus().equals(Const.MQ_DATA_STATUS_ONLINE)) {
                        konfigEventObjectSource.setKonfigDataStatus(KonfigDataStatus.ONLINE);
                    } else {
                        konfigEventObjectSource.setKonfigDataStatus(KonfigDataStatus.DRAFT);
                    }
                    konfigEventObjectSource.setTimestamp(mqPollData.getTimestamp());
                    konfigEventObjectSource.setKonfigCollection(config);
                    konfigEventObjectSource.setConfigMap(KonfigUtil.parseKonfigMap(config.getConfigList()));
                    konfigEventListener.OnChanged(new KonfigEventObject(konfigEventObjectSource));
                }
            }
        }).start();
    }
}
