package risesoft.data.transfer.stream.es.in;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.risesoft.elastic.client.ElasticsearchRestClient;
import net.risesoft.elastic.client.pojo.QueryModel;
import net.risesoft.y9.json.Y9JsonUtil;
import org.apache.commons.lang3.StringUtils;
import risesoft.data.transfer.core.data.Data;
import risesoft.data.transfer.core.data.StringData;
import risesoft.data.transfer.core.exception.CommonErrorCode;
import risesoft.data.transfer.core.exception.FrameworkErrorCode;
import risesoft.data.transfer.core.exception.TransferException;
import risesoft.data.transfer.core.log.Logger;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.stream.in.DataInputStream;
import risesoft.data.transfer.core.stream.in.DataInputStreamFactory;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.ValueUtils;

/* loaded from: input_file:risesoft/data/transfer/stream/es/in/ElasticsearchInputStreamFactory.class */
public class ElasticsearchInputStreamFactory implements DataInputStreamFactory {
    private ElasticsearchRestClient elasticsearchRestClient;
    private Logger logger;
    private String url;
    private String username;
    private String password;
    private String indexName;
    private List<String> columns;
    private String query;
    private String splitPk;
    private Boolean precise;
    private Integer tableNumber;
    private Integer splitFactor;
    private Map<String, String> columnTypes;

    public ElasticsearchInputStreamFactory(Configuration configuration, LoggerFactory loggerFactory) {
        this.url = (String) ValueUtils.getRequired(configuration.getString("jdbcUrl"), "缺失连接地址");
        this.password = configuration.getString("password", "");
        this.username = configuration.getString("userName", "");
        this.indexName = (String) ValueUtils.getRequired(configuration.getString("tableName"), "缺失索引表名称");
        this.columns = (List) ValueUtils.getRequired(configuration.getList("column", String.class), "缺失字段列表");
        this.query = configuration.getString("where", "").trim();
        this.splitPk = configuration.getString("splitPk");
        this.precise = configuration.getBool("precise", false);
        this.splitFactor = configuration.getInt("splitFactor", -1);
        this.tableNumber = configuration.getInt("tableNumber", -1);
        this.logger = loggerFactory.getLogger(configuration.getString("name", "ElasticsearchInputStreamFactory"));
        this.elasticsearchRestClient = new ElasticsearchRestClient(this.url, this.username, this.password);
    }

    /* renamed from: getStream, reason: merged with bridge method [inline-methods] */
    public DataInputStream m0getStream() {
        return new ElasticsearchInputStream(this.elasticsearchRestClient, this.indexName, this.columnTypes, this.logger);
    }

    public void init() {
        this.columnTypes = new HashMap();
        try {
            String mapping = this.elasticsearchRestClient.getMapping(this.indexName);
            if (mapping.equals("failed")) {
                throw TransferException.as(FrameworkErrorCode.RUNTIME_ERROR, "初始化ES输入流工厂失败：获取索引表字段信息失败");
            }
            Map map = (Map) ((Map) Y9JsonUtil.readHashMap(mapping).get(this.indexName)).get("mappings");
            Map map2 = (Map) map.get("properties");
            if (map2 == null) {
                map2 = (Map) ((Map) map.values().stream().findFirst().get()).get("properties");
            }
            for (Map.Entry entry : map2.entrySet()) {
                this.columnTypes.put((String) entry.getKey(), (String) ((Map) entry.getValue()).get("type"));
            }
        } catch (Exception e) {
            throw TransferException.as(FrameworkErrorCode.RUNTIME_ERROR, "初始化ES输入流工厂失败，异常信息：" + e.getMessage(), e);
        }
    }

    public void close() throws Exception {
    }

