/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch5;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class Elasticsearch5ApiCallBridge
implements ElasticsearchApiCallBridge {
    private static final long serialVersionUID = -5222683870097809633L;
    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch5ApiCallBridge.class);
    private final List<InetSocketAddress> transportAddresses;

    Elasticsearch5ApiCallBridge(List<InetSocketAddress> transportAddresses) {
        Preconditions.checkArgument((transportAddresses != null && !transportAddresses.isEmpty() ? 1 : 0) != 0);
        this.transportAddresses = transportAddresses;
    }

    @Override
    public Client createClient(Map<String, String> clientConfig) {
        Settings settings = Settings.builder().put(clientConfig).put("http.type", "netty3").put("transport.type", "netty3").build();
        PreBuiltTransportClient transportClient = new PreBuiltTransportClient(settings, new Class[0]);
        for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(this.transportAddresses)) {
            transportClient.addTransportAddress(transport);
        }
        if (transportClient.connectedNodes().isEmpty()) {
            throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
        }
        return transportClient;
    }

    @Override
    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
        if (!bulkItemResponse.isFailed()) {
            return null;
        }
        return bulkItemResponse.getFailure().getCause();
    }

    @Override
    public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        BackoffPolicy backoffPolicy;
        if (flushBackoffPolicy != null) {
            switch (flushBackoffPolicy.getBackoffType()) {
                case CONSTANT: {
                    backoffPolicy = BackoffPolicy.constantBackoff(new TimeValue(flushBackoffPolicy.getDelayMillis()), flushBackoffPolicy.getMaxRetryCount());
                    break;
                }
                default: {
                    backoffPolicy = BackoffPolicy.exponentialBackoff(new TimeValue(flushBackoffPolicy.getDelayMillis()), flushBackoffPolicy.getMaxRetryCount());
                    break;
                }
            }
        } else {
            backoffPolicy = BackoffPolicy.noBackoff();
        }
        builder.setBackoffPolicy(backoffPolicy);
    }

    @Override
    public void cleanup() {
    }
}

