package org.apache.inlong.dataproxy.network;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.dataproxy.ProxyClientConfig;
import org.apache.inlong.dataproxy.SendMessageCallback;
import org.apache.inlong.dataproxy.SendResult;
import org.apache.inlong.dataproxy.config.HostInfo;
import org.apache.inlong.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.dataproxy.http.InternalHttpSender;
import org.apache.inlong.dataproxy.utils.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/network/HttpProxySender.class */
/**
 * Sends messages to data-proxy nodes over HTTP and keeps the list of known
 * proxy hosts up to date.
 *
 * <p>The instance is itself a daemon thread, started from the constructor,
 * which periodically re-reads the proxy configuration through
 * {@link ProxyConfigManager} and synchronises {@link #hostList} with it.
 * Synchronous sends are delegated to {@link InternalHttpSender}; asynchronous
 * sends are placed into a bounded queue that is handed to the
 * {@link InternalHttpSender} at construction time.
 */
public class HttpProxySender extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(HttpProxySender.class);
    private final ProxyClientConfig proxyClientConfig;
    private ProxyConfigManager proxyConfigManager;
    private final InternalHttpSender internalHttpSender;
    private final LinkedBlockingQueue<HttpMessage> messageCache;
    private final ConcurrentHashSet<HostInfo> hostList = new ConcurrentHashSet<>();
    // Written by close() on the caller's thread and read by run() on the
    // refresh thread, hence volatile so the update is guaranteed to be seen.
    private volatile boolean bShutDown = false;

    /**
     * Loads the initial proxy list, starts the background refresh thread and
     * creates the internal HTTP sender.
     *
     * @param proxyClientConfig client configuration
     * @throws Exception if the initial proxy configuration cannot be obtained
     */
    public HttpProxySender(ProxyClientConfig proxyClientConfig) throws Exception {
        this.proxyClientConfig = proxyClientConfig;
        initTDMClientAndRequest(proxyClientConfig);
        this.messageCache = new LinkedBlockingQueue<>(proxyClientConfig.getTotalAsyncCallbackSize());
        this.internalHttpSender = new InternalHttpSender(proxyClientConfig, this.hostList, this.messageCache);
    }

    /**
     * Creates the configuration manager, seeds {@link #hostList} with the
     * current proxy hosts and starts this object as a daemon thread.
     *
     * @param proxyClientConfig client configuration
     * @throws Exception wrapping whatever failure occurred; the message
     *                   depends on {@code isReadProxyIPFromLocal()}
     */
    private void initTDMClientAndRequest(ProxyClientConfig proxyClientConfig) throws Exception {
        try {
            this.proxyConfigManager = new ProxyConfigManager(proxyClientConfig, Utils.getLocalIp(), null);
            this.proxyConfigManager.setBusinessId(proxyClientConfig.getBid());
            ProxyConfigEntry initialConfig = retryGettingProxyConfig();
            this.hostList.addAll(initialConfig.getHostMap().values());
            setDaemon(true);
            start();
            logger.info("http proxy sender starts");
        } catch (Throwable th) {
            // Chain the throwable itself: th.getCause() is frequently null,
            // which would discard the real failure and its stack trace.
            if (!proxyClientConfig.isReadProxyIPFromLocal()) {
                throw new Exception("Visit TDManager error!", th);
            }
            throw new Exception("Get local proxy configure failure!", th);
        }
    }

    /**
     * Fetches the proxy configuration from the configuration manager.
     * Despite the name, no retry is performed here.
     *
     * @return the configuration entry reported by the manager
     * @throws Exception if the manager fails to provide the configuration
     */
    private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
        return this.proxyConfigManager.getBidConfigure();
    }

    /**
     * Background refresh loop. Sleeps for the configured interval plus a
     * random jitter of up to 600 seconds, then replaces the content of
     * {@link #hostList} with the hosts from the latest configuration.
     * The loop ends when {@link #close()} is called or the thread is
     * interrupted.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.bShutDown) {
            try {
                // long arithmetic: the former int expression
                // (minutes * 60 + jitter) * 1000 overflowed for large intervals.
                long refreshDelaySeconds = this.proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60L
                        + ThreadLocalRandom.current().nextInt(0, 600);
                TimeUnit.SECONDS.sleep(refreshDelaySeconds);
                if (this.proxyConfigManager != null) {
                    ProxyConfigEntry bidConfigure = this.proxyConfigManager.getBidConfigure();
                    // Add before retain so the set never passes through an
                    // empty state while it is being refreshed.
                    this.hostList.addAll(bidConfigure.getHostMap().values());
                    this.hostList.retainAll(bidConfigure.getHostMap().values());
                } else {
                    logger.error("manager is null, please check it!");
                }
                logger.info("get new proxy list {}", this.hostList);
            } catch (InterruptedException e) {
                // Interruption is how close() wakes this thread up; restore
                // the flag and stop instead of silently looping again.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e2) {
                logger.error("managerFetcher get or save managerIpList occur error,", e2);
            }
        }
    }

    /**
     * Synchronously sends a single message body.
     *
     * @return the result of the send; see
     *         {@link #sendMessage(List, String, String, long, long, TimeUnit)}
     */
    public SendResult sendMessage(String str, String str2, String str3, long j, long j2, TimeUnit timeUnit) {
        return sendMessage(Collections.singletonList(str), str2, str3, j, j2, timeUnit);
    }

    /**
     * Synchronously sends a batch of message bodies through the internal
     * HTTP sender. The remaining arguments are passed through unchanged.
     *
     * @return the sender's result, or {@link SendResult#NO_CONNECTION} when
     *         no proxy host is currently known
     */
    public SendResult sendMessage(List<String> list, String str, String str2, long j, long j2, TimeUnit timeUnit) {
        if (this.hostList.isEmpty()) {
            logger.error("proxy list is empty, maybe client has been closed or bid is not assigned with proxy list");
            return SendResult.NO_CONNECTION;
        }
        return this.internalHttpSender.sendMessageWithHostInfo(list, str, str2, j, j2, timeUnit);
    }

    /**
     * Queues a batch of message bodies for asynchronous sending.
     *
     * <p>When the queue is full the behaviour depends on
     * {@code isDiscardOldMessage()}: either the oldest queued message is
     * dropped (its callback, if any, is acknowledged with
     * {@link SendResult#TIMEOUT}) and the new one is offered again, or the
     * call blocks until space becomes available.
     *
     * @param list                bodies to send; copied defensively
     * @param sendMessageCallback callback stored with the queued message
     */
    public void asyncSendMessage(List<String> list, String str, String str2, long j, long j2, TimeUnit timeUnit, SendMessageCallback sendMessageCallback) {
        HttpMessage httpMessage = new HttpMessage(new ArrayList<>(list), str, str2, j, j2, timeUnit, sendMessageCallback);
        try {
            if (this.messageCache.offer(httpMessage)) {
                return;
            }
            if (this.proxyClientConfig.isDiscardOldMessage()) {
                logger.debug("discard old message and use new message instead");
                HttpMessage evicted = this.messageCache.poll();
                if (evicted != null) {
                    // A message queued without a callback must not abort the
                    // hand-over: an NPE here used to skip the offer below.
                    SendMessageCallback evictedCallback = evicted.getCallback();
                    if (evictedCallback != null) {
                        evictedCallback.onMessageAck(SendResult.TIMEOUT);
                    }
                }
                this.messageCache.offer(httpMessage);
            } else {
                this.messageCache.put(httpMessage);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt status visible to the caller's thread.
            Thread.currentThread().interrupt();
            logger.error("error async sending data", e);
        } catch (Exception e) {
            logger.error("error async sending data", e);
        }
    }

    /**
     * Queues a single message body for asynchronous sending; see
     * {@link #asyncSendMessage(List, String, String, long, long, TimeUnit, SendMessageCallback)}.
     */
    public void asyncSendMessage(String str, String str2, String str3, long j, long j2, TimeUnit timeUnit, SendMessageCallback sendMessageCallback) {
        asyncSendMessage(Collections.singletonList(str), str2, str3, j, j2, timeUnit, sendMessageCallback);
    }

    /**
     * Stops the refresh thread, clears the known proxy hosts and closes the
     * internal HTTP sender.
     */
    public void close() {
        // Raise the flag before clearing so a refresh that is in progress
        // is less likely to start another cycle and repopulate the list.
        this.bShutDown = true;
        this.hostList.clear();
        try {
            interrupt();
            this.internalHttpSender.close();
        } catch (Exception e) {
            logger.error("error while closing http client", e);
        }
    }
}
