package com.jd.bdp.whale.communication;

import com.jd.bdp.whale.common.command.FetchMsgsReqCmd;
import com.jd.bdp.whale.common.util.DataByteArrayInputStream;
import com.jd.bdp.whale.communication.message.Message;
import com.jd.bdp.whale.communication.message.MsgMarshallerFactory;
import com.jd.bdp.whale.communication.transport.DefaultTransportListener;
import com.jd.bdp.whale.communication.transport.InactiveConnectionMonitor;
import com.jd.bdp.whale.communication.transport.ResponseCorrelator;
import com.jd.bdp.whale.communication.transport.Transport;
import com.jd.bdp.whale.communication.transport.TransportFilter;
import com.jd.bdp.whale.communication.util.WorkerHandlerThreadPool;
import com.jd.dd.glowworm.PB;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/communication/TransportConnection_Thread.class */
/**
 * Wraps a {@link Transport} in an {@link InactiveConnectionMonitor} and a
 * {@link ResponseCorrelator}, and installs a listener that routes every incoming
 * {@link Message} through {@link #doMsgHandler(Message)}.
 *
 * <p>When the incoming message requires a response, the listener copies the request's
 * message id onto the response and sends it back over the wrapped transport. The
 * {@code transportOn*} methods and {@link #doMsgHandler(Message)} have trivial default
 * bodies; presumably subclasses override them (confirm against callers).
 */
public class TransportConnection_Thread {
    private static final Logger logger = LoggerFactory.getLogger(TransportConnection_Thread.class);

    /** Default third constructor argument handed to {@link InactiveConnectionMonitor}. */
    private static final int ReadCheck_Time = 30000;
    /** Default fourth constructor argument handed to {@link InactiveConnectionMonitor}. */
    private static final int InitialDelay_Time = 10000;

    /** Lock guarding {@link #testHMap} in the debug counting helper. */
    private static final Object test_obj = new Object();
    /** Debug counters keyed by {@code topic#group#subscriberId}. */
    private static final HashMap<String, Test_Client_Cnt> testHMap = new HashMap<>();
    // Not referenced anywhere in this class; kept as-is.
    private static long test_msg_cnt = 0;

    private TransportFilter theTransport;
    // Never assigned in this class; only cleared in stop().
    private WorkerHandlerThreadPool workerHandlerThreadPool;
    private String remoteIpAddr;

    /**
     * Debug counter of messages per topic/group/client.
     *
     * <p>Declared {@code static} because instances are stored in the static
     * {@link #testHMap}; a non-static inner class would keep its enclosing
     * connection reachable for as long as the map entry exists.
     */
    static class Test_Client_Cnt {
        private String topic;
        private String group;
        private String clientId;
        private long msgCnt = 0;

        protected Test_Client_Cnt(String str, String str2, String str3) {
            this.topic = str;
            this.group = str2;
            this.clientId = str3;
        }

        /** Adds {@code i} to the running message count. */
        protected synchronized void addCnt(int i) {
            this.msgCnt += i;
        }

        /** Logs the topic, group, client id and current count at INFO level. */
        protected void printInfo() {
            TransportConnection_Thread.logger.info("++++++++++++TransportConnection_Thread!!!!!!#" + this.topic + "#" + this.group + "#" + this.clientId + "#" + this.msgCnt);
        }
    }

    /**
     * Creates a connection using the default {@link #ReadCheck_Time} and
     * {@link #InitialDelay_Time} values.
     *
     * @param transport the underlying transport to wrap
     * @param i passed through unchanged to {@link InactiveConnectionMonitor}
     */
    public TransportConnection_Thread(Transport transport, int i) {
        this(transport, i, ReadCheck_Time, InitialDelay_Time);
    }

