package org.apache.inlong.sdk.dataproxy.network;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/network/Sender.class */
public class Sender {
    private static final Logger logger = LoggerFactory.getLogger(Sender.class);
    // Pending asynchronous sends: channel -> (message id -> queued callback entry).
    private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks;
    // In-flight synchronous sends, keyed by message id.
    private final ConcurrentHashMap<String, SyncMessageCallable> syncCallables;
    // Client chosen for a message id by the "index" send methods.
    private final ConcurrentHashMap<String, NettyClient> chooseProxy;
    // NOTE(review): created in the constructor but not referenced by any method body
    // visible in this file — confirm whether it is still needed.
    private final ReentrantLock stateLock;
    // Executes the SyncMessageCallable tasks submitted by the synchronous send methods.
    private final ExecutorService threadPool;
    // Upper bound for currentBufferSize, read from ProxyClientConfig.getTotalAsyncCallbackSize().
    private final int asyncCallbackMaxSize;
    // Quota currently reserved by pending async sends: one unit per message, or the
    // entry's recorded size when isFile is true.
    private final AtomicInteger currentBufferSize;
    // Receives the callbacks map and buffer counter; also tracks timed-out channels.
    private final TimeoutScanThread scanThread;
    // Source of NettyClient instances, group/stream id numbers and the encrypt config entry.
    private final ClientMgr clientMgr;
    // Client configuration supplied to the constructor.
    private final ProxyClientConfig configure;
    // Copied from ProxyClientConfig.isFile(); switches callbacks to FileCallback and size-based accounting.
    private final boolean isFile;
    // Metric recorder started in the constructor and closed in close().
    private final MetricWorkerThread metricWorker;
    // Cluster id taken from the proxy configuration entry during construction.
    private String clusterId;

    /**
     * Creates a sender without an explicit channel factory; equivalent to
     * {@code new Sender(proxyClientConfig, null)}.
     *
     * @param proxyClientConfig client configuration
     * @throws Exception if initialization fails (see the two-argument constructor)
     */
    public Sender(ProxyClientConfig proxyClientConfig) throws Exception {
        this(proxyClientConfig, null);
    }

    /**
     * Creates the sender: builds the {@link ClientMgr}, reads the proxy configuration
     * entry, validates the security settings required when the entry is not flagged as
     * inter-visit, and starts the timeout-scan and metric worker threads.
     *
     * @param proxyClientConfig client configuration
     * @param channelFactory    channel factory handed to {@link ClientMgr}; may be {@code null}
     * @throws Exception if any initialization step fails; the original failure is
     *                   attached as the cause
     */
    public Sender(ProxyClientConfig proxyClientConfig, ChannelFactory channelFactory) throws Exception {
        this.callbacks = new ConcurrentHashMap<>();
        this.syncCallables = new ConcurrentHashMap<>();
        this.chooseProxy = new ConcurrentHashMap<>();
        this.stateLock = new ReentrantLock();
        this.currentBufferSize = new AtomicInteger(0);
        this.configure = proxyClientConfig;
        this.asyncCallbackMaxSize = proxyClientConfig.getTotalAsyncCallbackSize();
        this.threadPool = Executors.newCachedThreadPool();
        this.clientMgr = new ClientMgr(proxyClientConfig, this, channelFactory);
        try {
            ProxyConfigEntry groupIdConfigureInfo = this.clientMgr.getGroupIdConfigureInfo();
            setClusterId(groupIdConfigureInfo.getClusterId());
            if (!groupIdConfigureInfo.isInterVisit()) {
                if (!proxyClientConfig.isNeedAuthentication()) {
                    throw new Exception("In OutNetwork isNeedAuthentication must be true!");
                }
                if (!proxyClientConfig.isNeedDataEncry()) {
                    throw new Exception("In OutNetwork isNeedDataEncry must be true!");
                }
            }
            this.isFile = proxyClientConfig.isFile();
            this.scanThread = new TimeoutScanThread(this.callbacks, this.currentBufferSize, proxyClientConfig, this.clientMgr);
            this.scanThread.start();
            this.metricWorker = new MetricWorkerThread(proxyClientConfig, this);
            this.metricWorker.start();
            logger.info("proxy sdk is starting!");
        } catch (Throwable th) {
            // Chain the failure itself rather than th.getCause(): the latter is frequently
            // null and would discard the real reason (including the validation messages above).
            String reason = proxyClientConfig.isReadProxyIPFromLocal()
                    ? "Get local proxy configure failure!"
                    : "Visit manager error!";
            throw new Exception(reason, th);
        }
    }