    public List<Data> splitToData(int i) throws Exception {
        String str;
        ArrayList arrayList;
        try {
            str = "{";
            int count = this.elasticsearchRestClient.getCount(this.indexName, (StringUtils.isNotBlank(this.query) ? str + "\"query\":" + this.query : "{") + "}");
            int intValue = this.tableNumber.intValue() != -1 ? this.tableNumber.intValue() : this.splitFactor.intValue() != -1 ? i * this.splitFactor.intValue() : 1;
            if (intValue >= 1 && StringUtils.isNotEmpty(this.splitPk)) {
                if (this.logger.isInfo()) {
                    this.logger.info(this, "sub data to " + intValue);
                }
                if (this.precise.booleanValue()) {
                    arrayList = new ArrayList();
                    for (Map<String, Object> map : getGroupData()) {
                        String valueOf = String.valueOf(map.get("key"));
                        Integer num = (Integer) map.get("doc_count");
                        QueryModel queryModel = new QueryModel();
                        queryModel.set_source(QueryModel.get_source(this.columns));
                        if (StringUtils.isNotBlank(this.query)) {
                            ArrayList arrayList2 = new ArrayList();
                            arrayList2.add(Y9JsonUtil.readHashMap(this.query));
                            arrayList2.add(Y9JsonUtil.readHashMap(QueryModel.get_query(this.splitPk, valueOf, "term")));
                            queryModel.setQuery(QueryModel.get_boolMustQuery(arrayList2));
                        } else {
                            queryModel.setQuery(QueryModel.get_query(this.splitPk, valueOf, "term"));
                        }
                        queryModel.setSize(num);
                        arrayList.add(new StringData(Y9JsonUtil.writeValueAsString(queryModel)));
                    }
                } else {
                    arrayList = new ArrayList();
                    for (Map<String, Integer> map2 : splitDataIntoParts(count, intValue)) {
                        QueryModel queryModel2 = new QueryModel();
                        queryModel2.set_source(QueryModel.get_source(this.columns));
                        if (StringUtils.isNotBlank(this.query)) {
                            queryModel2.setQuery(this.query);
                        }
                        queryModel2.setSort("{\"" + this.splitPk + "\":{\"order\":\"asc\"}}");
                        queryModel2.setFrom(map2.get("from"));
                        queryModel2.setSize(map2.get("size"));
                        arrayList.add(new StringData(Y9JsonUtil.writeValueAsString(queryModel2)));
                    }
                }
                if (this.logger.isInfo()) {
                    this.logger.info(this, "sub data end: " + arrayList.size());
                }
            } else {
                if (count > 10000) {
                    throw new TransferException(CommonErrorCode.WAIT_TIME_EXCEED, "数据过大，建议使用切分模式");
                }
                this.logger.info(this, "no sub data");
                arrayList = new ArrayList();
                QueryModel queryModel3 = new QueryModel();
                queryModel3.set_source(QueryModel.get_source(this.columns));
                if (StringUtils.isNotBlank(this.query)) {
                    queryModel3.setQuery(this.query);
                }
                queryModel3.setSize(Integer.valueOf(count));
                arrayList.add(new StringData(Y9JsonUtil.writeValueAsString(queryModel3)));
            }
            return arrayList;
        } catch (Exception e) {
            throw TransferException.as(CommonErrorCode.RUNTIME_ERROR, "elastic数量查询-执行报错", e);
        }
    }

    private List<Map<String, Integer>> splitDataIntoParts(int i, int i2) {
        if (i <= 0) {
            throw new TransferException(CommonErrorCode.CONFIG_ERROR, "无数据，无法切分");
        }
        ArrayList arrayList = new ArrayList();
        int i3 = i / i2;
        int i4 = i % i2;
        int i5 = 0;
        while (i5 < i2) {
            HashMap hashMap = new HashMap();
            int i6 = i5 * i3;
            int i7 = i5 == i2 - 1 ? i : i6 + i3;
            if (i4 > 0) {
                i7++;
                i4--;
            }
            hashMap.put("from", Integer.valueOf(i6));
            hashMap.put("size", Integer.valueOf(i7));
            arrayList.add(hashMap);
            i5++;
        }
        return arrayList;
    }

    private List<Map<String, Object>> getGroupData() {
        QueryModel queryModel = new QueryModel();
        queryModel.setAggs("{\"aggs_name\":{\"terms\":{\"field\":\"" + this.splitPk + "\", \"size\":100}}}");
        if (StringUtils.isNotBlank(this.query)) {
            queryModel.setQuery(this.query);
        }
        try {
            return (List) this.elasticsearchRestClient.search(queryModel, this.indexName).get("data");
        } catch (Exception e) {
            throw TransferException.as(CommonErrorCode.RUNTIME_ERROR, "elastic分组查询-执行报错", e);
        }
    }
}
