/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
 *
 * <p>The bridge operates in one of two modes, chosen by the constructor used:
 * <ul>
 *   <li>no transport addresses: {@link #createClient(Map)} starts an embedded, non-data client
 *       {@link Node} and hands out that node's client;</li>
 *   <li>with transport addresses: {@link #createClient(Map)} builds a {@link TransportClient}
 *       that is pointed at the given addresses.</li>
 * </ul>
 */
@Internal
public class Elasticsearch1ApiCallBridge
implements ElasticsearchApiCallBridge {
    private static final long serialVersionUID = -2632363720584123682L;
    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class);

    /** Addresses used for the {@link TransportClient}; {@code null} selects the embedded-node mode. */
    private final List<TransportAddress> transportAddresses;

    /** Embedded node created by {@link #createClient(Map)}; only set in embedded-node mode. */
    private transient Node node;

    /**
     * Creates a bridge that uses an embedded node to obtain its client.
     */
    Elasticsearch1ApiCallBridge() {
        this.transportAddresses = null;
    }

    /**
     * Creates a bridge that uses a {@link TransportClient} connected to the given addresses.
     *
     * @param transportAddresses the addresses to connect to; must be neither {@code null} nor empty
     */
    Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) {
        Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
        this.transportAddresses = transportAddresses;
    }

    /**
     * Creates the Elasticsearch client for the mode this bridge was constructed in.
     *
     * @param clientConfig settings that are passed through to the Elasticsearch settings builder
     * @return the embedded node's client, or a {@link TransportClient} with at least one connected node
     * @throws RuntimeException if the transport client is not connected to any node
     */
    @Override
    public Client createClient(Map<String, String> clientConfig) {
        if (this.transportAddresses == null) {
            // Embedded mode: a client-only node that holds no data and exposes no HTTP endpoint.
            Settings settings = ImmutableSettings.settingsBuilder().put(clientConfig).put("http.enabled", false).build();
            this.node = NodeBuilder.nodeBuilder().settings(settings).client(true).data(false).node();
            Client client = this.node.client();
            LOG.info("Created Elasticsearch client from embedded node");
            return client;
        }

        Settings settings = ImmutableSettings.settingsBuilder().put(clientConfig).build();
        TransportClient transportClient = new TransportClient(settings);
        for (TransportAddress transport : this.transportAddresses) {
            transportClient.addTransportAddress(transport);
        }

        if (transportClient.connectedNodes().isEmpty()) {
            // The caller never receives this client, so it must be released here;
            // otherwise its resources would leak on every failed attempt.
            try {
                transportClient.close();
            } catch (RuntimeException closeFailure) {
                // Do not let a failing close() hide the actual connectivity problem.
                LOG.warn("Failed to close unconnected Elasticsearch TransportClient", closeFailure);
            }
            throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
        }

        // Guard kept on purpose: it avoids the connectedNodes() call when INFO is disabled.
        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
        }
        return transportClient;
    }

    /**
     * Extracts the failure of a single bulk item, if there is one.
     *
     * @param bulkItemResponse the bulk item response to inspect
     * @return a {@link RuntimeException} carrying the item's failure message,
     *         or {@code null} if the item did not fail
     */
    @Override
    @Nullable
    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
        if (!bulkItemResponse.isFailed()) {
            return null;
        }
        return new RuntimeException(bulkItemResponse.getFailureMessage());
    }

    /**
     * Does not configure anything; only logs a warning that Elasticsearch 1.x does not
     * support backoff retries. Both arguments are ignored.
     *
     * @param builder the bulk processor builder (left untouched)
     * @param flushBackoffPolicy the requested backoff policy (ignored)
     */
    @Override
    public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        LOG.warn("Elasticsearch 1.x does not support backoff retries.");
    }

    /**
     * Closes the embedded node if one was created and is still open, and drops the reference to it.
     */
    @Override
    public void cleanup() {
        if (this.node != null && !this.node.isClosed()) {
            this.node.close();
            this.node = null;
        }
    }
}

