package org.apache.inlong.sort.standalone.sink.elasticsearch;

import java.io.IOException;
import java.util.Arrays;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.class */
public class EsOutputChannel extends Thread {
    public static final Logger LOG = InlongLoggerFactory.getLogger(EsOutputChannel.class);
    private LifecycleState status;
    private EsSinkContext context;
    private RestHighLevelClient esClient;
    private BulkProcessor bulkProcessor;

    public EsOutputChannel(EsSinkContext esSinkContext) {
        super(esSinkContext.getTaskName());
        this.context = esSinkContext;
        this.status = LifecycleState.IDLE;
    }

    public void init() {
        if (initEsclient()) {
            initBulkprocessIfNeed();
        }
    }

    public BulkProcessor getBulkProcessor() {
        if (this.bulkProcessor == null && initEsclient()) {
            initBulkprocessIfNeed();
        }
        return this.bulkProcessor;
    }

    private boolean initEsclient() {
        try {
            if (this.esClient == null || !this.esClient.ping(RequestOptions.DEFAULT)) {
                String username = this.context.getUsername();
                String password = this.context.getPassword();
                HttpHost[] httpHosts = this.context.getHttpHosts();
                LOG.info("initEsclient:url:{},user:{},password:{}", new Object[]{Arrays.asList(httpHosts), username, password});
                RestClientBuilder builder = RestClient.builder(httpHosts);
                BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
                basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                    httpAsyncClientBuilder.disableAuthCaching();
                    return httpAsyncClientBuilder.setDefaultCredentialsProvider(basicCredentialsProvider).setMaxConnTotal(this.context.getMaxConnect()).setMaxConnPerRoute(this.context.getMaxConnectPerRoute()).setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(this.context.getConnectionRequestTimeout()).setMaxRedirects(this.context.getMaxRedirects()).setSocketTimeout(this.context.getSocketTimeout()).setConnectTimeout(120000).build());
                });
                this.esClient = EsSinkFactory.createRestHighLevelClient(builder);
            }
            return true;
        } catch (Exception e) {
            LOG.error("init esclient failed.", e);
            this.esClient = null;
            return false;
        }
    }

    private boolean initBulkprocessIfNeed() {
        try {
            if (this.bulkProcessor == null) {
                this.bulkProcessor = BulkProcessor.builder((bulkRequest, actionListener) -> {
                    this.esClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
                }, new EsCallbackListener(this.context)).setBulkActions(this.context.getBulkAction()).setBulkSize(new ByteSizeValue(this.context.getBulkSizeMb(), ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueSeconds(this.context.getFlushInterval())).setConcurrentRequests(this.context.getConcurrentRequests()).build();
            }
            return true;
        } catch (Exception e) {
            LOG.error("init esclient failed", e);
            this.esClient = null;
            return false;
        }
    }

    public void close() {
        this.status = LifecycleState.STOP;
        try {
            this.bulkProcessor.close();
        } catch (Exception e) {
            LOG.error(String.format("close bulkProcessor:%s", e.getMessage()), e);
        }
        try {
            this.esClient.close();
        } catch (IOException e2) {
            LOG.error(String.format("close EsClient:%s", e2.getMessage()), e2);
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.status = LifecycleState.START;
        LOG.info("start to EsOutputChannel:{},status:{}", this.context.getTaskName(), this.status);
        while (this.status == LifecycleState.START) {
            try {
                send();
            } catch (Throwable th) {
                LOG.error(th.getMessage(), th);
            }
        }
    }

    public void send() throws InterruptedException {
        EsIndexRequest esIndexRequest = null;
        try {
            BulkProcessor bulkProcessor = getBulkProcessor();
            if (bulkProcessor == null) {
                Thread.sleep(this.context.getProcessInterval());
                return;
            }
            EsIndexRequest takeDispatchQueue = this.context.takeDispatchQueue();
            if (takeDispatchQueue == null) {
                Thread.sleep(this.context.getProcessInterval());
                return;
            }
            if (this.context.getIdConfig(takeDispatchQueue.getEvent().getUid()) == null) {
                this.context.addSendResultMetric(takeDispatchQueue.getEvent(), this.context.getTaskName(), false, takeDispatchQueue.getSendTime());
            } else {
                bulkProcessor.add(takeDispatchQueue);
                this.context.addSendMetric(takeDispatchQueue.getEvent(), this.context.getTaskName());
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
            if (0 != 0) {
                this.context.backDispatchQueue(null);
                this.context.addSendResultMetric(esIndexRequest.getEvent(), this.context.getTaskName(), false, esIndexRequest.getSendTime());
            }
            try {
                Thread.sleep(this.context.getProcessInterval());
            } catch (InterruptedException e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
}
