package org.apache.gobblin.elasticsearch.writer;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.Batch;
import org.apache.gobblin.writer.BatchAsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.class */
public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchRestWriter.class);
    private final RestHighLevelClient client;
    private final RestClient lowLevelClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ElasticsearchRestWriter(Config config) throws IOException {
        super(config);
        int intValue = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE, 5).intValue();
        try {
            PasswordManager passwordManager = PasswordManager.getInstance();
            Boolean valueOf = Boolean.valueOf(ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, false));
            if (valueOf.booleanValue()) {
                String string = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
                String readPassword = passwordManager.readPassword(ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, ""));
                String string2 = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, "");
                String string3 = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT);
                String readPassword2 = passwordManager.readPassword(ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, ""));
                String string4 = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, "");
                log.info("Truststore absolutePath is:" + Paths.get(string4, new String[0]).toAbsolutePath().normalize().toString());
                this.lowLevelClient = buildRestClient(this.hostAddresses, intValue, true, string, readPassword, string2, string3, readPassword2, string4);
            } else {
                this.lowLevelClient = buildRestClient(this.hostAddresses, intValue);
            }
            this.client = new RestHighLevelClient(this.lowLevelClient);
            log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}", new Object[]{this.indexName, this.indexType, Boolean.valueOf(this.idMappingEnabled), this.typeMapper.getClass().getCanonicalName(), valueOf});
        } catch (Exception e) {
            throw new IOException("Failed to instantiate rest elasticsearch client", e);
        }
    }

    @Override // org.apache.gobblin.elasticsearch.writer.ElasticsearchWriterBase
    int getDefaultPort() {
        return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT;
    }

    private static RestClient buildRestClient(List<InetSocketTransportAddress> list, int i) throws Exception {
        return buildRestClient(list, i, false, null, null, null, null, null, null);
    }

    private static RestClient buildRestClient(List<InetSocketTransportAddress> list, int i, boolean z, String str, String str2, String str3, String str4, String str5, String str6) throws Exception {
        RestClientBuilder httpClientConfigCallback;
        HttpHost[] httpHostArr = new HttpHost[list.size()];
        String str7 = z ? "https" : "http";
        for (int i2 = 0; i2 < httpHostArr.length; i2++) {
            InetSocketTransportAddress inetSocketTransportAddress = list.get(i2);
            httpHostArr[i2] = new HttpHost(inetSocketTransportAddress.getAddress(), inetSocketTransportAddress.getPort(), str7);
        }
        RestClientBuilder builder = RestClient.builder(httpHostArr);
        if (z) {
            log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", str4, str6);
            KeyStore keyStore = KeyStore.getInstance(str4);
            FileInputStream fileInputStream = new FileInputStream(str6);
            try {
                keyStore.load(fileInputStream, str5.toCharArray());
                fileInputStream.close();
                SSLContextBuilder loadTrustMaterial = SSLContexts.custom().loadTrustMaterial(keyStore, (TrustStrategy) null);
                log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", str, str3);
                KeyStore keyStore2 = KeyStore.getInstance(str);
                fileInputStream = new FileInputStream(str3);
                try {
                    keyStore2.load(fileInputStream, str2.toCharArray());
                    fileInputStream.close();
                    loadTrustMaterial.loadKeyMaterial(keyStore2, str2.toCharArray());
                    SSLContext build = loadTrustMaterial.build();
                    httpClientConfigCallback = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                        return httpAsyncClientBuilder.setSSLContext(build).setSSLHostnameVerifier(new NoopHostnameVerifier()).setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(i).build());
                    });
                } finally {
                }
            } finally {
            }
        } else {
            httpClientConfigCallback = builder.setHttpClientConfigCallback(httpAsyncClientBuilder2 -> {
                return httpAsyncClientBuilder2.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(i).build());
            });
        }
        httpClientConfigCallback.setRequestConfigCallback(builder2 -> {
            return builder2.setConnectionRequestTimeout(0);
        });
        return httpClientConfigCallback.build();
    }

    public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback writeCallback) {
        Pair<BulkRequest, FutureCallbackHolder> prepareBatch = prepareBatch(batch, writeCallback);
        try {
            this.client.bulkAsync((BulkRequest) prepareBatch.getFirst(), ((FutureCallbackHolder) prepareBatch.getSecond()).getActionListener(), new Header[0]);
            return ((FutureCallbackHolder) prepareBatch.getSecond()).getFuture();
        } catch (Exception e) {
            throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
        }
    }

    public void flush() throws IOException {
    }

    @Override // org.apache.gobblin.elasticsearch.writer.ElasticsearchWriterBase, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        super.close();
        this.lowLevelClient.close();
    }

    @VisibleForTesting
    public RestHighLevelClient getRestHighLevelClient() {
        return this.client;
    }

    @VisibleForTesting
    public RestClient getRestLowLevelClient() {
        return this.lowLevelClient;
    }
}
