package org.apache.inlong.sdk.dataproxy.http;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.dataproxy.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.class */
/**
 * Sends messages to DataProxy nodes over HTTP.
 *
 * <p>Messages can be sent synchronously through
 * {@link #sendMessageWithHostInfo(List, String, String, long, long, TimeUnit)}, or asynchronously by
 * putting {@link HttpMessage} instances into the queue handed to the constructor; background workers
 * poll that queue, send each message and report the outcome through the message callback.
 *
 * <p>The shutdown flag and the lazily created HTTP client are shared between the caller thread and
 * the worker threads, so both fields are {@code volatile}.
 */
public class InternalHttpSender {

    private static final Logger logger = LoggerFactory.getLogger(InternalHttpSender.class);

    private final ProxyClientConfig proxyClientConfig;
    private final ConcurrentHashSet<HostInfo> hostList;
    private final LinkedBlockingQueue<HttpMessage> messageCache;
    private final ExecutorService workerServices = Executors.newCachedThreadPool();

    /** Lazily created by {@link #constructHttpClient(long, TimeUnit)}; read by worker threads. */
    private volatile CloseableHttpClient httpClient;

    /** Set by {@link #close()} and polled by the workers, hence volatile. */
    private volatile boolean bShutDown = false;

    /**
     * Background task that drains the message queue and sends every message it finds.
     * The task ends once the sender has been closed and the queue is empty, or when
     * its thread is interrupted.
     */
    public class WorkerRunner implements Runnable {

        private WorkerRunner() {
        }

        @Override
        public void run() {
            while (!(bShutDown && messageCache.isEmpty())) {
                drainMessageCache();
                try {
                    TimeUnit.MILLISECONDS.sleep(proxyClientConfig.getAsyncWorkerInterval());
                } catch (InterruptedException e) {
                    // Restore the flag so the executor can observe the interruption, then stop.
                    Thread.currentThread().interrupt();
                    logger.warn("async http worker interrupted, stop working", e);
                    return;
                }
            }
        }

        /**
         * Sends queued messages until the queue is empty. A failure on one message is
         * logged and does not prevent the following messages from being processed.
         */
        private void drainMessageCache() {
            HttpMessage httpMessage;
            while ((httpMessage = messageCache.poll()) != null) {
                try {
                    SendResult result = sendMessageWithHostInfo(
                            httpMessage.getBodies(),
                            httpMessage.getGroupId(),
                            httpMessage.getStreamId(),
                            httpMessage.getDt(),
                            httpMessage.getTimeout(),
                            httpMessage.getTimeUnit());
                    httpMessage.getCallback().onMessageAck(result);
                } catch (Exception e) {
                    logger.error("exception caught", e);
                }
            }
        }
    }

    /**
     * Creates the sender and immediately starts the asynchronous workers.
     *
     * @param proxyClientConfig client configuration (worker count/interval, alive connections, ...)
     * @param hostList          candidate DataProxy hosts; read on every send
     * @param messageCache      queue that the asynchronous workers poll for messages
     */
    public InternalHttpSender(ProxyClientConfig proxyClientConfig, ConcurrentHashSet<HostInfo> hostList,
            LinkedBlockingQueue<HttpMessage> messageCache) {
        this.proxyClientConfig = proxyClientConfig;
        this.hostList = hostList;
        this.messageCache = messageCache;
        submitWorkThread();
    }

    /** Starts as many workers as {@code getAsyncWorkerNumber()} reports. */
    private void submitWorkThread() {
        int workerNumber = this.proxyClientConfig.getAsyncWorkerNumber();
        for (int i = 0; i < workerNumber; i++) {
            this.workerServices.execute(new WorkerRunner());
        }
    }

