/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.flink.elasticsearch6.sink;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch output format that writes records to Elasticsearch 6 through a transport client.
 *
 * <p>Each record is handed to the user supplied {@link ElasticsearchSinkFunction}, which emits
 * index/update/delete requests through a {@link RequestIndexer}. The requests are buffered by a
 * {@link BulkProcessor} and sent in bulk. The client and the bulk processor are created in
 * {@link #configure(Configuration)} and released in {@link #close()}.
 *
 * <p>Every entry of the user config whose key starts with {@code "es."} is forwarded (with the
 * prefix removed) to the Elasticsearch client {@link Settings}. The {@code hosts} entry must be
 * a list of {@code host:port} strings.
 *
 * @param <T> type of the records written by this format
 */
public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
    private static final long serialVersionUID = 2048590860723433896L;
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
    private final Config config;
    private static final String PREFIX = "es.";
    /** Upper bound on how long {@link #close()} waits for in-flight bulk requests. */
    private static final long CLOSE_TIMEOUT_SECONDS = 60L;
    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
    private transient RequestIndexer requestIndexer;
    private transient BulkProcessor bulkProcessor;
    /** Kept as a field so that {@link #close()} can release the client's resources. */
    private transient PreBuiltTransportClient transportClient;

    /**
     * Creates the output format.
     *
     * @param userConfig sink configuration; read for {@code hosts} and all {@code es.*} entries
     * @param elasticsearchSinkFunction function that converts a record into Elasticsearch requests
     */
    public ElasticsearchOutputFormat(Config userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        this.config = userConfig;
        this.elasticsearchSinkFunction = elasticsearchSinkFunction;
    }

    /**
     * Builds the transport client, the bulk processor and the request indexer.
     *
     * <p>A host entry that cannot be parsed or resolved is skipped with a warning so that one bad
     * entry does not prevent the remaining nodes from being used.
     *
     * @param configuration Flink configuration; not read by this implementation
     */
    @Override
    public void configure(Configuration configuration) {
        List<String> hosts = this.config.getStringList("hosts");
        PreBuiltTransportClient client = new PreBuiltTransportClient(this.buildClientSettings());
        int usableHosts = 0;
        for (String host : hosts) {
            try {
                client.addTransportAddresses(ElasticsearchOutputFormat.toTransportAddress(host));
                ++usableHosts;
            }
            catch (Exception e) {
                LOGGER.warn("Host '{}' parse failed.", host, e);
            }
        }
        if (usableHosts == 0) {
            // Without any node every bulk request will fail later on; make that visible early.
            LOGGER.error("None of the configured Elasticsearch hosts {} could be used.", hosts);
        }
        this.transportClient = client;
        this.bulkProcessor = BulkProcessor.builder(client, ElasticsearchOutputFormat.newBulkListener()).build();
        this.requestIndexer = ElasticsearchOutputFormat.newRequestIndexer(this.bulkProcessor);
    }

    /** Copies every {@code es.*} config entry, with the prefix stripped, into client settings. */
    private Settings buildClientSettings() {
        Settings.Builder settings = Settings.builder();
        this.config.entrySet().forEach(entry -> {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                Object value = entry.getValue().unwrapped();
                settings.put(key.substring(PREFIX.length()), String.valueOf(value));
            }
        });
        return settings.build();
    }

    /** Converts a {@code host:port} string into a transport address. */
    private static TransportAddress toTransportAddress(String host) throws UnknownHostException {
        String[] parts = host.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected 'host:port' but got '" + host + "'");
        }
        return new TransportAddress(InetAddress.getByName(parts[0]), Integer.parseInt(parts[1]));
    }

    /** Creates a listener that reports failed bulk requests instead of silently dropping them. */
    private static BulkProcessor.Listener newBulkListener() {
        return new BulkProcessor.Listener(){

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    LOGGER.error("Bulk request {} completed with failures: {}", executionId, response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOGGER.error("Bulk request {} with {} action(s) failed.", executionId, request.numberOfActions(), failure);
            }
        };
    }

    /** Creates an indexer that forwards every kind of request to the given bulk processor. */
    private static RequestIndexer newRequestIndexer(final BulkProcessor processor) {
        return new RequestIndexer(){

            @Override
            public void add(DeleteRequest ... deleteRequests) {
                for (DeleteRequest deleteRequest : deleteRequests) {
                    processor.add(deleteRequest);
                }
            }

            @Override
            public void add(IndexRequest ... indexRequests) {
                for (IndexRequest indexRequest : indexRequests) {
                    processor.add(indexRequest);
                }
            }

            @Override
            public void add(UpdateRequest ... updateRequests) {
                for (UpdateRequest updateRequest : updateRequests) {
                    processor.add(updateRequest);
                }
            }
        };
    }

    /**
     * No-op: all resources are created in {@link #configure(Configuration)}.
     *
     * @param i first argument of the framework callback; unused
     * @param i1 second argument of the framework callback; unused
     */
    @Override
    public void open(int i, int i1) {
    }

    /**
     * Passes the record to the sink function, which emits its requests through the indexer.
     *
     * @param t record to write
     */
    @Override
    public void writeRecord(T t) {
        this.elasticsearchSinkFunction.process(t, this.getRuntimeContext(), this.requestIndexer);
    }

    /**
     * Flushes buffered requests, waits for in-flight bulk requests and releases the client.
     *
     * <p>Safe to call when {@link #configure(Configuration)} never ran and safe to call repeatedly.
     */
    @Override
    public void close() {
        try {
            if (this.bulkProcessor != null
                    && !this.bulkProcessor.awaitClose(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOGGER.warn("Timed out after {}s waiting for pending bulk requests to complete.", CLOSE_TIMEOUT_SECONDS);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the calling task can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while waiting for pending bulk requests to complete.", e);
        }
        finally {
            this.bulkProcessor = null;
            if (this.transportClient != null) {
                this.transportClient.close();
                this.transportClient = null;
            }
        }
    }
}

