package org.apache.inlong.sort.standalone.sink.http;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.class */
public class HttpChannelWorker extends Thread {
    public static final Logger LOG = LoggerFactory.getLogger(HttpChannelWorker.class);
    private final HttpSinkContext context;
    private final int workerIndex;
    private LifecycleState status = LifecycleState.IDLE;
    private IEvent2HttpRequestHandler handler;
    private CloseableHttpAsyncClient httpClient;

    public HttpChannelWorker(HttpSinkContext httpSinkContext, int i) {
        this.context = httpSinkContext;
        this.workerIndex = i;
        this.handler = httpSinkContext.createHttpRequestHandler();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.status = LifecycleState.START;
        LOG.info("Starting HttpChannelWorker:{},status:{},index:{}", new Object[]{this.context.getTaskName(), this.status, Integer.valueOf(this.workerIndex)});
        initHttpClient();
        while (this.status == LifecycleState.START) {
            try {
                doRun();
            } catch (Throwable th) {
                LOG.error("Error occurred while starting HttpChannelWorker:{},status:{},index:{}", new Object[]{this.context.getTaskName(), this.status, Integer.valueOf(this.workerIndex), th});
            }
        }
    }

    public void doRun() throws InterruptedException, JsonProcessingException, URISyntaxException {
        DispatchProfile takeDispatchQueue = this.context.takeDispatchQueue();
        if (takeDispatchQueue == null) {
            Thread.sleep(this.context.getProcessInterval());
            return;
        }
        if (this.context.getIdConfig(takeDispatchQueue.getUid()) == null) {
            for (ProfileEvent profileEvent : takeDispatchQueue.getEvents()) {
                this.context.addSendResultMetric(profileEvent, this.context.getTaskName(), false, System.currentTimeMillis());
                profileEvent.ack();
            }
            return;
        }
        try {
            List<HttpRequest> parse = this.handler.parse(this.context, takeDispatchQueue);
            if (parse == null) {
                for (ProfileEvent profileEvent2 : takeDispatchQueue.getEvents()) {
                    this.context.addSendResultMetric(profileEvent2, this.context.getTaskName(), false, System.currentTimeMillis());
                    this.context.releaseDispatchQueue(takeDispatchQueue);
                    profileEvent2.ack();
                }
            }
            for (HttpRequest httpRequest : parse) {
                this.httpClient.execute(httpRequest.getRequest(), new HttpCallback(this.context, httpRequest));
                Iterator<ProfileEvent> it = takeDispatchQueue.getEvents().iterator();
                while (it.hasNext()) {
                    this.context.addSendMetric(it.next(), this.context.getTaskName());
                }
            }
        } catch (Throwable th) {
            LOG.error("Failed to send HttpRequest uid:{},error:{}", new Object[]{takeDispatchQueue.getUid(), th.getMessage(), th});
            this.context.backDispatchQueue(takeDispatchQueue);
            initHttpClient();
            Thread.sleep(this.context.getProcessInterval());
        }
    }

    public void close() {
        this.status = LifecycleState.STOP;
    }

    private void initHttpClient() {
        if (this.httpClient != null) {
            try {
                this.httpClient.close();
            } catch (IOException e) {
                LOG.error(String.format("close HttpClient:%s", e.getMessage()), e);
            }
            this.httpClient = null;
        }
        try {
            if (this.httpClient == null) {
                String username = this.context.getUsername();
                String password = this.context.getPassword();
                LOG.info("initHttpAsyncClient:url:{}", this.context.getBaseUrl());
                HttpAsyncClientBuilder custom = HttpAsyncClients.custom();
                BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
                if (this.context.getEnableCredential()) {
                    basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                    custom.setDefaultCredentialsProvider(basicCredentialsProvider);
                }
                custom.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(this.context.getConnectionRequestTimeout()).setSocketTimeout(this.context.getSocketTimeout()).setMaxRedirects(this.context.getMaxRedirects()).setConnectTimeout(120000).build()).setMaxConnTotal(this.context.getMaxConnect()).setMaxConnPerRoute(this.context.getMaxConnectPerRoute());
                this.httpClient = HttpSinkFactory.createHttpAsyncClient(custom);
                this.httpClient.start();
            }
        } catch (Exception e2) {
            LOG.error("init httpclient failed,error:{}", e2.getMessage(), e2);
            this.httpClient = null;
        }
    }
}