    /**
     * Builds the form fields of one request. Despite the name these are body parameters,
     * not HTTP headers: the caller URL-encodes them into the request entity.
     *
     * @param bodies   message bodies; joined with {@code StringUtils.LF} into the "body" field
     * @param groupId  value of the "groupId" field
     * @param streamId value of the "streamId" field
     * @param dt       value of the "dt" field
     * @return the form fields in the order groupId, streamId, dt, body, cnt
     */
    private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies, String groupId, String streamId,
            long dt) {
        ArrayList<BasicNameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("groupId", groupId));
        params.add(new BasicNameValuePair("streamId", streamId));
        params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
        params.add(new BasicNameValuePair("body", StringUtils.join(bodies, StringUtils.LF)));
        params.add(new BasicNameValuePair("cnt", String.valueOf(bodies.size())));
        return params;
    }

    /**
     * Converts a timeout to whole milliseconds clamped to the {@code int} range, so that a very
     * large value cannot wrap around when narrowed for {@link RequestConfig}.
     */
    private static int toTimeoutMillis(long timeout, TimeUnit timeUnit) {
        long millis = timeUnit.toMillis(timeout);
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }

    /** Builds a request configuration using the timeout for both connect and socket timeouts. */
    private static RequestConfig buildRequestConfig(long timeout, TimeUnit timeUnit) {
        int millis = toTimeoutMillis(timeout, timeUnit);
        return RequestConfig.custom()
                .setConnectTimeout(millis)
                .setSocketTimeout(millis)
                .build();
    }

    /**
     * Returns the shared HTTP client, creating and publishing it on first use.
     * The field is assigned while the lock is held so that concurrent first callers
     * cannot each build (and then leak) their own client.
     *
     * @param timeout  timeout used as the client-wide default when the client is created
     * @param timeUnit unit of {@code timeout}
     * @return the shared client
     */
    private synchronized CloseableHttpClient constructHttpClient(long timeout, TimeUnit timeUnit) {
        CloseableHttpClient client = this.httpClient;
        if (client == null) {
            HttpClientBuilder builder = HttpClientBuilder.create();
            builder.setDefaultRequestConfig(buildRequestConfig(timeout, timeUnit));
            client = builder.build();
            this.httpClient = client;
        }
        return client;
    }

    /**
     * Picks hosts to try for one send.
     *
     * @return a shuffled snapshot of the host list, truncated to at most
     *         {@code getAliveConnections()} entries
     */
    public List<HostInfo> getRandomHostInfo() {
        List<HostInfo> candidates = new ArrayList<>(this.hostList);
        Collections.shuffle(candidates);
        int limit = Math.min(this.proxyClientConfig.getAliveConnections(), candidates.size());
        return candidates.subList(0, limit);
    }

    /**
     * Posts one batch of bodies to a single host and interprets the JSON reply.
     *
     * @param bodies   message bodies
     * @param groupId  group id form field
     * @param streamId stream id form field
     * @param dt       data time form field
     * @param timeout  connect/socket timeout applied to this request
     * @param timeUnit unit of {@code timeout}
     * @param hostInfo target host
     * @return {@code OK} when the reply's "code" equals the success code, {@code UNKOWN_ERROR} when
     *         the reply has no "code" member, {@code INVALID_DATA} for any other code
     * @throws Exception when the request fails, the reply is blank, the status is not 200,
     *                   or the reply cannot be parsed
     */
    private SendResult sendByHttp(List<String> bodies, String groupId, String streamId, long dt, long timeout,
            TimeUnit timeUnit, HostInfo hostInfo) throws Exception {
        CloseableHttpClient client = this.httpClient;
        if (client == null) {
            client = constructHttpClient(timeout, timeUnit);
        }
        // NOTE(review): BASIC_AUTH_JOINER is used here as the host/port separator — confirm its value.
        String url = "http://" + hostInfo.getHostName() + BasicAuth.BASIC_AUTH_JOINER
                + hostInfo.getPortNumber() + "/dataproxy/message";
        HttpPost httpPost = new HttpPost(url);
        try {
            // The client is shared and cached, so apply this call's timeout on the request itself.
            httpPost.setConfig(buildRequestConfig(timeout, timeUnit));
            httpPost.setHeader("Connection", "close");
            httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
            String encodedContent =
                    URLEncodedUtils.format(getHeaders(bodies, groupId, streamId, dt), StandardCharsets.UTF_8);
            httpPost.setEntity(new StringEntity(encodedContent));
            logger.info("begin to post request to {}, encoded content is: {}", url, encodedContent);

            try (CloseableHttpResponse response = client.execute(httpPost)) {
                String returnStr = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                int statusCode = response.getStatusLine().getStatusCode();
                if (Utils.isBlank(returnStr) || 200 != statusCode) {
                    throw new Exception(
                            "send data to dataproxy failed, result: " + returnStr + ", code: " + statusCode);
                }
                logger.debug("success to send data to dataproxy, result str: {}", returnStr);

                JsonElement codeElement = JsonParser.parseString(returnStr).getAsJsonObject().get("code");
                if (codeElement == null) {
                    return SendResult.UNKOWN_ERROR;
                }
                if (DataProxyErrCode.SUCCESS.getErrCode() == codeElement.getAsInt()) {
                    return SendResult.OK;
                }
                return SendResult.INVALID_DATA;
            }
        } finally {
            // Always give the connection back, including on the exception path.
            httpPost.releaseConnection();
        }
    }

    /**
     * Sends the bodies to one of the randomly selected hosts, moving on to the next host
     * whenever a send throws.
     *
     * @param bodies   message bodies
     * @param groupId  group id
     * @param streamId stream id
     * @param dt       data time
     * @param timeout  per-request timeout
     * @param timeUnit unit of {@code timeout}
     * @return the result of the first send that completes without throwing, or
     *         {@code UNKOWN_ERROR} when every host failed or no host was available
     */
    public SendResult sendMessageWithHostInfo(List<String> bodies, String groupId, String streamId, long dt,
            long timeout, TimeUnit timeUnit) {
        Exception lastError = null;
        for (HostInfo hostInfo : getRandomHostInfo()) {
            try {
                return sendByHttp(bodies, groupId, streamId, dt, timeout, timeUnit, hostInfo);
            } catch (Exception e) {
                lastError = e;
                logger.debug("error while sending data, resending it", e);
            }
        }
        if (lastError != null) {
            logger.error("error while sending data", lastError);
        } else {
            // The loop body never ran: there was no host to try.
            logger.warn("no available dataproxy host to send data");
        }
        return SendResult.UNKOWN_ERROR;
    }

    /**
     * Marks the sender as shut down, optionally discards queued messages, closes the HTTP
     * client and stops accepting new worker tasks.
     *
     * <p>NOTE(review): the client is closed without waiting for the workers, so messages still
     * queued at this point (when the cache is not cleaned) are sent against a closed client —
     * confirm whether that is intended.
     *
     * @throws Exception when closing the HTTP client fails
     */
    public void close() throws Exception {
        this.bShutDown = true;
        if (this.proxyClientConfig.isCleanHttpCacheWhenClosing()) {
            this.messageCache.clear();
        }
        try {
            CloseableHttpClient client = this.httpClient;
            if (client != null) {
                client.close();
            }
        } finally {
            // Shut the pool down even when closing the client throws.
            this.workerServices.shutdown();
        }
    }
}
