package io.basestar.spark.elasticsearch;

import io.basestar.spark.ScalaUtils;
import io.basestar.spark.Sink;
import io.basestar.storage.elasticsearch.ElasticsearchUtils;
import io.basestar.storage.elasticsearch.mapping.Mappings;
import io.basestar.storage.elasticsearch.mapping.Settings;
import io.basestar.util.Nullsafe;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.http.HttpHost;
import org.apache.spark.rdd.RDD;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.spark.rdd.EsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/basestar/spark/elasticsearch/ElasticsearchSink.class */
public class ElasticsearchSink implements Sink<RDD<Map<String, Object>>> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSink.class);
    private static final String DEFAULT_BATCH_BYTES = "1mb";
    private static final int DEFAULT_BATCH_LIMIT = 1000;

    @Nonnull
    private final String hostName;

    @Nonnull
    private final int port;

    @Nonnull
    private final String protocol;

    @Nonnull
    private final String indexName;

    @Nonnull
    private final String typeName;

    @Nonnull
    private final Mappings mappings;

    @Nonnull
    private final Settings settings;

    @Nonnull
    private final String id;

    @Nullable
    private final String pipeline;

    @Nonnull
    private final String batchBytes;
    private final int batchEntries;

    /* loaded from: input_file:io/basestar/spark/elasticsearch/ElasticsearchSink$Builder.class */
    public static class Builder {
        private String hostName;
        private Integer port;
        private String protocol;
        private String indexName;
        private String typeName;
        private Mappings mappings;
        private Settings settings;
        private String id;
        private String pipeline;
        private String batchBytes;
        private Integer batchEntries;

        Builder() {
        }

        public Builder hostName(String str) {
            this.hostName = str;
            return this;
        }

        public Builder port(Integer num) {
            this.port = num;
            return this;
        }

        public Builder protocol(String str) {
            this.protocol = str;
            return this;
        }

        public Builder indexName(String str) {
            this.indexName = str;
            return this;
        }

        public Builder typeName(String str) {
            this.typeName = str;
            return this;
        }

        public Builder mappings(Mappings mappings) {
            this.mappings = mappings;
            return this;
        }

        public Builder settings(Settings settings) {
            this.settings = settings;
            return this;
        }

        public Builder id(String str) {
            this.id = str;
            return this;
        }

        public Builder pipeline(String str) {
            this.pipeline = str;
            return this;
        }

        public Builder batchBytes(String str) {
            this.batchBytes = str;
            return this;
        }

        public Builder batchEntries(Integer num) {
            this.batchEntries = num;
            return this;
        }

        public ElasticsearchSink build() {
            return new ElasticsearchSink(this.hostName, this.port, this.protocol, this.indexName, this.typeName, this.mappings, this.settings, this.id, this.pipeline, this.batchBytes, this.batchEntries);
        }

        public String toString() {
            return "ElasticsearchSink.Builder(hostName=" + this.hostName + ", port=" + this.port + ", protocol=" + this.protocol + ", indexName=" + this.indexName + ", typeName=" + this.typeName + ", mappings=" + this.mappings + ", settings=" + this.settings + ", id=" + this.id + ", pipeline=" + this.pipeline + ", batchBytes=" + this.batchBytes + ", batchEntries=" + this.batchEntries + ")";
        }
    }

    ElasticsearchSink(String str, Integer num, String str2, String str3, String str4, Mappings mappings, Settings settings, String str5, String str6, String str7, Integer num2) {
        this.hostName = (String) Nullsafe.require(str);
        this.port = ((Integer) Nullsafe.require(num)).intValue();
        this.protocol = (String) Nullsafe.option(str2, "http");
        this.indexName = (String) Nullsafe.require(str3);
        this.typeName = (String) Nullsafe.option(str4, "_doc");
        this.mappings = (Mappings) Nullsafe.require(mappings);
        this.settings = (Settings) Nullsafe.require(settings);
        this.id = (String) Nullsafe.option(str5, "id");
        this.pipeline = str6;
        this.batchBytes = (String) Nullsafe.option(str7, DEFAULT_BATCH_BYTES);
        this.batchEntries = ((Integer) Nullsafe.option(num2, Integer.valueOf(DEFAULT_BATCH_LIMIT))).intValue();
    }

    public void accept(RDD<Map<String, Object>> rdd) {
        initializeIndex();
        try {
            boolean equalsIgnoreCase = "https".equalsIgnoreCase(this.protocol);
            HashMap hashMap = new HashMap();
            hashMap.put("es.nodes", this.hostName);
            hashMap.put("es.port", Integer.toString(this.port));
            hashMap.put("es.net.ssl", equalsIgnoreCase ? "true" : "false");
            hashMap.put("es.nodes.wan.only", "true");
            hashMap.put("es.mapping.id", this.id);
            hashMap.put("es.batch.size.bytes", this.batchBytes);
            hashMap.put("es.batch.size.entries", Integer.toString(this.batchEntries));
            hashMap.put("es.resource", this.indexName + "/" + this.typeName);
            if (this.pipeline != null) {
                hashMap.put("es.ingest.pipeline", this.pipeline);
            }
            log.info("Importing using configuration {}", hashMap);
            EsSpark.saveToEs(rdd, ScalaUtils.asScalaMap(hashMap));
            restoreSettings();
        } catch (Throwable th) {
            restoreSettings();
            throw th;
        }
    }

    private RestHighLevelClient client() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost[]{new HttpHost(this.hostName, this.port, this.protocol)}));
    }

    private void initializeIndex() {
        try {
            RestHighLevelClient client = client();
            Throwable th = null;
            try {
                ElasticsearchUtils.syncIndex(client, this.indexName, this.mappings, this.settings.toBuilder().refreshInterval("-1").build()).join();
                if (client != null) {
                    if (0 != 0) {
                        try {
                            client.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        client.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to initialize index", e);
            throw new IllegalStateException(e);
        }
    }

    private void restoreSettings() {
        try {
            RestHighLevelClient client = client();
            Throwable th = null;
            try {
                ElasticsearchUtils.putDynamicSettings(client, this.indexName, this.settings).join();
                if (client != null) {
                    if (0 != 0) {
                        try {
                            client.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        client.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to restore settings", e);
            throw new IllegalStateException(e);
        }
    }

    public static Builder builder() {
        return new Builder();
    }
}
