/*
 * Decompiled with CFR 0.152.
 */
package cn.hippo4j.starter.core;

import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.starter.core.CacheData;
import cn.hippo4j.starter.core.Listener;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/**
 * Client-side worker that keeps locally cached thread-pool configuration in sync with the
 * server by long polling.
 *
 * <p>The worker holds one {@link CacheData} per thread-pool id. A single-threaded scheduler
 * periodically calls {@link #checkConfigInfo()}, which starts additional
 * {@link LongPollingRunnable} tasks as the number of cached entries grows. Each task asks the
 * server which entries changed, re-fetches their content, calls
 * {@link CacheData#checkListenerMd5()} on the entries and then re-submits itself.
 */
public class ClientWorker {
    private static final Logger log = LoggerFactory.getLogger(ClientWorker.class);

    /** Number of cache entries counted per long-polling task (task count and task id). */
    private static final int PER_TASK_CONFIG_SIZE = 3000;

    /** Long-polling timeout sent to the server, in milliseconds. */
    private static final long LONG_POLLING_TIMEOUT_MS = 30000L;

    /** Read timeout used when fetching a single configuration, in milliseconds. */
    private static final long CONFIG_READ_TIMEOUT_MS = 3000L;

    /** How long a polling task sleeps when the server is flagged unhealthy, in milliseconds. */
    private static final long UNHEALTHY_SLEEP_MS = 30000L;

    private static final String LISTENER_PATH = "/v1/cs/configs/listener";
    private static final String CONFIG_PATH = "/v1/cs/configs";