    /**
     * Waits, in one-second steps and for at most 60 seconds, until the reserved async
     * buffer counter drops to zero. Logs a warning if entries are still outstanding
     * after the wait. If the waiting thread is interrupted, the interrupt flag is
     * restored and the wait is abandoned.
     */
    private void checkCallbackList() {
        final int maxWaitSeconds = 60;
        logger.info("checking call back list before close, current size is {}", Integer.valueOf(this.currentBufferSize.get()));
        for (int waited = 0; this.currentBufferSize.get() > 0 && waited < maxWaitSeconds; waited++) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                logger.error("exception while checking callback list", e);
                // Keep the interrupt visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
        int remaining = this.currentBufferSize.get();
        if (remaining > 0) {
            logger.warn("callback not empty {}, please check it", Integer.valueOf(remaining));
        }
    }

    /**
     * Shuts the sender down: first waits for outstanding async callbacks (see
     * {@code checkCallbackList}), then stops the timeout scanner, the client manager,
     * the thread pool and finally the metric worker, in that order.
     */
    public void close() {
        // May block while callbacks are still pending.
        checkCallbackList();
        this.scanThread.shutDown();
        this.clientMgr.shutDown();
        // shutdown() (not shutdownNow()): already-submitted sync tasks are not cancelled here.
        this.threadPool.shutdown();
        this.metricWorker.close();
    }

    /**
     * Renders the stack trace of a throwable as a string.
     *
     * @param th the throwable to render; may be {@code null}
     * @return the text produced by {@link Throwable#printStackTrace(PrintWriter)}, or
     *         {@code null} when {@code th} is {@code null}
     */
    public String getExceptionStack(Throwable th) {
        if (th == null) {
            return null;
        }
        StringWriter buffer = new StringWriter();
        // try-with-resources flushes and closes the writer on every path; closing a
        // StringWriter-backed PrintWriter cannot fail, so no recursive error handling is needed.
        try (PrintWriter out = new PrintWriter(buffer)) {
            th.printStackTrace(out);
        }
        return buffer.toString();
    }

    /**
     * Delivers a send result to the async callback registered for a message and releases
     * the buffer quota reserved for it. Does nothing when the channel is {@code null},
     * the channel has no pending map, or no entry exists for the message id.
     *
     * <p>The quota is released in a {@code finally} block: the entry has already been
     * removed from the pending map at that point, so nothing else could release it if
     * the callback (or the address formatting in file mode) throws.
     *
     * @param channel    channel the message was sent on
     * @param str        message id used as key in the pending map
     * @param sendResult result to report
     */
    public void notifyCallback(Channel channel, String str, SendResult sendResult) {
        if (channel == null) {
            return;
        }
        ConcurrentHashMap<String, QueueObject> pending = this.callbacks.get(channel);
        if (pending == null) {
            return;
        }
        QueueObject entry = pending.remove(str);
        if (entry == null) {
            return;
        }
        if (!this.isFile) {
            try {
                entry.getCallback().onMessageAck(sendResult);
            } finally {
                this.currentBufferSize.decrementAndGet();
            }
            return;
        }
        try {
            // File mode reports "<result>=<host>", where <host> is the text between the first
            // character of the remote address string and its first ':'.
            String remote = channel.getRemoteAddress().toString();
            ((FileCallback) entry.getCallback()).onMessageAck(sendResult.toString() + "=" + remote.substring(1, remote.indexOf(':')));
        } finally {
            this.currentBufferSize.addAndGet(-entry.getSize());
        }
    }

