package com.jd.bdp.whale.client.Producer;

import com.jd.bdp.whale.client.BrokerInfo;
import com.jd.bdp.whale.client.Client;
import com.jd.bdp.whale.client.MessageProducer;
import com.jd.bdp.whale.client.NameServer.NameServer;
import com.jd.bdp.whale.common.communication.CommonResponse;
import com.jd.bdp.whale.common.util.DataByteArrayOutputStream;
import com.jd.bdp.whale.communication.SocketClient;
import com.jd.bdp.whale.communication.exception.ReqTimeOutException;
import com.jd.bdp.whale.communication.message.Message;
import com.jd.bdp.whale.communication.message.MsgMarshallerFactory;
import com.jd.dd.glowworm.PB;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/client/Producer/DefaultProducer.class */
public class DefaultProducer extends Client implements MessageProducer {
    private static Logger logger = LoggerFactory.getLogger(DefaultProducer.class);
    private NameServer managerNode;
    private AtomicInteger borkerCursors;

    @Override // com.jd.bdp.whale.client.Client
    public void shutdown() {
        Iterator<BrokerInfo> it = this.topicInfo.getBrokers().iterator();
        while (!it.hasNext()) {
            try {
                it.next().getConnectionPool().shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.managerNode.removeClient(this.topicInfo.getName(), this);
        this.managerNode.removeTopicInfo(this.topicInfo.getName());
    }

    public DefaultProducer(int i, String str, NameServer nameServer) {
        super(str, 1);
        this.borkerCursors = new AtomicInteger(0);
        setConns(i);
        this.managerNode = nameServer;
    }

    public DefaultProducer(int i, String str, NameServer nameServer, String str2, String str3) {
        super(str, 1, str2, str3);
        this.borkerCursors = new AtomicInteger(0);
        setConns(i);
        this.managerNode = nameServer;
    }

    @Override // com.jd.bdp.whale.client.MessageProducer
    public void publish(String str) throws Exception {
        publish(str, null);
    }

    @Override // com.jd.bdp.whale.client.MessageProducer
    public void publish(String str, String str2) throws Exception {
        setTopicName(str);
        this.cluster = str2;
        this.topicInfo = this.managerNode.register(this);
        logger.info("whale连接上管理端，注册topic:{}成功", str);
        setRegsiter(true);
        shakeHandWithBrokers();
    }

    @Override // com.jd.bdp.whale.client.MessageProducer
    public void sendMessage(byte[] bArr) throws Exception {
        if (this.topicInfo == null) {
            throw new RuntimeException("发布topic没成功，请检查配置信息");
        }
        if (bArr == null || bArr.length == 0) {
            throw new RuntimeException("不能发送空消息");
        }
        DataByteArrayOutputStream dataByteArrayOutputStream = new DataByteArrayOutputStream();
        dataByteArrayOutputStream.writeByte(this.topicInfo.getName().length());
        dataByteArrayOutputStream.writeInt(bArr.length);
        dataByteArrayOutputStream.write(this.topicInfo.getName().getBytes());
        dataByteArrayOutputStream.write(bArr);
        byte[] bArr2 = new byte[dataByteArrayOutputStream.size()];
        System.arraycopy(dataByteArrayOutputStream.getData(), 0, bArr2, 0, dataByteArrayOutputStream.size());
        Message message = new Message(13, bArr2, true);
        Message message2 = null;
        int size = this.topicInfo.getBrokers().size();
        while (true) {
            try {
                message2 = getConnection().sendMsg(message);
            } catch (ReqTimeOutException e) {
                logger.debug("ReqTimeOutException", (Throwable) e);
            } catch (Exception e2) {
                logger.debug("发送消息出现异常，之后会重试！当前异常信息为{}", (Throwable) e2);
                size--;
                if (size == 0) {
                    logger.error("发送消息出现异常，到达重试次数，发送失败！");
                    throw new RuntimeException("broker is not connnect!", e2);
                }
            }
            if (message2.getMsgType() != MsgMarshallerFactory.TransportExceptionResponse_MsgType) {
                break;
            }
        }
        if (message2 != null) {
            CommonResponse commonResponse = (CommonResponse) PB.parsePBBytes(message2.getContent());
            if (commonResponse.getType() == 1) {
                throw new RuntimeException(commonResponse.getContent().toString());
            }
        }
    }

    private SocketClient getConnection() throws Exception {
        if (this.topicInfo.getBrokers().size() == 0) {
            throw new Exception("no broker excption");
        }
        return this.topicInfo.getBrokers().get(this.borkerCursors.getAndIncrement() % this.topicInfo.getBrokers().size()).getConnection();
    }

    @Override // com.jd.bdp.whale.client.Client
    public void syncBrokerOffline(List<BrokerInfo> list) throws Exception {
        this.topicInfo = this.managerNode.getTopicInfo(this.topicInfo.getName());
        for (BrokerInfo brokerInfo : list) {
            brokerInfo.getConnectionPool().shutdown();
            logger.debug("brokerId:{}, 下线！", brokerInfo.getId());
        }
    }
}
