/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.doris.constant;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.wicp.tams.common.apiext.CollectionUtil;
import net.wicp.tams.common.apiext.LoggerUtil;
import net.wicp.tams.common.binlog.alone.DuckulaAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.PluginAssit;
import net.wicp.tams.common.binlog.alone.binlog.bean.Rule;
import net.wicp.tams.common.constant.JvmStatus;
import net.wicp.tams.common.constant.ods.AddColName;
import net.wicp.tams.common.doris.bean.CheckPointConfig;
import net.wicp.tams.common.doris.bean.DorisConfig;
import net.wicp.tams.common.doris.constant.AlterMessage;
import net.wicp.tams.common.doris.constant.CheckpointService;
import net.wicp.tams.common.doris.constant.DorisStreamLoadBe;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisSinkV1 {
    private static final Logger log = LoggerFactory.getLogger(DorisSinkV1.class);
    private static final String tbFullNameFormate = "%s`%s";
    private static final String tbDataKeyFormate = "%s`%s`%s";
    private Long lastFlushTime = 0L;
    private final Integer flushCacheSeconds = 300;
    private final Integer flushCacheSize = 10000;
    private final ConcurrentHashMap<String, Map<String, Object>> cacheMap = new ConcurrentHashMap();
    private final DorisConfig dorisConfig;
    private final CheckPointConfig checkPointConfig;
    private final CheckpointService checkpointService;
    private final DorisStreamLoadBe dorisStreamLoadBe;
    private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
    private final Lock wLock = this.rwl.writeLock();
    private final long tryLockWaiteTime = 60L;

    public DorisSinkV1(DorisConfig dorisConfig, CheckPointConfig checkPointConfig) {
        this.dorisConfig = dorisConfig;
        this.checkPointConfig = checkPointConfig;
        this.checkpointService = new CheckpointService(checkPointConfig);
        this.dorisStreamLoadBe = new DorisStreamLoadBe(dorisConfig);
        this.lastFlushTime = System.currentTimeMillis();
        this.addTimerFlush();
        this.addShutdownHook();
    }

    public void sink(String db, String tb, String primaryKeyVales, Map<String, String> data) throws Exception {
        this.wLock.tryLock(60L, TimeUnit.SECONDS);
        try {
            String tbFullName = String.format(tbFullNameFormate, db, tb);
            String dataKey = String.format(tbDataKeyFormate, db, tb, primaryKeyVales);
            Map<String, Object> tbData = this.cacheMap.get(tbFullName);
            if (tbData == null) {
                HashMap tbDataNew = Maps.newHashMap();
                tbDataNew.put(dataKey, data);
                this.cacheMap.put(tbFullName, tbDataNew);
            } else {
                tbData.put(dataKey, data);
                if (tbData.size() >= this.flushCacheSize) {
                    log.info("\u8868\u6ee1\u5199\u5165\u5f00\u59cb ,tbFullName={},tbSize={}", (Object)tbFullName, (Object)tbData.size());
                    this.flush(tbFullName, db, tb);
                    log.info("\u8868\u6ee1\u5199\u5165\u5b8c\u6210 ,tbFullName={}", (Object)tbFullName);
                }
            }
        }
        catch (Exception e) {
            log.error("\u6d88\u8d39binlog\u5f02\u5e38", (Throwable)e);
            this.rmChkAndExitJvm(e);
            throw e;
        }
        finally {
            this.wLock.unlock();
        }
    }

    public void doBusiAsyncTrue(boolean isSplit, boolean logicDel, Map<Rule, List<Pair<ListenerConf.DuckulaEvent, Map<AddColName, Serializable>>>> sendDataCase) {
        if (MapUtils.isEmpty(sendDataCase)) {
            return;
        }
        HashMap<Pair, ArrayNode> datas = new HashMap<Pair, ArrayNode>();
        for (Rule rule : sendDataCase.keySet()) {
            List<Pair<ListenerConf.DuckulaEvent, Map<AddColName, Serializable>>> list = sendDataCase.get(rule);
            for (Pair<ListenerConf.DuckulaEvent, Map<AddColName, Serializable>> data : list) {
                ListenerConf.DuckulaEvent event = (ListenerConf.DuckulaEvent)data.getLeft();
                Pair ele = PluginAssit.getNewDbTb((Rule)rule, (ListenerConf.DuckulaEvent)event);
                ArrayNode json = (ArrayNode)datas.get(ele);
                json = json == null ? JsonNodeFactory.instance.arrayNode() : json;
                for (int i = 0; i < event.getItemsCount(); ++i) {
                    HashMap<String, String> valueMap = new HashMap<String, String>();
                    valueMap.putAll(DuckulaAssit.getValueMap((ListenerConf.DuckulaEvent)event, (int)i));
                    if (MapUtils.isNotEmpty((Map)((Map)data.getRight()))) {
                        for (AddColName addColName : ((Map)data.getRight()).keySet()) {
                            String colNameTrue = addColName.getColNameTrue();
                            if (valueMap.containsKey(colNameTrue)) continue;
                            valueMap.put(colNameTrue, String.valueOf(((Map)data.getRight()).get(addColName)));
                        }
                    }
                    CollectionUtil.filterNull(valueMap, (int)4);
                    json.addPOJO(valueMap);
                }
                datas.put(ele, json);
            }
        }
        for (Pair dbTb : datas.keySet()) {
            try {
                this.dorisStreamLoadBe.flushAndRetry((String)dbTb.getLeft(), (String)dbTb.getRight(), JSONObject.toJSONString(datas.get(dbTb)));
            }
            catch (Exception e) {
                throw new RuntimeException("\u6279\u91cf\u5bfc\u5165\u6570\u636e\u6709\u95ee\u9898", e);
            }
        }
    }

    private void flush(String tbKey, String db, String tb) throws Exception {
        ObjectMapper objmpa = new ObjectMapper();
        String data = objmpa.valueToTree(this.cacheMap.get(tbKey).values()).toString();
        log.info("\u5199\u5165\u6570\u636e\u5f00\u59cb,tb={},size={}", (Object)tb, (Object)this.cacheMap.get(tbKey).values().size());
        this.dorisStreamLoadBe.flushAndRetry(db, tb, data);
        this.cacheMap.remove(tbKey);
        log.info("\u5199\u5165\u6570\u636e\u7ed3\u675f,tb={}", (Object)tb);
    }

    public void flushCache() throws Exception {
        this.wLock.tryLock(60L, TimeUnit.SECONDS);
        try {
            log.info("flushCache-start,size={}", (Object)this.cacheMap.size());
            for (Map.Entry<String, Map<String, Object>> entry : this.cacheMap.entrySet()) {
                if (entry.getValue() == null) continue;
                this.flush(entry.getKey(), entry.getKey().split("`")[0], entry.getKey().split("`")[1]);
            }
        }
        catch (Exception e) {
            log.error("\u7f13\u5b58\u5199doris\u5f02\u5e38", (Throwable)e);
            this.rmChkAndExitJvm(e);
        }
        finally {
            this.wLock.unlock();
        }
    }

    private void addTimerFlush() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                log.info("\u5b9a\u65f6stream load, size={}", (Object)this.cacheMap.size());
                this.flushCache();
                this.lastFlushTime = System.currentTimeMillis();
            }
            catch (Exception e) {
                log.error("\u5199doris\u5f02\u5e38\u91cd\u542f\uff1a", (Throwable)e);
                this.rmChkAndExitJvm(e);
            }
        }, this.flushCacheSeconds.intValue(), this.flushCacheSeconds.intValue(), TimeUnit.SECONDS);
    }

    private void rmChkAndExitJvm(Exception e) {
        AlterMessage.alterMsgDataError(this.checkPointConfig.getSourceHost(), e.getMessage().toLowerCase());
        this.checkpointService.deleteChk(this.lastFlushTime);
        LoggerUtil.exit((JvmStatus)JvmStatus.s15);
    }

    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                log.info("----------------------\u6267\u884c\u5173\u95ed\u8fdb\u7a0b \u94a9\u5b50\u5f00\u59cb-------------------------------------");
                DorisSinkV1.this.checkpointService.deleteChk(DorisSinkV1.this.lastFlushTime);
                log.info("----------------------\u6267\u884c\u5173\u95ed\u8fdb\u7a0b \u94a9\u5b50\u5b8c\u6210-------------------------------------");
            }
        });
    }
}