    /** Number of long-polling tasks started so far; only grows. */
    private int currentLongingTaskCount = 0;
    private final long timeout;
    private final HttpAgent agent;
    private final String identification;
    /** Single-threaded scheduler that runs {@link #checkConfigInfo()}. */
    private final ScheduledExecutorService executor;
    /** Pool that runs the long-polling tasks. */
    private final ScheduledExecutorService executorService;
    /** Result of the most recent listener request: {@code false} after an exception. */
    private final AtomicBoolean isHealthServer = new AtomicBoolean(true);
    /** Previous health state, used to log the unhealthy-to-healthy transition once. */
    private final AtomicBoolean isHealthServerTemp = new AtomicBoolean(true);
    /** Cached configuration entries keyed by thread-pool id. */
    private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16);

    /**
     * Creates the worker, its two daemon thread pools, and schedules the recurring
     * {@link #checkConfigInfo()} call (initial delay 1 ms, then 10 ms after each run).
     *
     * @param httpAgent      agent used for all HTTP calls to the server
     * @param identification client identification sent with every listener request
     */
    public ClientWorker(HttpAgent httpAgent, String identification) {
        this.agent = httpAgent;
        this.identification = identification;
        this.timeout = LONG_POLLING_TIMEOUT_MS;
        this.executor = newDaemonScheduler(1, "client.worker.executor");
        int threadSize = Runtime.getRuntime().availableProcessors();
        this.executorService = newDaemonScheduler(threadSize, "client.long.polling.executor");
        log.info("Client identity :: {}", Constants.CLIENT_IDENTIFICATION_VALUE);
        this.executor.scheduleWithFixedDelay(() -> {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                // Catch everything: an exception escaping here would cancel the schedule.
                log.error("[Sub check] rotate check error", e);
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    /** Builds a scheduled pool whose threads are daemon threads carrying the given name. */
    private static ScheduledExecutorService newDaemonScheduler(int poolSize, String threadName) {
        return Executors.newScheduledThreadPool(poolSize, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(threadName);
            thread.setDaemon(true);
            return thread;
        });
    }

    /**
     * Starts additional long-polling tasks when the number of cached entries requires more
     * tasks than are currently running (one task per {@value #PER_TASK_CONFIG_SIZE} entries,
     * rounded up). Tasks are never stopped here.
     */
    public void checkConfigInfo() {
        int listenerSize = this.cacheMap.size();
        int longingTaskCount = (int) Math.ceil((double) listenerSize / PER_TASK_CONFIG_SIZE);
        if (longingTaskCount > this.currentLongingTaskCount) {
            for (int i = this.currentLongingTaskCount; i < longingTaskCount; ++i) {
                this.executorService.execute(new LongPollingRunnable());
            }
            this.currentLongingTaskCount = longingTaskCount;
        }
    }

    /**
     * Builds the probe string for the given entries and asks the server which of them changed.
     *
     * @param cacheDataList           entries to probe
     * @param inInitializingCacheList output list; receives the tenant key of every entry that
     *                                is still initializing
     * @return keys of the changed entries, possibly empty
     */
    private List<String> checkUpdateDataIds(List<CacheData> cacheDataList, List<String> inInitializingCacheList) {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDataList) {
            sb.append(cacheData.tpId).append(Constants.WORD_SEPARATOR);
            sb.append(cacheData.itemId).append(Constants.WORD_SEPARATOR);
            sb.append(cacheData.tenantId).append(Constants.WORD_SEPARATOR);
            sb.append(this.identification).append(Constants.WORD_SEPARATOR);
            sb.append(cacheData.getMd5()).append(Constants.LINE_SEPARATOR);
            if (cacheData.isInitializing()) {
                inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId));
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        return checkUpdateTpIds(sb.toString(), isInitializingCacheList);
    }

    /**
     * Sends the listener (long-polling) request and returns the keys the server reports as
     * changed.
     *
     * <p>The server health flag is set to {@code true} whenever the request returns and to
     * {@code false} when it throws.
     *
     * @param probeUpdateString       serialized entries to probe; an empty value short-circuits
     *                                to an empty result without contacting the server
     * @param isInitializingCacheList when {@code true}, adds the
     *                                {@code Long-Pulling-Timeout-No-Hangup} header
     * @return keys of the changed entries; empty on failure, on an unsuccessful result, or when
     *         the result carries no data
     */
    public List<String> checkUpdateTpIds(String probeUpdateString, boolean isInitializingCacheList) {
        if (StringUtils.isEmpty(probeUpdateString)) {
            return Collections.emptyList();
        }
        HashMap<String, String> params = new HashMap<String, String>(2);
        params.put("Listening-Configs", probeUpdateString);
        HashMap<String, String> headers = new HashMap<String, String>(4);
        headers.put("Long-Pulling-Timeout", String.valueOf(this.timeout));
        headers.put("Long-Pulling-Client-Identification", this.identification);
        if (isInitializingCacheList) {
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }
        try {
            // Read timeout is 1.5x the long-polling timeout so the server can answer first.
            long readTimeoutMs = this.timeout + (this.timeout >> 1);
            Result result = this.agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);
            setHealthServer(true);
            // A successful result without data means "nothing changed", not a server failure.
            if (result != null && result.isSuccess() && result.getData() != null) {
                return parseUpdateDataIdResponse(result.getData().toString());
            }
        } catch (Exception ex) {
            setHealthServer(false);
            log.error("[Check update] get changed dataId exception. error message :: {}", ex.getMessage(), ex);
        }
        return Collections.emptyList();
    }

    /**
     * Fetches the configuration of one thread pool from the server.
     *
     * @param namespace   value of the {@code namespace} request parameter
     * @param itemId      value of the {@code itemId} request parameter
     * @param tpId        value of the {@code tpId} request parameter
     * @param readTimeout read timeout passed to the HTTP agent
     * @return the result data as a string, or {@code ""} when the request did not succeed or
     *         returned no data
     */
    public String getServerConfig(String namespace, String itemId, String tpId, long readTimeout) {
        HashMap<String, String> params = new HashMap<String, String>(3);
        params.put("namespace", namespace);
        params.put("itemId", itemId);
        params.put("tpId", tpId);
        Result result = this.agent.httpGetByConfig(CONFIG_PATH, null, params, readTimeout);
        if (result != null && result.isSuccess() && result.getData() != null) {
            return result.getData().toString();
        }
        Object code = result == null ? null : result.getCode();
        log.error("[Sub server] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}", namespace, itemId, tpId, code);
        return "";
    }

    /**
     * Parses the listener response into a list of changed keys.
     *
     * <p>The response is URL-decoded (UTF-8), split into lines on
     * {@code Constants.LINE_SEPARATOR} and each line into fields on
     * {@code Constants.WORD_SEPARATOR}. Lines with two fields yield
     * {@code GroupKey.getKey}, lines with three fields yield {@code GroupKey.getKeyTenant};
     * any other field count is logged and skipped.
     *
     * @param response raw response body, may be {@code null} or empty
     * @return the parsed keys in response order, possibly empty
     */
    public List<String> parseUpdateDataIdResponse(String response) {
        if (StringUtils.isEmpty(response)) {
            return Collections.emptyList();
        }
        try {
            response = URLDecoder.decode(response, "UTF-8");
        } catch (Exception e) {
            // Fall through and try to parse the undecoded response.
            log.error("[Polling resp] decode modifiedDataIdsString error", e);
        }
        List<String> updateList = new ArrayList<String>();
        for (String dataIdAndGroup : response.split(Constants.LINE_SEPARATOR)) {
            if (StringUtils.isEmpty(dataIdAndGroup)) {
                continue;
            }
            String[] keyArr = dataIdAndGroup.split(Constants.WORD_SEPARATOR);
            // Check the field count before indexing so malformed lines cannot throw.
            if (keyArr.length == 2) {
                String dataId = keyArr[0];
                String group = keyArr[1];
                updateList.add(GroupKey.getKey(dataId, group));
                log.info("[Polling resp] config changed. dataId={}, group={}", dataId, group);
            } else if (keyArr.length == 3) {
                String dataId = keyArr[0];
                String group = keyArr[1];
                String tenant = keyArr[2];
                updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
                log.info("[Polling resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant);
            } else {
                log.error("[Polling resp] invalid dataIdAndGroup error {}", dataIdAndGroup);
            }
        }
        return updateList;
    }

    /**
     * Registers listeners on the cache entry of the given thread pool, creating the entry if
     * it does not exist yet.
     *
     * @param namespace namespace of the thread pool
     * @param itemId    item id of the thread pool
     * @param tpId      thread-pool id
     * @param listeners listeners to add to the entry
     */
    public void addTenantListeners(String namespace, String itemId, String tpId, List<? extends Listener> listeners) {
        CacheData cacheData = addCacheDataIfAbsent(namespace, itemId, tpId);
        for (Listener listener : listeners) {
            cacheData.addListener(listener);
        }
    }

    /**
     * Returns the cache entry for {@code tpId}, creating and registering it when absent.
     *
     * <p>The thread that wins the insertion also tries to load the current content from the
     * server; a failure is logged and the entry is returned without that content.
     *
     * @param namespace namespace of the thread pool
     * @param itemId    item id of the thread pool
     * @param tpId      thread-pool id, used as the cache key
     * @return the existing or newly created entry, never {@code null}
     */
    public CacheData addCacheDataIfAbsent(String namespace, String itemId, String tpId) {
        CacheData existing = this.cacheMap.get(tpId);
        if (existing != null) {
            return existing;
        }
        CacheData created = new CacheData(namespace, itemId, tpId);
        CacheData raced = this.cacheMap.putIfAbsent(tpId, created);
        if (raced != null) {
            // Another thread registered the entry first; use theirs.
            return raced;
        }
        try {
            created.setContent(loadPoolContent(namespace, itemId, tpId));
        } catch (Exception ex) {
            log.error("[Cache Data] Error. Service Unavailable :: {}", ex.getMessage(), ex);
        }
        // Same divisor as checkConfigInfo(), so task ids line up with the tasks it starts.
        created.setTaskId(this.cacheMap.size() / PER_TASK_CONFIG_SIZE);
        return created;
    }

    /** Fetches one configuration from the server and converts it to its pool content string. */
    private String loadPoolContent(String namespace, String itemId, String tpId) {
        String serverConfig = getServerConfig(namespace, itemId, tpId, CONFIG_READ_TIMEOUT_MS);
        PoolParameterInfo poolInfo = JSON.parseObject(serverConfig, PoolParameterInfo.class);
        return ContentUtil.getPoolContent(poolInfo);
    }

    /**
     * @return {@code true} unless the most recent listener request threw an exception
     */
    public boolean isHealthServer() {
        return this.isHealthServer.get();
    }

    private void setHealthServer(boolean isHealthServer) {
        this.isHealthServer.set(isHealthServer);
    }

    /**
     * One long-polling round: probe the server, refresh changed entries, run the listener MD5
     * checks, then re-submit itself to the polling pool.
     */
    class LongPollingRunnable
    implements Runnable {
        LongPollingRunnable() {
        }

        /**
         * Logs a recovery message when the server became healthy again, and sleeps
         * {@value ClientWorker#UNHEALTHY_SLEEP_MS} ms while it is flagged unhealthy.
         *
         * @throws InterruptedException if the thread is interrupted during the sleep
         */
        private void checkStatus() throws InterruptedException {
            if (!isHealthServerTemp.get() && isHealthServer.get()) {
                isHealthServerTemp.set(true);
                log.info("\ud83d\ude80 The client reconnects to the server successfully.");
            }
            if (!isHealthServer.get()) {
                isHealthServerTemp.set(false);
                log.error("[Check config] Error. exception message, Thread sleep 30 s.");
                Thread.sleep(UNHEALTHY_SLEEP_MS);
            }
        }

        /** Re-fetches the content of the entry identified by a changed key; failures are logged. */
        private void refreshCacheData(String changedKey) {
            try {
                String[] keys = StrUtil.split(changedKey, "+");
                String tpId = keys[0];
                String itemId = keys[1];
                String namespace = keys[2];
                CacheData cacheData = cacheMap.get(tpId);
                if (cacheData == null) {
                    log.warn("[Long polling] no cache entry for changed config. key :: {}", changedKey);
                    return;
                }
                cacheData.setContent(loadPoolContent(namespace, itemId, tpId));
            } catch (Exception ex) {
                // One bad key or failed fetch must not stop the remaining refreshes or the polling loop.
                log.error("[Long polling] refresh changed config error. key :: {}", changedKey, ex);
            }
        }

        @Override
        public void run() {
            try {
                checkStatus();
            } catch (InterruptedException ex) {
                // Restore the flag and stop: the task is not re-submitted after an interrupt.
                Thread.currentThread().interrupt();
                log.warn("[Long polling] interrupted while waiting for the server, polling task stops.");
                return;
            }
            List<CacheData> cacheDataList = new ArrayList<CacheData>(cacheMap.values());
            List<String> inInitializingCacheList = new ArrayList<String>();
            List<String> changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList);
            for (String each : changedTpIds) {
                refreshCacheData(each);
            }
            for (CacheData cacheData : cacheDataList) {
                // Skip entries that are initializing now but were not part of this probe.
                boolean notYetProbed = cacheData.isInitializing()
                        && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId));
                if (notYetProbed) {
                    continue;
                }
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
            executorService.execute(this);
        }
    }
}

