package com.jd.bdp.whale.communication.transport;

import com.jd.bdp.whale.common.util.DataByteArrayInputStream;
import com.jd.bdp.whale.communication.exception.MsgHeadException;
import com.jd.bdp.whale.communication.message.Message;
import com.jd.bdp.whale.communication.util.Utilities;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/communication/transport/AutoReconnectDataSocket.class */
public class AutoReconnectDataSocket implements Runnable, Transport {
    // Current socket. Replaced by doConnect() and cleared by stop(); read by oneway().
    // volatile because those methods are presumably invoked from different threads
    // (the read loop never returns while the transport is running).
    private volatile Socket theSocket;
    // Remote endpoint, fixed by the constructor.
    private String ipAddr;
    private int port;
    // Pause between failed connect attempts, in milliseconds (passed to Thread.sleep).
    private int reconnectionDelayTime;
    // Receiver of commands and transport events; cleared by stop().
    private volatile TransportListener transportListener;
    // Set by stop(); polled by the connect and read loops, hence volatile.
    private volatile boolean stop = false;
    // Raised by oneway() when a write fails; polled by the read loop, hence volatile.
    private volatile boolean isSendError = false;
    // True when the connect attempt made inside the constructor succeeded.
    private boolean isFirstSynchConnectOk;
    private static final Logger logger = LoggerFactory.getLogger(AutoReconnectDataSocket.class);
    // Lock object and registry used only by the testCnt() counting helper.
    private static final Object test_obj = new Object();
    private static final HashMap<String, Test_Client_Cnt> testHMap = new HashMap<>();
    private static long test_msg_cnt = 0;

    /* loaded from: input_file:com/jd/bdp/whale/communication/transport/AutoReconnectDataSocket$Test_Client_Cnt.class */
    /**
     * Per-subscriber counter record: remembers the identifiers passed to
     * {@link #addCnt(int, String)} and keeps two running totals.
     */
    class Test_Client_Cnt {
        private final String topic;
        private final String group;
        private final String clientId;
        /** Sum of every {@code i} passed to {@link #addCnt(int, String)}. */
        private long cnt = 0;
        /** Number of {@link #addCnt(int, String)} invocations. */
        private long rcvCnt = 0;
        /** Every {@code str} passed to {@link #addCnt(int, String)}, in call order. */
        private final ArrayList<String> tmpRcvLst = new ArrayList<String>();

        /**
         * @param str  stored as the topic
         * @param str2 stored as the group
         * @param str3 stored as the client id
         */
        protected Test_Client_Cnt(String str, String str2, String str3) {
            this.topic = str;
            this.group = str2;
            this.clientId = str3;
        }

        /**
         * Records one call: appends {@code str} to the list, adds {@code i} to the
         * running sum and increments the call counter.
         *
         * @param i   amount added to the running sum
         * @param str identifier appended to the list
         */
        protected synchronized void addCnt(int i, String str) {
            this.tmpRcvLst.add(str);
            this.cnt += i;
            this.rcvCnt++;
        }

