package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.class */
public class ElasticsearchSink<T> extends RichSinkFunction<T> {
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
    private final Map<String, String> userConfig;
    private final List<TransportAddress> transportNodes;
    private final IndexRequestBuilder<T> indexRequestBuilder;
    private transient Node node;
    private transient Client client;
    private transient BulkProcessor bulkProcessor;
    private final AtomicBoolean hasFailure;
    private final AtomicReference<Throwable> failureThrowable;

    public ElasticsearchSink(Map<String, String> map, IndexRequestBuilder<T> indexRequestBuilder) {
        this.hasFailure = new AtomicBoolean(false);
        this.failureThrowable = new AtomicReference<>();
        this.userConfig = map;
        this.indexRequestBuilder = indexRequestBuilder;
        this.transportNodes = null;
    }

    public ElasticsearchSink(Map<String, String> map, List<TransportAddress> list, IndexRequestBuilder<T> indexRequestBuilder) {
        this.hasFailure = new AtomicBoolean(false);
        this.failureThrowable = new AtomicReference<>();
        this.userConfig = map;
        this.indexRequestBuilder = indexRequestBuilder;
        this.transportNodes = list;
    }

    public void open(Configuration configuration) {
        if (this.transportNodes == null) {
            this.node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put(this.userConfig).put("http.enabled", false).build()).client(true).data(false).node();
            this.client = this.node.client();
            if (LOG.isInfoEnabled()) {
                LOG.info("Created Elasticsearch Client {} from embedded Node", this.client);
            }
        } else {
            TransportClient transportClient = new TransportClient(ImmutableSettings.settingsBuilder().put(this.userConfig).build());
            Iterator<TransportAddress> it = this.transportNodes.iterator();
            while (it.hasNext()) {
                transportClient.addTransportAddress(it.next());
            }
            ImmutableList connectedNodes = transportClient.connectedNodes();
            if (connectedNodes.isEmpty()) {
                throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connected to nodes: " + connectedNodes.toString());
            }
            this.client = transportClient;
            if (LOG.isInfoEnabled()) {
                LOG.info("Created Elasticsearch TransportClient {}", this.client);
            }
        }
        BulkProcessor.Builder builder = BulkProcessor.builder(this.client, new BulkProcessor.Listener() { // from class: org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.1
            public void beforeBulk(long j, BulkRequest bulkRequest) {
            }

            public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                if (bulkResponse.hasFailures()) {
                    for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                        if (bulkItemResponse.isFailed()) {
                            ElasticsearchSink.LOG.error("Failed to index document in Elasticsearch: " + bulkItemResponse.getFailureMessage());
                            ElasticsearchSink.this.failureThrowable.compareAndSet(null, new RuntimeException(bulkItemResponse.getFailureMessage()));
                        }
                    }
                    ElasticsearchSink.this.hasFailure.set(true);
                }
            }

            public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                ElasticsearchSink.LOG.error(th.getMessage());
                ElasticsearchSink.this.failureThrowable.compareAndSet(null, th);
                ElasticsearchSink.this.hasFailure.set(true);
            }
        });
        builder.setConcurrentRequests(0);
        ParameterTool fromMap = ParameterTool.fromMap(this.userConfig);
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            builder.setBulkActions(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            builder.setBulkSize(new ByteSizeValue(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            builder.setFlushInterval(TimeValue.timeValueMillis(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }
        this.bulkProcessor = builder.build();
    }

    public void invoke(T t) {
        IndexRequest createIndexRequest = this.indexRequestBuilder.createIndexRequest(t, getRuntimeContext());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", createIndexRequest);
        }
        this.bulkProcessor.add(createIndexRequest);
    }

    public void close() {
        if (this.bulkProcessor != null) {
            this.bulkProcessor.close();
            this.bulkProcessor = null;
        }
        if (this.client != null) {
            this.client.close();
        }
        if (this.node != null) {
            this.node.close();
        }
        if (this.hasFailure.get()) {
            Throwable th = this.failureThrowable.get();
            if (th == null) {
                throw new RuntimeException("An error occured in ElasticsearchSink.");
            }
            throw new RuntimeException("An error occured in ElasticsearchSink.", th);
        }
    }
}
