package com.xiaomi.mone.log.stream.plugin.es;

import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.sun.jna.platform.win32.COM.tlb.imp.TlbConst;
import com.xiaomi.mone.es.EsProcessor;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.model.StorageInfo;
import com.xiaomi.mone.log.stream.job.compensate.MqMessageDTO;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.plugin.es.EsProcessorConf;
import com.xiaomi.youpin.docean.plugin.es.EsService;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
/* loaded from: input_file:com/xiaomi/mone/log/stream/plugin/es/EsPlugin.class */
public class EsPlugin {
    private static EsConfig esConfig;
    public static final int SINGLE_MESSAGE_BYTES_MAXIMAL = 10485760;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EsPlugin.class);
    private static ConcurrentHashMap<String, EsService> esServiceMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, EsProcessor> esProcessorMap = new ConcurrentHashMap<>();
    private static ReentrantLock esLock = new ReentrantLock();

    public static boolean InitEsConfig() {
        EsConfig esConfig2 = new EsConfig();
        Config ins = Config.ins();
        try {
            esConfig2.setBulkActions(Integer.parseInt(ins.get("es.bulk_actions", "100")));
            esConfig2.setByteSize(Long.parseLong(ins.get("es.byte_size", TlbConst.TYPELIB_MINOR_VERSION_OFFICE)));
            esConfig2.setConcurrentRequest(Integer.parseInt(ins.get("es.concurrent_request", "10")));
            esConfig2.setFlushInterval(Integer.parseInt(ins.get("es.flush_interval", "")));
            esConfig2.setRetryNumber(Integer.parseInt(ins.get("es.retry_num", "3")));
            esConfig2.setRetryInterval(Integer.parseInt(ins.get("es.retry_interval", "3")));
            log.info("[EsPlugin.getEsProcessor] init es config:{}", esConfig2);
            esConfig = esConfig2;
            return true;
        } catch (Exception e) {
            log.error("[EsPlugin.InitEsConfig] init es config err:", (Throwable) e);
            return false;
        }
    }

    public static EsProcessor getEsProcessor(StorageInfo storageInfo, Consumer<MqMessageDTO> consumer) {
        return getEsProcessor(storageInfo, esConfig, consumer);
    }

    public static EsProcessor getEsProcessor(final StorageInfo storageInfo, EsConfig esConfig2, final Consumer<MqMessageDTO> consumer) {
        esLock.lock();
        try {
            EsProcessor esProcessor = esProcessorMap.get(cacheKey(storageInfo));
            if (esProcessor != null) {
                esLock.unlock();
                return esProcessor;
            }
            EsService esService = esServiceMap.get(cacheKey(storageInfo));
            if (esService == null) {
                esService = (StringUtils.isNotBlank(storageInfo.getUser()) && StringUtils.isNotBlank(storageInfo.getPwd())) ? new EsService(storageInfo.getAddr(), storageInfo.getUser(), storageInfo.getPwd()) : StringUtils.isNotBlank(storageInfo.getToken()) ? new EsService(storageInfo.getAddr(), storageInfo.getToken(), storageInfo.getCatalog(), storageInfo.getDatabase()) : new EsService(storageInfo.getAddr(), storageInfo.getUser(), storageInfo.getPwd());
                esServiceMap.put(cacheKey(storageInfo), esService);
            }
            EsProcessor esProcessor2 = esService.getEsProcessor(new EsProcessorConf(esConfig2.getBulkActions(), esConfig2.getByteSize(), esConfig2.getConcurrentRequest(), esConfig2.getFlushInterval(), esConfig2.getRetryNumber(), esConfig2.getRetryInterval(), new BulkProcessor.Listener() { // from class: com.xiaomi.mone.log.stream.plugin.es.EsPlugin.1
                @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
                public void beforeBulk(long j, BulkRequest bulkRequest) {
                }

                @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
                public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                    EsPlugin.log.debug("success send to es,desc:{}", bulkRequest.getDescription());
                    AtomicInteger atomicInteger = new AtomicInteger();
                    bulkResponse.spliterator().forEachRemaining(bulkItemResponse -> {
                        if (bulkItemResponse.isFailed()) {
                            EsPlugin.log.error("Bulk executionId:[{}] has error messages:\t{}", Long.valueOf(j), String.format("Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s", bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId(), Integer.valueOf(bulkItemResponse.getItemId()), bulkItemResponse.getOpType().getLowercase(), Long.valueOf(bulkItemResponse.getVersion()), bulkItemResponse.getFailure().getCause().getMessage()));
                            atomicInteger.incrementAndGet();
                        }
                    });
                    EsPlugin.log.debug("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()), Integer.valueOf(atomicInteger.intValue()));
                }

                @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
                public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                    EsPlugin.log.error(String.format("fail send %s message to es,desc:%s,es addr:%s", Integer.valueOf(bulkRequest.numberOfActions()), bulkRequest.getDescription(), StorageInfo.this.getAddr()), (Throwable) new RuntimeException(th));
                    Class<?> cls = th.getClass();
                    EsPlugin.log.error("Bulk [{}] finished with [{}] requests of error:{}, {}, {}:-[{}]", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()), cls.getName(), cls.getSimpleName(), cls.getTypeName(), cls.getCanonicalName(), th.getMessage());
                    MqMessageDTO mqMessageDTO = new MqMessageDTO();
                    mqMessageDTO.setEsInfo(StorageInfo.this);
                    ArrayList newArrayList = Lists.newArrayList();
                    bulkRequest.requests().stream().filter(docWriteRequest -> {
                        return docWriteRequest instanceof IndexRequest;
                    }).forEach(docWriteRequest2 -> {
                        Map<String, Object> sourceAsMap = ((IndexRequest) docWriteRequest2).sourceAsMap();
                        EsPlugin.log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", docWriteRequest2.index(), docWriteRequest2.type(), docWriteRequest2.id(), JSON.toJSONString(sourceAsMap));
                        MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO();
                        compensateMqDTO.setMsg(JSON.toJSONString(sourceAsMap));
                        compensateMqDTO.setEsIndex(docWriteRequest2.index());
                        newArrayList.add(compensateMqDTO);
                    });
                    if (JSON.toJSONString(newArrayList).getBytes().length <= 10485760) {
                        mqMessageDTO.setCompensateMqDTOS(newArrayList);
                        consumer.accept(mqMessageDTO);
                    } else {
                        Iterator it = ListUtil.partition(newArrayList, 2).iterator();
                        while (it.hasNext()) {
                            mqMessageDTO.setCompensateMqDTOS((List) it.next());
                            consumer.accept(mqMessageDTO);
                        }
                    }
                }
            }));
            esProcessorMap.put(cacheKey(storageInfo), esProcessor2);
            esLock.unlock();
            return esProcessor2;
        } catch (Throwable th) {
            esLock.unlock();
            throw th;
        }
    }

    private static String cacheKey(StorageInfo storageInfo) {
        StringBuilder sb = new StringBuilder();
        sb.append(storageInfo.getId()).append(",");
        sb.append(storageInfo.getAddr());
        if (StringUtils.isNotBlank(storageInfo.getUser())) {
            sb.append(",").append(storageInfo.getUser());
        }
        if (StringUtils.isNotBlank(storageInfo.getPwd())) {
            sb.append(",").append(storageInfo.getPwd());
        }
        if (StringUtils.isNotBlank(storageInfo.getToken())) {
            sb.append(",").append(storageInfo.getToken());
        }
        if (StringUtils.isNotBlank(storageInfo.getCatalog())) {
            sb.append(",").append(storageInfo.getCatalog());
        }
        if (StringUtils.isNotBlank(storageInfo.getDatabase())) {
            sb.append(",").append(storageInfo.getDatabase());
        }
        return sb.toString();
    }
}