    /**
     * Creates a connection and registers the message-dispatching listener.
     *
     * @param transport the underlying transport to wrap
     * @param i passed through unchanged to {@link InactiveConnectionMonitor}
     * @param i2 passed through unchanged; presumably the read-check interval in ms (confirm)
     * @param i3 passed through unchanged; presumably the initial delay in ms (confirm)
     */
    public TransportConnection_Thread(Transport transport, int i, int i2, int i3) {
        this.theTransport = new ResponseCorrelator(new InactiveConnectionMonitor(transport, i, i2, i3));
        this.theTransport.setTransportListener(new DefaultTransportListener() {
            /**
             * Runs the business handler for an incoming message and, when the sender
             * asked for a response, sends one back carrying the request's message id.
             */
            @Override
            public void onCommand(Message message) {
                Message response = null;
                boolean responseRequired = message.isResponseRequired();
                try {
                    response = TransportConnection_Thread.this.doMsgHandler(message);
                } catch (Throwable th) {
                    // The logger already records the full stack trace.
                    TransportConnection_Thread.logger.error("在进行业务处理的时候发生异常!!!", th);
                    if (responseRequired) {
                        response = new Message(MsgMarshallerFactory.BusinExceptionResponse_MsgType, th.toString().getBytes(), false);
                    }
                }
                if (!responseRequired) {
                    return;
                }
                if (response == null) {
                    // The handler produced nothing although the peer waits for an answer.
                    TransportConnection_Thread.logger.error("请求端需要有响应信息,实际却没有生成响应!!!");
                    response = new Message(MsgMarshallerFactory.BusinExceptionResponse_MsgType, "".getBytes(), false);
                }
                response.setMsgId(message.getMsgId());
                try {
                    TransportConnection_Thread.this.theTransport.oneway(response);
                } catch (Exception e) {
                    TransportConnection_Thread.logger.error("发生响应给请求端,发生异常!!!", e);
                    if (response.getTheMessageCallBack() != null) {
                        // NOTE(review): the callback receives the request, not the response - confirm intent.
                        response.getTheMessageCallBack().messagSendException(message);
                    }
                }
            }

            /**
             * Debug helper: counts the length-prefixed records in a type-3 response to a
             * type-15 request and logs per-subscriber totals. Not invoked anywhere in this class.
             */
            private void test(Message message, Message message2) {
                if (message.getMsgType() != 15) {
                    return;
                }
                TransportConnection_Thread.logger.info("++++++++++++TransportConnection_Thread request !!!!!!#\t" + message);
                if (message2.getMsgType() != 3) {
                    return;
                }
                byte[] payload = message2.getContent();
                DataByteArrayInputStream in = new DataByteArrayInputStream(payload);
                int total = payload.length;
                int recordCount = 0;
                int consumed = 0;
                // Each record is a 4-byte length followed by that many bytes.
                while (consumed < total) {
                    int recordLength = in.readInt();
                    in.readFully(new byte[recordLength]);
                    consumed += recordLength + 4;
                    recordCount++;
                }
                synchronized (TransportConnection_Thread.test_obj) {
                    FetchMsgsReqCmd request = (FetchMsgsReqCmd) PB.parsePBBytes(message.getContent());
                    String key = request.getTopic() + "#" + request.getGroup() + "#" + request.getSubscriberId();
                    Test_Client_Cnt counter = TransportConnection_Thread.testHMap.get(key);
                    if (counter == null) {
                        counter = new Test_Client_Cnt(request.getTopic(), request.getGroup(), request.getSubscriberId());
                        TransportConnection_Thread.testHMap.put(key, counter);
                    }
                    counter.addCnt(recordCount);
                    for (Test_Client_Cnt each : TransportConnection_Thread.testHMap.values()) {
                        each.printInfo();
                    }
                }
            }

            @Override
            public void onException(Exception exc) {
                TransportConnection_Thread.this.transportOnException(exc);
            }

            @Override
            public void transportResumed() {
                TransportConnection_Thread.this.transportOnResumed();
            }

            @Override
            public void transportFirstConnect() {
                TransportConnection_Thread.this.transportOnFirstConnect();
            }
        });
    }

    /**
     * Handles one incoming message. This default implementation does no business work.
     *
     * @param message the incoming message
     * @return an empty {@code Response_MsgType} message when {@code message} requires a
     *         response, otherwise {@code null}
     * @throws Throwable declared so that overriding handlers may throw anything; the
     *         listener catches and logs it
     */
    public Message doMsgHandler(Message message) throws Throwable {
        if (!message.isResponseRequired()) {
            return null;
        }
        return new Message(MsgMarshallerFactory.Response_MsgType, "".getBytes(), false);
    }

    /** Called when the transport listener reports an exception; does nothing by default. */
    public void transportOnException(Exception exc) {
    }

    /** Called when the transport listener reports a resume; does nothing by default. */
    public void transportOnResumed() {
    }

    /** Called when the transport listener reports the first connect; does nothing by default. */
    public void transportOnFirstConnect() {
    }

    /**
     * Drops the worker pool reference and stops the wrapped transport if one is set.
     *
     * @throws Exception if stopping the transport fails
     */
    public void stop() throws Exception {
        this.workerHandlerThreadPool = null;
        if (this.theTransport != null) {
            this.theTransport.stop();
        }
    }

    /**
     * Marks the message as requiring a response and sends it as a request.
     *
     * @param message the message to send
     * @param i passed to {@code request}; presumably a timeout (confirm)
     * @return whatever the transport's {@code request(message, i)} returns
     * @throws Exception if the transport fails
     */
    public Message sendMsg(Message message, int i) throws Exception {
        message.setResponseRequired(true);
        return this.theTransport.request(message, i);
    }

    /**
     * Sends the message through the transport's single-argument {@code request} and
     * ignores its return value.
     *
     * @param message the message to send
     * @throws IOException if the transport fails
     */
    public void sendMsg(Message message) throws IOException {
        // NOTE(review): uses request(...) rather than oneway(...) - confirm this is intended.
        this.theTransport.request(message);
    }

    public TransportFilter getTheTransport() {
        return this.theTransport;
    }

    /** Replaces the wrapped transport; the listener is not re-registered on the new one. */
    public void setTheTransport(TransportFilter transportFilter) {
        this.theTransport = transportFilter;
    }

    public String getRemoteIpAddr() {
        return this.remoteIpAddr;
    }

    public void setRemoteIpAddr(String str) {
        this.remoteIpAddr = str;
    }
}
