/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.flink.elasticsearch6.sink;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.StringTemplate;
import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.elasticsearch6.sink.ElasticsearchOutputFormat;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigMergeable;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

/**
 * Flink sink plugin that writes {@link Row} records to Elasticsearch 6.
 *
 * <p>The same configuration drives both the streaming path
 * ({@link #outputStream(FlinkEnvironment, DataStream)}) and the batch path
 * ({@link #outputBatch(FlinkEnvironment, DataSet)}). Recognised keys:
 * <ul>
 *   <li>{@code hosts} (required) - list of {@code host:port} entries;</li>
 *   <li>{@code index} - index name template, default {@code seatunnel};</li>
 *   <li>{@code index_type} - document type, default {@code log};</li>
 *   <li>{@code index_time_format} - format handed to {@link StringTemplate} together
 *       with {@code index}, default {@code yyyy.MM.dd};</li>
 *   <li>{@code parallelism} (optional) - parallelism applied to the sink operator.</li>
 * </ul>
 */
@AutoService(BaseFlinkSink.class)
public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {
    private static final long serialVersionUID = 8445868321245456793L;

    /** Number of default entries installed by {@link #prepare(FlinkEnvironment)}. */
    private static final int DEFAULT_CONFIG_SIZE = 3;

    private static final String HOSTS = "hosts";
    private static final String INDEX = "index";
    private static final String INDEX_TYPE = "index_type";
    private static final String INDEX_TIME_FORMAT = "index_time_format";
    private static final String PARALLELISM = "parallelism";

    private Config config;

    /**
     * Index name resolved by the most recent call to an output method. Kept as instance
     * state as in the original class; the per-record lambdas use captured locals instead.
     */
    private String indexName;

    public void setConfig(Config config) {
        this.config = config;
    }

    public Config getConfig() {
        return this.config;
    }

    /**
     * Validates the plugin configuration.
     *
     * @return the result of checking that the {@code hosts} key is present
     */
    public CheckResult checkConfig() {
        return CheckConfigUtil.checkAllExists(this.config, new String[]{HOSTS});
    }

    /**
     * Layers the default values for {@code index}, {@code index_type} and
     * {@code index_time_format} underneath the user-supplied configuration, so explicit
     * settings still win.
     *
     * @param env the Flink environment (unused here)
     */
    public void prepare(FlinkEnvironment env) {
        HashMap<String, String> defaults = new HashMap<>(DEFAULT_CONFIG_SIZE);
        defaults.put(INDEX, "seatunnel");
        defaults.put(INDEX_TYPE, "log");
        defaults.put(INDEX_TIME_FORMAT, "yyyy.MM.dd");
        this.config = this.config.withFallback(ConfigFactory.parseMap(defaults));
    }

    public String getPluginName() {
        return "ElasticSearch";
    }

    /**
     * Attaches an Elasticsearch sink to the given stream.
     *
     * @param env        the Flink environment (unused here)
     * @param dataStream the stream of rows to index; its type information is cast to
     *                   {@link RowTypeInfo} to obtain the field names
     * @throws IllegalArgumentException if a {@code hosts} entry is not of the form
     *                                  {@code host:port} (a non-numeric port surfaces as
     *                                  {@link NumberFormatException})
     */
    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        List<String> hosts = this.config.getStringList(HOSTS);
        List<HttpHost> httpHosts = new ArrayList<>(hosts.size());
        for (String host : hosts) {
            httpHosts.add(parseHost(host));
        }

        // Resolve everything the per-record lambda needs up front. Capturing plain locals
        // (instead of `this`) keeps the sink instance out of the serialized function and
        // avoids a config lookup for every record.
        final String[] fieldNames = ((RowTypeInfo) dataStream.getType()).getFieldNames();
        final String index = resolveIndexName();
        final String indexType = this.config.getString(INDEX_TYPE);

        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<Row>(
            httpHosts,
            (element, ctx, indexer) ->
                indexer.add(createIndexRequest(index, indexType, fieldNames, element)));
        // A bulk size of one action: every record is sent as its own bulk request.
        esSinkBuilder.setBulkFlushMaxActions(1);

        if (this.config.hasPath(PARALLELISM)) {
            dataStream.addSink(esSinkBuilder.build()).setParallelism(this.config.getInt(PARALLELISM));
        } else {
            dataStream.addSink(esSinkBuilder.build());
        }
    }

    /**
     * Writes the given data set to Elasticsearch through {@link ElasticsearchOutputFormat}.
     *
     * @param env     the Flink environment (unused here)
     * @param dataSet the rows to index; its type information is cast to
     *                {@link RowTypeInfo} to obtain the field names
     */
    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
        // Same reasoning as in outputStream: capture resolved locals, not `this`.
        final String[] fieldNames = ((RowTypeInfo) dataSet.getType()).getFieldNames();
        final String index = resolveIndexName();
        final String indexType = this.config.getString(INDEX_TYPE);

        DataSink<Row> dataSink = dataSet.output(new ElasticsearchOutputFormat<Row>(
            this.config,
            (element, ctx, indexer) ->
                indexer.add(createIndexRequest(index, indexType, fieldNames, element))));

        if (this.config.hasPath(PARALLELISM)) {
            dataSink.setParallelism(this.config.getInt(PARALLELISM));
        }
    }

    /**
     * Builds the target index name from the {@code index} and {@code index_time_format}
     * settings via {@link StringTemplate#substitute} and records it in {@link #indexName}.
     * NOTE(review): presumably this expands time placeholders in the index template -
     * confirm against StringTemplate.
     */
    private String resolveIndexName() {
        this.indexName = StringTemplate.substitute(
            this.config.getString(INDEX), this.config.getString(INDEX_TIME_FORMAT));
        return this.indexName;
    }

    /**
     * Converts one {@code host:port} configuration entry into an {@link HttpHost} using
     * the {@code http} scheme.
     *
     * @param host a single entry of the {@code hosts} list
     * @return the corresponding HTTP host
     * @throws IllegalArgumentException if the entry has no {@code :port} part
     * @throws NumberFormatException    if the port is not an integer
     */
    private static HttpHost parseHost(String host) {
        // Split once; the first two segments are host name and port, as before.
        String[] parts = host.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                "Invalid Elasticsearch host '" + host + "': expected the form host:port");
        }
        return new HttpHost(parts[0].trim(), Integer.parseInt(parts[1].trim()), "http");
    }

    /**
     * Builds the index request for a single row, mapping each field name to the value at
     * the same position in the row.
     *
     * @param index      target index name
     * @param indexType  document type
     * @param fieldNames field names, positionally aligned with the row
     * @param element    the row to index
     * @return an index request whose source is the field-name to value map
     */
    private static IndexRequest createIndexRequest(String index, String indexType, String[] fieldNames, Row element) {
        int arity = element.getArity();
        HashMap<String, Object> json = new HashMap<>(arity);
        for (int i = 0; i < arity; ++i) {
            json.put(fieldNames[i], element.getField(i));
        }
        return Requests.indexRequest().index(index).type(indexType).source(json);
    }
}

