package com.xiaomi.mone.log.stream.job.extension.impl;

import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.model.StorageInfo;
import com.xiaomi.mone.log.stream.common.util.StreamUtils;
import com.xiaomi.mone.log.stream.job.compensate.MqMessageDTO;
import com.xiaomi.mone.log.stream.job.extension.MessageSender;
import com.xiaomi.mone.log.stream.job.extension.MqMessageProduct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import run.mone.doris.DorisStreamLoad;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/extension/impl/DorisMessageSender.class */
/**
 * {@link MessageSender} that buffers rows in memory and pushes them to a Doris table
 * through {@link DorisStreamLoad}.
 *
 * <p>Rows are written out in two ways:
 * <ul>
 *   <li>by {@link #send(Map)} once the buffer holds more than {@code BATCH_SEND_SIZE} rows, and</li>
 *   <li>by a background task scheduled in the constructor that periodically flushes whatever
 *       is buffered.</li>
 * </ul>
 *
 * <p>All access to the buffer is guarded by a single {@link ReentrantLock}.
 */
public class DorisMessageSender implements MessageSender {
    private static final Logger log = LoggerFactory.getLogger(DorisMessageSender.class);

    /** Port used when the storage configuration does not provide one. */
    private static final int DEFAULT_PORT = 8030;

    /** Number of rows sent per batch from {@link #send(Map)}. */
    private static final int BATCH_SEND_SIZE = 1000;

    /** Delay before the first periodic flush, in milliseconds. */
    private static final long FLUSH_INITIAL_DELAY_MS = 1000L;

    /** Interval between periodic flushes, in milliseconds. */
    private static final long FLUSH_PERIOD_MS = 3000L;

    private final String tableName;
    private final MqMessageProduct compensateMsgProduct;
    private final List<String> columnList;
    private final DorisStreamLoad dorisStreamLoad;
    private final String dataBaseName;

    // NOTE(review): this executor is created but never used inside this class; kept so the
    // constructor's behavior is unchanged. Confirm whether it can be removed.
    private final ExecutorService executors;

    /** Pending rows; only touched while {@link #reentrantLock} is held. */
    private final List<Map<String, Object>> dataList = new ArrayList<>();
    private final ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * Creates the sender and schedules the periodic flush task.
     *
     * @param str              name of the target table
     * @param mqMessageProduct producer used by {@link #compensateSend(MqMessageDTO)}; may be {@code null}
     * @param storageInfo      connection settings (JDBC address, user, password, optional port)
     * @param list             column names passed along with every batch
     */
    public DorisMessageSender(String str, MqMessageProduct mqMessageProduct, StorageInfo storageInfo, List<String> list) {
        this.tableName = str;
        this.compensateMsgProduct = mqMessageProduct;
        this.columnList = list;

        StreamUtils.JdbcInfo jdbcInfo = StreamUtils.extractInfoFromJdbcUrl(storageInfo.getAddr());
        Integer configuredPort = storageInfo.getPort();
        int port = configuredPort != null ? configuredPort : DEFAULT_PORT;

        this.dataBaseName = jdbcInfo.getDbName();
        this.dorisStreamLoad = new DorisStreamLoad(jdbcInfo.getIp(), storageInfo.getUser(), storageInfo.getPwd(), port);
        this.executors = Executors.newVirtualThreadPerTaskExecutor();

        // NOTE(review): the scheduler is not retained, so it cannot be shut down from this class.
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::flush, FLUSH_INITIAL_DELAY_MS, FLUSH_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends every buffered row and empties the buffer.
     *
     * <p>Uses {@code tryLock()} so the timer thread never blocks behind {@link #send(Map)}; when
     * the lock is busy this run is simply skipped. Failures are logged and not rethrown, because
     * an exception escaping a {@code scheduleAtFixedRate} task cancels all further runs. On
     * failure the buffer is left untouched, so the rows are retried by a later flush.
     */
    private void flush() {
        // Acquire first: unlock() must only run on a lock this thread actually holds.
        if (!this.reentrantLock.tryLock()) {
            return;
        }
        try {
            if (this.dataList.isEmpty()) {
                // Nothing buffered; avoid posting an empty batch.
                return;
            }
            this.dorisStreamLoad.sendData(this.dataBaseName, this.tableName, this.columnList, this.dataList);
            this.dataList.clear();
        } catch (Exception e) {
            log.error("flush data error", e);
        } finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Buffers one row and, once the buffer holds more than {@code BATCH_SEND_SIZE} rows, sends
     * the oldest {@code BATCH_SEND_SIZE} of them and removes them from the buffer.
     *
     * @param map the row to buffer
     * @return always {@code true} when no exception is thrown
     * @throws Exception if logging, serialization or the batch send fails
     */
    @Override // com.xiaomi.mone.log.stream.job.extension.MessageSender
    public Boolean send(Map<String, Object> map) throws Exception {
        // Lock before entering try so the finally block never unlocks an unheld lock
        // (which would throw IllegalMonitorStateException and hide the real failure).
        this.reentrantLock.lock();
        try {
            log.info("dataBaseName:{},tableName:{},columnList:{},data:{}", this.dataBaseName, this.tableName, this.columnList, Constant.GSON.toJson(map));
            this.dataList.add(map);
            if (this.dataList.size() > BATCH_SEND_SIZE) {
                // subList is a view: clearing it removes the sent rows from dataList.
                List<Map<String, Object>> batch = this.dataList.subList(0, BATCH_SEND_SIZE);
                this.dorisStreamLoad.sendData(this.dataBaseName, this.tableName, this.columnList, batch);
                batch.clear();
            }
            return true;
        } finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Hands the message to the compensation producer, if one was configured.
     *
     * @param mqMessageDTO the message to hand over
     * @return {@code false} when no compensation producer is configured, {@code true} otherwise
     */
    @Override // com.xiaomi.mone.log.stream.job.extension.MessageSender
    public boolean compensateSend(MqMessageDTO mqMessageDTO) {
        if (this.compensateMsgProduct == null) {
            return false;
        }
        this.compensateMsgProduct.product(mqMessageDTO);
        return true;
    }
}