    /**
     * Prepares the message (attribute validation, group/stream id numbers, encryption
     * settings, UUID) and sends it through the given client, blocking for the result.
     *
     * @param nettyClient  client to send through; {@code null} yields {@link SendResult#NO_CONNECTION}
     * @param encodeObject message to send
     * @param str          message UUID stored on the message
     * @param j            maximum time to wait for the result
     * @param timeUnit     unit of {@code j}
     * @return the send result produced by the submitted task, or an early error result
     * @throws ExecutionException   if the submitted task failed
     * @throws InterruptedException if the wait was interrupted
     * @throws TimeoutException     if no result arrived within the timeout
     */
    private SendResult syncSendInternalMessage(NettyClient nettyClient, EncodeObject encodeObject, String str, long j, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
        if (nettyClient == null) {
            return SendResult.NO_CONNECTION;
        }
        if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
            logger.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes());
            return SendResult.INVALID_ATTRIBUTES;
        }
        if (encodeObject.getMsgtype() == 7) {
            int groupIdNum = 0;
            int streamIdNum = 0;
            if (encodeObject.getGroupId().equals(this.clientMgr.getGroupId())) {
                groupIdNum = this.clientMgr.getGroupIdNum();
                streamIdNum = this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null ? this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()).intValue() : 0;
            }
            encodeObject.setGroupIdNum(groupIdNum);
            encodeObject.setStreamIdNum(streamIdNum);
            // Without both numeric ids the message cannot use the id-transfer form.
            if (groupIdNum == 0 || streamIdNum == 0) {
                encodeObject.setGroupIdTransfer(false);
            }
        }
        if (this.configure.isNeedDataEncry()) {
            encodeObject.setEncryptEntry(true, this.configure.getUserName(), this.clientMgr.getEncryptConfigEntry());
        } else {
            encodeObject.setEncryptEntry(false, null, null);
        }
        encodeObject.setMsgUUID(str);
        SyncMessageCallable syncMessageCallable = new SyncMessageCallable(nettyClient, encodeObject, j, timeUnit);
        // Registered before submission so notifyFeedback can find and complete it.
        this.syncCallables.put(encodeObject.getMessageId(), syncMessageCallable);
        return (SendResult) this.threadPool.submit(syncMessageCallable).get(j, timeUnit);
    }

    /**
     * Sends a message synchronously through a round-robin-selected client and waits up
     * to the given timeout for the result.
     *
     * <p>The client is selected here and passed to the internal helper so that the very
     * same client can be used afterwards to reset its channel's timeout state. Previously
     * a {@code null} reference was dereferenced at that point, which turned every
     * outcome into {@link SendResult#UNKOWN_ERROR}.
     *
     * @param encodeObject message to send
     * @param str          message UUID stored on the message
     * @param j            maximum time to wait for the result
     * @param timeUnit     unit of {@code j}
     * @return the send result; never {@code null}
     */
    public SendResult syncSendMessage(EncodeObject encodeObject, String str, long j, TimeUnit timeUnit) {
        // NOTE(review): asyncSendMessage passes getPackageTime() before getDt() to
        // recordNumByKey, while this call passes them in the opposite order — confirm
        // which order matches the MetricWorkerThread signature.
        this.metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt());
        try {
            NettyClient client = this.clientMgr.getClientByRoundRobin();
            SendResult result = syncSendInternalMessage(client, encodeObject, str, j, timeUnit);
            if (result == null) {
                this.syncCallables.remove(encodeObject.getMessageId());
                return SendResult.UNKOWN_ERROR;
            }
            if (client != null) {
                this.scanThread.resetTimeoutChannel(client.getChannel());
            }
            if (result == SendResult.OK) {
                this.metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
            }
            return result;
        } catch (InterruptedException e) {
            logger.error("send message error {} ", getExceptionStack(e));
            this.syncCallables.remove(encodeObject.getMessageId());
            return SendResult.THREAD_INTERRUPT;
        } catch (ExecutionException e2) {
            logger.error("ExecutionException {} ", getExceptionStack(e2));
            this.syncCallables.remove(encodeObject.getMessageId());
            return SendResult.UNKOWN_ERROR;
        } catch (TimeoutException e3) {
            logger.error("TimeoutException {} ", getExceptionStack(e3));
            SyncMessageCallable timedOut = this.syncCallables.remove(encodeObject.getMessageId());
            if (timedOut != null) {
                NettyClient usedClient = timedOut.getClient();
                Channel channel = usedClient != null ? usedClient.getChannel() : null;
                if (channel != null) {
                    logger.error("channel maybe busy {}", channel);
                    this.scanThread.addTimeoutChannel(channel);
                }
            }
            return SendResult.TIMEOUT;
        } catch (Throwable th) {
            logger.error("syncSendMessage exception {} ", getExceptionStack(th));
            this.syncCallables.remove(encodeObject.getMessageId());
            return SendResult.UNKOWN_ERROR;
        }
    }

    /**
     * Prepares the message (group/stream id numbers, encryption settings, UUID) and
     * sends it through the given client, blocking for the result. Client selection is
     * the caller's responsibility.
     *
     * @param nettyClient  client to send through; {@code null} yields {@link SendResult#NO_CONNECTION}
     * @param encodeObject message to send
     * @param str          message UUID stored on the message
     * @param j            maximum time to wait for the result
     * @param timeUnit     unit of {@code j}
     * @return the send result produced by the submitted task
     * @throws ExecutionException   if the submitted task failed
     * @throws InterruptedException if the wait was interrupted
     * @throws TimeoutException     if no result arrived within the timeout
     */
    private SendResult syncSendMessageIndexInternal(NettyClient nettyClient, EncodeObject encodeObject, String str, long j, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
        if (nettyClient == null) {
            return SendResult.NO_CONNECTION;
        }
        if (encodeObject.getMsgtype() == 7) {
            int groupIdNum = 0;
            int streamIdNum = 0;
            if (encodeObject.getGroupId().equals(this.clientMgr.getGroupId())) {
                groupIdNum = this.clientMgr.getGroupIdNum();
                streamIdNum = this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null ? this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()).intValue() : 0;
            }
            encodeObject.setGroupIdNum(groupIdNum);
            encodeObject.setStreamIdNum(streamIdNum);
            if (groupIdNum == 0 || streamIdNum == 0) {
                encodeObject.setGroupIdTransfer(false);
            }
        }
        if (this.configure.isNeedDataEncry()) {
            encodeObject.setEncryptEntry(true, this.configure.getUserName(), this.clientMgr.getEncryptConfigEntry());
        } else {
            encodeObject.setEncryptEntry(false, null, null);
        }
        encodeObject.setMsgUUID(str);
        SyncMessageCallable syncMessageCallable = new SyncMessageCallable(nettyClient, encodeObject, j, timeUnit);
        // Registered before submission so notifyFeedback can find and complete it.
        this.syncCallables.put(encodeObject.getMessageId(), syncMessageCallable);
        return (SendResult) this.threadPool.submit(syncMessageCallable).get(j, timeUnit);
    }

    /**
     * Sends a message synchronously, preferring the client remembered for its message id
     * or the proxy named by {@code encodeObject.getProxyIp()}, and falling back to
     * round-robin selection when that client is missing or inactive.
     *
     * <p>The fallback selection happens in this method (not in the internal helper) so
     * the client that was actually used is the one whose channel timeout is reset and
     * whose server IP is reported. Previously a stale or {@code null} reference was
     * dereferenced after the send.
     *
     * @param encodeObject message to send
     * @param str          message UUID stored on the message
     * @param j            maximum time to wait for the result
     * @param timeUnit     unit of {@code j}
     * @return {@code "<result>=<serverIp>"} when a send was attempted and produced a
     *         result; otherwise the bare name of the error result
     */
    public String syncSendMessageIndex(EncodeObject encodeObject, String str, long j, TimeUnit timeUnit) {
        try {
            NettyClient client = this.chooseProxy.get(encodeObject.getMessageId());
            String proxyIp = encodeObject.getProxyIp();
            if (proxyIp != null && proxyIp.length() != 0) {
                client = this.clientMgr.getContainProxy(proxyIp);
            }
            if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
                logger.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes());
                return SendResult.INVALID_ATTRIBUTES.toString();
            }
            try {
                if (client == null || !client.isActive()) {
                    this.chooseProxy.remove(encodeObject.getMessageId());
                    client = this.clientMgr.getClientByRoundRobin();
                    if (client == null) {
                        return SendResult.NO_CONNECTION.toString();
                    }
                    this.chooseProxy.put(encodeObject.getMessageId(), client);
                }
                SendResult result = syncSendMessageIndexInternal(client, encodeObject, str, j, timeUnit);
                if (result == null) {
                    this.syncCallables.remove(encodeObject.getMessageId());
                    return SendResult.UNKOWN_ERROR.toString();
                }
                this.scanThread.resetTimeoutChannel(client.getChannel());
                return result.toString() + "=" + client.getServerIP();
            } catch (InterruptedException e) {
                logger.error("send message error {}", getExceptionStack(e));
                this.syncCallables.remove(encodeObject.getMessageId());
                return SendResult.THREAD_INTERRUPT.toString();
            } catch (ExecutionException e2) {
                logger.error("ExecutionException {}", getExceptionStack(e2));
                this.syncCallables.remove(encodeObject.getMessageId());
                return SendResult.UNKOWN_ERROR.toString();
            } catch (TimeoutException e3) {
                logger.error("TimeoutException {}", getExceptionStack(e3));
                SyncMessageCallable timedOut = this.syncCallables.remove(encodeObject.getMessageId());
                if (timedOut != null) {
                    NettyClient usedClient = timedOut.getClient();
                    Channel channel = usedClient != null ? usedClient.getChannel() : null;
                    if (channel != null) {
                        logger.error("channel maybe busy {}", channel);
                        this.scanThread.addTimeoutChannel(channel);
                    }
                }
                return SendResult.TIMEOUT.toString();
            } catch (Throwable th) {
                // Must come last: placed first it would shadow the specific handlers above.
                logger.error("syncSendMessage exception {}", getExceptionStack(th));
                this.syncCallables.remove(encodeObject.getMessageId());
                return SendResult.UNKOWN_ERROR.toString();
            }
        } catch (Exception e4) {
            logger.error("agent send error {}", getExceptionStack(e4));
            this.syncCallables.remove(encodeObject.getMessageId());
            return SendResult.UNKOWN_ERROR.toString();
        }
    }

    /**
     * Sends a message asynchronously and registers {@code fileCallback} to be notified
     * of the outcome. The client remembered for the message id (or the proxy named by
     * the message) is used when active; otherwise a client is picked by round robin and
     * remembered.
     *
     * @param encodeObject message to send
     * @param fileCallback callback stored with the pending entry
     * @param str          message UUID stored on the message
     * @param j            timeout value stored with the pending entry
     * @param timeUnit     unit of {@code j}
     * @throws ProxysdkException if no client is available or the async buffer is full
     */
    public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback fileCallback, String str, long j, TimeUnit timeUnit) throws ProxysdkException {
        NettyClient target = this.chooseProxy.get(encodeObject.getMessageId());
        String pinnedIp = encodeObject.getProxyIp();
        if (pinnedIp != null && pinnedIp.length() != 0) {
            target = this.clientMgr.getContainProxy(pinnedIp);
        }
        boolean usable = target != null && target.isActive();
        if (!usable) {
            this.chooseProxy.remove(encodeObject.getMessageId());
            target = this.clientMgr.getClientByRoundRobin();
            if (target == null) {
                throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
            }
            this.chooseProxy.put(encodeObject.getMessageId(), target);
        }

        // Cheap pre-check before anything is reserved.
        if (this.currentBufferSize.get() >= this.asyncCallbackMaxSize) {
            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
        }

        // One unit per message, or a byte-based size in file mode.
        int reserved = 1;
        if (this.isFile) {
            if (encodeObject.getBodyBytes() == null) {
                for (byte[] chunk : encodeObject.getBodylist()) {
                    reserved += chunk.length;
                }
            } else {
                reserved = encodeObject.getBodyBytes().length;
            }
        }
        // Reserve first, then roll back if the reservation crossed the limit.
        if (this.currentBufferSize.addAndGet(reserved) >= this.asyncCallbackMaxSize) {
            this.currentBufferSize.addAndGet(-reserved);
            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
        }

        ConcurrentHashMap<String, QueueObject> pending = this.callbacks.computeIfAbsent(target.getChannel(), ch -> new ConcurrentHashMap<>());
        pending.put(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(), fileCallback, reserved, j, timeUnit));

        if (encodeObject.getMsgtype() == 7) {
            int groupIdNum = 0;
            int streamIdNum = 0;
            boolean sameGroup = this.clientMgr.getGroupId().length() != 0 && encodeObject.getGroupId().equals(this.clientMgr.getGroupId());
            if (sameGroup) {
                groupIdNum = this.clientMgr.getGroupIdNum();
                if (this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null) {
                    streamIdNum = this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()).intValue();
                }
            }
            encodeObject.setGroupIdNum(groupIdNum);
            encodeObject.setStreamIdNum(streamIdNum);
            if (!(groupIdNum != 0 && streamIdNum != 0)) {
                encodeObject.setGroupIdTransfer(false);
            }
        }

        if (!this.configure.isNeedDataEncry()) {
            encodeObject.setEncryptEntry(false, null, null);
        } else {
            encodeObject.setEncryptEntry(true, this.configure.getUserName(), this.clientMgr.getEncryptConfigEntry());
        }
        encodeObject.setMsgUUID(str);
        target.write(encodeObject);
    }

    /**
     * Tells whether either attribute string is malformed. An empty or {@code null}
     * string counts as acceptable; a non-empty one must pass {@code validAttribute}.
     *
     * @param str  common attribute string
     * @param str2 message attribute string
     * @return {@code true} if at least one non-empty string fails validation
     */
    private boolean isNotValidateAttr(String str, String str2) {
        boolean commonMalformed = !StringUtils.isEmpty(str) && !validAttribute(str);
        if (commonMalformed) {
            return true;
        }
        return !StringUtils.isEmpty(str2) && !validAttribute(str2);
    }

    /**
     * Checks that the separators in {@code str} alternate as {@code =}, {@code &},
     * {@code =}, ... starting and ending with {@code =}; every other character is
     * ignored. In effect the string must look like {@code k=v} or {@code k=v&k=v...}.
     *
     * @param str attribute string to check; must not be {@code null}
     * @return {@code true} if the separator sequence is well formed
     */
    private boolean validAttribute(String str) {
        // true once the '=' of the current pair has been seen and no '&' has followed yet
        boolean inValue = false;
        for (char current : str.toCharArray()) {
            switch (current) {
                case '=':
                    if (inValue) {
                        return false;
                    }
                    inValue = true;
                    break;
                case '&':
                    if (!inValue) {
                        return false;
                    }
                    inValue = false;
                    break;
                default:
                    break;
            }
        }
        return inValue;
    }

    /**
     * Sends a message asynchronously through a round-robin-selected client and registers
     * {@code sendMessageCallback} to be notified of the outcome.
     *
     * @param encodeObject        message to send
     * @param sendMessageCallback callback stored with the pending entry
     * @param str                 message UUID stored on the message
     * @param j                   timeout value stored with the pending entry
     * @param timeUnit            unit of {@code j}
     * @throws ProxysdkException if no client is available, the async buffer is full, or
     *                           the attribute strings are malformed
     */
    public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback sendMessageCallback, String str, long j, TimeUnit timeUnit) throws ProxysdkException {
        this.metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(), encodeObject.getDt(), encodeObject.getRealCnt());

        NettyClient target = this.clientMgr.getClientByRoundRobin();
        if (target == null) {
            throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
        }
        // Cheap pre-check before anything is reserved.
        if (this.currentBufferSize.get() >= this.asyncCallbackMaxSize) {
            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
        }
        if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
            logger.error("error attr format {} {}", encodeObject.getCommonattr(), encodeObject.getAttributes());
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }

        // One unit per message, or a byte-based size in file mode.
        int reserved = 1;
        if (this.isFile) {
            if (encodeObject.getBodyBytes() == null) {
                for (byte[] chunk : encodeObject.getBodylist()) {
                    reserved += chunk.length;
                }
            } else {
                reserved = encodeObject.getBodyBytes().length;
            }
        }
        // Reserve first, then roll back if the reservation crossed the limit.
        if (this.currentBufferSize.addAndGet(reserved) >= this.asyncCallbackMaxSize) {
            this.currentBufferSize.addAndGet(-reserved);
            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
        }

        ConcurrentHashMap<String, QueueObject> pending = this.callbacks.computeIfAbsent(target.getChannel(), ch -> new ConcurrentHashMap<>());
        QueueObject previous = pending.putIfAbsent(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(), sendMessageCallback, reserved, j, timeUnit));
        if (previous != null) {
            logger.warn("message id {} has existed.", encodeObject.getMessageId());
        }

        if (encodeObject.getMsgtype() == 7) {
            int groupIdNum = 0;
            int streamIdNum = 0;
            boolean sameGroup = this.clientMgr.getGroupId().length() != 0 && encodeObject.getGroupId().equals(this.clientMgr.getGroupId());
            if (sameGroup) {
                groupIdNum = this.clientMgr.getGroupIdNum();
                if (this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null) {
                    streamIdNum = this.clientMgr.getStreamIdMap().get(encodeObject.getStreamId()).intValue();
                }
            }
            encodeObject.setGroupIdNum(groupIdNum);
            encodeObject.setStreamIdNum(streamIdNum);
            if (!(groupIdNum != 0 && streamIdNum != 0)) {
                encodeObject.setGroupIdTransfer(false);
            }
        }

        if (!this.configure.isNeedDataEncry()) {
            encodeObject.setEncryptEntry(false, null, null);
        } else {
            encodeObject.setEncryptEntry(true, this.configure.getUserName(), this.clientMgr.getEncryptConfigEntry());
        }
        encodeObject.setMsgUUID(str);
        target.write(encodeObject);
    }

    /**
     * Handles a response received for a previously sent message: forgets the client
     * remembered for the message id, completes the matching synchronous send (if any),
     * records a success metric when the response carries no exception, and finally
     * notifies the asynchronous callback (if any).
     *
     * @param channel      channel the response arrived on
     * @param encodeObject decoded response
     */
    public void notifyFeedback(Channel channel, EncodeObject encodeObject) {
        final String msgId = encodeObject.getMessageId();
        this.chooseProxy.remove(msgId);
        final SyncMessageCallable waiting = this.syncCallables.remove(msgId);

        final SendResult outcome;
        if (encodeObject.isException()) {
            outcome = SendResult.INVALID_ATTRIBUTES;
        } else {
            outcome = SendResult.OK;
        }

        if (outcome == SendResult.OK) {
            this.metricWorker.recordSuccessByMessageId(msgId);
        }
        if (waiting != null) {
            waiting.update(outcome);
        }
        if (encodeObject.isException()) {
            logger.error("{} exception happens, error message {}", channel, encodeObject.getExceptionError());
        }
        notifyCallback(channel, msgId, outcome);
    }

    /**
     * Placeholder for the channel-disconnect handler. The decompiler could not
     * reconstruct the real body, so this version unconditionally throws
     * {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): the fragment preserved in the JADX warning below suggests the
     * real method reports {@code SendResult.CONNECTION_BREAK} to a pending entry and
     * removes an entry from {@code syncCallables}. Restore the implementation from the
     * original sources before relying on this class.
     *
     * @param r6 presumably the channel whose connection was lost (the name is a
     *           decompiler register name) — TODO confirm
     * @throws UnsupportedOperationException always
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x010f, code lost:
    
        r0.update(org.apache.inlong.sdk.dataproxy.SendResult.CONNECTION_BREAK);
        r5.syncCallables.remove(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void notifyConnectionDisconnected(org.jboss.netty.channel.Channel r6) {
        /*
            Method dump skipped, instructions count: 311
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.inlong.sdk.dataproxy.network.Sender.notifyConnectionDisconnected(org.jboss.netty.channel.Channel):void");
    }

    /**
     * Blocks until every pending async callback registered for the channel has been
     * removed, polling once per second. Returns immediately for a {@code null} channel
     * or a channel without a pending map.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and the
     * wait ends early. Continuing to loop with the flag set would make every subsequent
     * sleep fail immediately and spin.
     *
     * @param channel channel to wait for
     */
    public void waitForAckForChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        logger.info("wait for ack for channel {}", channel);
        try {
            ConcurrentHashMap<String, QueueObject> pending = this.callbacks.get(channel);
            if (pending != null) {
                boolean interrupted = false;
                while (!pending.isEmpty()) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // Trailing throwable argument makes SLF4J log the stack trace.
                        logger.error("wait for ack for channel {}, error {}", channel, e.getMessage(), e);
                        Thread.currentThread().interrupt();
                        interrupted = true;
                        break;
                    }
                }
                if (!interrupted) {
                    logger.info("this channel {} is empty!", channel);
                }
            }
            logger.info("waitForAckForChannel finished , channel is {}", channel);
        } catch (Throwable th) {
            logger.error("waitForAckForChannel exception, channel is {}", channel, th);
        }
    }

    /**
     * Resets the reserved-buffer counter to zero and then drops every pending async
     * callback entry without notifying the callbacks.
     */
    public void clearCallBack() {
        currentBufferSize.set(0);
        callbacks.clear();
    }

    /**
     * Returns the cluster id currently stored on this sender.
     *
     * @return the cluster id; may be {@code null} if none was set
     */
    public String getClusterId() {
        return clusterId;
    }

    /**
     * Replaces the cluster id stored on this sender.
     *
     * @param str the new cluster id
     */
    public void setClusterId(String str) {
        clusterId = str;
    }
}
