/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch2;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.collect.ImmutableList;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.BulkProcessorIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchSink<T>
extends RichSinkFunction<T> {
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
    private final Map<String, String> userConfig;
    private final List<InetSocketAddress> transportAddresses;
    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
    private transient Client client;
    private transient BulkProcessor bulkProcessor;
    private transient RequestIndexer requestIndexer;
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference();

    public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        this.userConfig = userConfig;
        this.elasticsearchSinkFunction = elasticsearchSinkFunction;
        Preconditions.checkArgument((transportAddresses != null && transportAddresses.size() > 0 ? 1 : 0) != 0);
        this.transportAddresses = transportAddresses;
    }

    public void open(Configuration configuration) {
        ArrayList<InetSocketTransportAddress> transportNodes = new ArrayList<InetSocketTransportAddress>(this.transportAddresses.size());
        for (InetSocketAddress address : this.transportAddresses) {
            transportNodes.add(new InetSocketTransportAddress(address));
        }
        Settings settings = Settings.settingsBuilder().put(this.userConfig).build();
        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        for (TransportAddress transportAddress : transportNodes) {
            transportClient.addTransportAddress(transportAddress);
        }
        ImmutableList nodes = ImmutableList.copyOf(transportClient.connectedNodes());
        if (nodes.isEmpty()) {
            throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
        }
        this.client = transportClient;
        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch TransportClient {}", (Object)this.client);
        }
        BulkProcessor.Builder builder = BulkProcessor.builder((Client)this.client, (BulkProcessor.Listener)new BulkProcessor.Listener(){

            public void beforeBulk(long executionId, BulkRequest request) {
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    for (BulkItemResponse itemResp : response.getItems()) {
                        if (!itemResp.isFailed()) continue;
                        LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
                        ElasticsearchSink.this.failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
                    }
                    ElasticsearchSink.this.hasFailure.set(true);
                }
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.error(failure.getMessage());
                ElasticsearchSink.this.failureThrowable.compareAndSet(null, failure);
                ElasticsearchSink.this.hasFailure.set(true);
            }
        });
        builder.setConcurrentRequests(0);
        ParameterTool params = ParameterTool.fromMap(this.userConfig);
        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            builder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }
        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            builder.setBulkSize(new ByteSizeValue((long)params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }
        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            builder.setFlushInterval(TimeValue.timeValueMillis((long)params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }
        this.bulkProcessor = builder.build();
        this.requestIndexer = new BulkProcessorIndexer(this.bulkProcessor);
    }

    public void invoke(T element) {
        this.elasticsearchSinkFunction.process(element, this.getRuntimeContext(), this.requestIndexer);
    }

    public void close() {
        if (this.bulkProcessor != null) {
            this.bulkProcessor.close();
            this.bulkProcessor = null;
        }
        if (this.client != null) {
            this.client.close();
        }
        if (this.hasFailure.get()) {
            Throwable cause = this.failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
            }
            throw new RuntimeException("An error occured in ElasticsearchSink.");
        }
    }
}