        /** Intentionally empty: currently prints nothing. */
        protected void printInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/jd/bdp/whale/communication/transport/AutoReconnectDataSocket$TransportFirstConnectNotify.class */
    /**
     * One-shot thread that waits one second and then calls
     * {@code transportFirstConnect()} on the current transport listener.
     */
    public class TransportFirstConnectNotify extends Thread {
        protected TransportFirstConnectNotify() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Interrupted before the delay elapsed: keep the interrupt status and skip the callback.
                Thread.currentThread().interrupt();
                logger.debug("TransportFirstConnectNotify interrupted", e);
                return;
            }
            // stop() clears the listener; read it once so a concurrent stop cannot cause an NPE here.
            TransportListener listener = AutoReconnectDataSocket.this.getTransportListener();
            if (listener == null) {
                return;
            }
            try {
                listener.transportFirstConnect();
            } catch (Exception e) {
                logger.warn("transportFirstConnect notification failed", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/jd/bdp/whale/communication/transport/AutoReconnectDataSocket$TransportResumedNotifyThrd.class */
    /**
     * One-shot thread that calls {@code transportResumed()} on the current
     * transport listener; started by {@code reConnect()} after the connect loop returns.
     */
    public class TransportResumedNotifyThrd extends Thread {
        protected TransportResumedNotifyThrd() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // stop() clears the listener; read it once so a concurrent stop cannot cause an NPE here.
            TransportListener listener = AutoReconnectDataSocket.this.getTransportListener();
            if (listener == null) {
                return;
            }
            try {
                listener.transportResumed();
            } catch (Exception e) {
                logger.warn("transportResumed notification failed", e);
            }
        }
    }

    public AutoReconnectDataSocket(String str, int i, int i2) {
        this.ipAddr = "";
        this.port = 0;
        this.reconnectionDelayTime = 15000;
        this.isFirstSynchConnectOk = false;
        this.ipAddr = str;
        this.port = i;
        this.reconnectionDelayTime = i2;
        try {
            doConnect();
            this.isFirstSynchConnectOk = true;
            logger.debug("同步建立连接成功!!!");
        } catch (Exception e) {
            logger.debug("同步建立连接失败,进入异步建立连接模式!!!");
        }
    }

    /**
     * Calls {@code doConnect()} until it succeeds or {@code stop} is set, sleeping
     * {@code reconnectionDelayTime} milliseconds after every failed attempt.
     */
    private void initSocket() {
        while (!this.stop) {
            try {
                doConnect();
                return;
            } catch (IOException e) {
                logger.warn((this.reconnectionDelayTime / 1000) + " 秒进行重试链接 " + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + this.ipAddr + ":" + this.port);
                // The warning above carries no reason; record the actual failure for diagnosis.
                logger.debug("connect to " + this.ipAddr + ":" + this.port + " failed", e);
                try {
                    Thread.sleep(this.reconnectionDelayTime);
                } catch (Exception e2) {
                    // The retry loop deliberately keeps going; termination is driven by the stop flag.
                    logger.warn("retry delay was cut short", e2);
                }
            }
        }
    }

    /**
     * Opens a fresh socket to {@code ipAddr:port} (3 s connect timeout) and applies
     * the socket options: 32 KiB send buffer, 10 s read timeout, TCP_NODELAY.
     * The socket being replaced is closed first so its descriptor is not leaked,
     * and a socket that fails during connect or option setup is closed before the
     * failure is rethrown.
     *
     * @throws IOException if name resolution, connecting, or option setup fails
     */
    private void doConnect() throws IOException {
        Socket previous = this.theSocket;
        if (previous != null) {
            try {
                previous.close();
            } catch (IOException ignored) {
                // Best effort: the old connection is being discarded anyway.
            }
        }
        Socket socket = new Socket();
        // Published before connecting, as before, so the field is never null after the first attempt.
        this.theSocket = socket;
        try {
            socket.connect(new InetSocketAddress(InetAddress.getByName(this.ipAddr), this.port), 3000);
            socket.setSendBufferSize(32768);
            socket.setSoTimeout(10000);
            socket.setTcpNoDelay(true);
        } catch (IOException | RuntimeException e) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Best effort: the original failure is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Read loop: runs until {@code stop} is set. Each pass reads messages from the
     * current socket and hands them to the transport listener. A read timeout just
     * polls again; a null message, a read failure, or a send error flagged by
     * {@code oneway()} is routed to {@code onException()}, which reconnects unless stopped.
     */
    private void doSocketRead() {
        while (!this.stop) {
            // Every pass starts clean; oneway() raises the flag again when a write fails.
            this.isSendError = false;
            try {
                InputStream inputStream = this.theSocket.getInputStream();
                while (!this.isSendError) {
                    Message readMsg;
                    try {
                        readMsg = Utilities.getInstance().readMsg(inputStream);
                    } catch (SocketTimeoutException timeout) {
                        // Nothing arrived within the read timeout: re-check the flags, dispatch nothing.
                        continue;
                    } catch (Exception readFailure) {
                        // NOTE(review): this also wraps a MsgHeadException thrown by readMsg, so the
                        // MsgHeadException handler below is not reached from here — confirm intent.
                        throw new Exception(readFailure);
                    }
                    if (readMsg == null) {
                        throw new Exception("连接已经关闭");
                    }
                    this.transportListener.onCommand(readMsg);
                }
                throw new Exception("发送消息时产生IO异常，需要进行重连操作!!!");
            } catch (MsgHeadException headError) {
                logger.debug("msgHead异常", headError);
                try {
                    stop();
                } catch (Exception stopFailure) {
                    logger.debug("msgHead异常", stopFailure);
                }
            } catch (Exception failure) {
                logger.debug(this.theSocket + " 读取发生异常", failure);
                onException(failure);
                if (this.stop) {
                    logger.debug("停止AutoReconnectDataSocket");
                }
                this.isSendError = false;
            }
        }
        logger.debug("停止AutoReconnectDataSocket");
    }

    /**
     * Thread entry point: delegates to {@code doInitConnect()}, which returns only
     * once the read loop has ended.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.doInitConnect();
    }

    /**
     * Completes the initial connection (retrying if the constructor's attempt
     * failed), schedules the first-connect notification, then enters the read loop.
     */
    private void doInitConnect() {
        if (!this.isFirstSynchConnectOk) {
            initSocket();
        }
        // initSocket() also returns when stop was requested; in that case there is no
        // connection to announce. reConnect() guards its notification the same way.
        if (!this.stop) {
            new TransportFirstConnectNotify().start();
        }
        doSocketRead();
    }

    /**
     * Reports a read-side failure to the listener and then reconnects.
     * Does nothing once {@code stop} has been set.
     *
     * @param exc the failure that interrupted the read loop
     */
    private void onException(Exception exc) {
        if (this.stop) {
            return;
        }
        // The listener may be unset, or cleared by a stop() racing with this call.
        // An NPE here would escape the read loop's catch block and end the read thread.
        TransportListener listener = this.transportListener;
        if (listener != null) {
            listener.onException(exc);
        }
        reConnect();
    }

    /**
     * Runs the connect-retry loop again and, unless the transport was stopped
     * meanwhile, starts a thread that announces the resumed connection.
     */
    private void reConnect() {
        initSocket();
        if (!this.stop) {
            TransportResumedNotifyThrd notifier = new TransportResumedNotifyThrd();
            notifier.start();
        }
    }

    /**
     * Stub: always returns {@code null}; the configured host and port are not reported.
     * NOTE(review): looks unimplemented — confirm whether callers expect a real address.
     *
     * @return always {@code null}
     */
    public String getRemoteAddress() {
        return null;
    }

    /**
     * Writes one message to the current socket. Any I/O failure, including the
     * socket being absent, raises {@code isSendError} so the read loop reconnects,
     * and is rethrown to the caller.
     *
     * @param message the message to send
     * @throws IOException if there is no socket or the write fails
     */
    @Override // com.jd.bdp.whale.communication.transport.Transport
    public void oneway(Message message) throws IOException {
        try {
            // Read the field once: stop() may null it concurrently.
            Socket socket = this.theSocket;
            if (socket == null) {
                // Previously this printed "null" to stdout and then failed with an NPE,
                // bypassing both the declared IOException and the send-error flag.
                throw new IOException("socket is not available (not connected or already stopped)");
            }
            Utilities.getInstance().writeMsgThroughTcp(message, socket.getOutputStream());
        } catch (IOException e) {
            this.isSendError = true;
            throw e;
        }
    }

    /**
     * @return the listener currently registered on this transport, or {@code null}
     *         if none was set or {@code stop()} has cleared it
     */
    @Override // com.jd.bdp.whale.communication.transport.Transport
    public TransportListener getTransportListener() {
        return transportListener;
    }

    /**
     * Registers the listener that receives incoming commands and transport events,
     * replacing any previous one.
     *
     * @param transportListener the listener to register
     */
    @Override // com.jd.bdp.whale.communication.transport.Transport
    public void setTransportListener(TransportListener transportListener) {
        this.transportListener = transportListener;
    }

    /**
     * Stops the transport: sets the stop flag, notifies and clears the listener,
     * then closes and clears the socket. Failures in either step are logged and
     * do not prevent the other step from running.
     *
     * @throws Exception declared by the {@code Transport} contract; not thrown by this implementation
     */
    @Override // com.jd.bdp.whale.communication.transport.Transport
    public void stop() throws Exception {
        // Raise the flag first: closing the socket makes the read loop fail, and it must
        // already see stop == true or it would treat the close as an error and reconnect.
        this.stop = true;
        TransportListener listener = this.transportListener;
        if (listener != null) {
            try {
                listener.onException(new IOException("关闭连接"));
            } catch (Exception e) {
                logger.warn("transport listener failed while handling stop", e);
            }
            this.transportListener = null;
        }
        Socket socket = this.theSocket;
        if (socket != null) {
            logger.debug("theSocket 关闭socket链路 " + socket);
            try {
                socket.close();
            } catch (Exception e) {
                logger.warn("failed to close socket " + socket, e);
            }
            this.theSocket = null;
        }
    }

    /**
     * Debug counter. For a message carrying a "fetchResp" property, walks its content
     * as a sequence of [int length][length bytes] records, counts them, and adds the
     * count to the entry keyed by "topic#group#subscriberId". Messages that are null
     * or lack the property are ignored.
     *
     * @param message the message to account for; may be {@code null}
     */
    private void testCnt(Message message) {
        if (message == null || message.getOneProperty("fetchResp") == null) {
            return;
        }
        DataByteArrayInputStream dataByteArrayInputStream = new DataByteArrayInputStream(message.getContent());
        int length = message.getContent().length;
        int records = 0;
        int consumed = 0;
        while (consumed < length) {
            int recordLength = dataByteArrayInputStream.readInt();
            // The payload is only skipped; just the number of records matters here.
            dataByteArrayInputStream.readFully(new byte[recordLength]);
            consumed += recordLength + 4;
            records++;
        }
        synchronized (test_obj) {
            String key = message.getOneProperty("topic") + "#" + message.getOneProperty("group") + "#" + message.getOneProperty("subscriberId");
            Test_Client_Cnt counter = testHMap.get(key);
            if (counter == null) {
                counter = new Test_Client_Cnt(message.getOneProperty("topic"), message.getOneProperty("group"), message.getOneProperty("subscriberId"));
                testHMap.put(key, counter);
            }
            counter.addCnt(records, Long.toString(message.getMsgId()));
            // Iterate under the same lock that guards put(): the map is a plain static HashMap,
            // and walking it while another thread inserts can throw ConcurrentModificationException.
            for (Test_Client_Cnt each : testHMap.values()) {
                each.printInfo();
            }
        }
    }
}
