package com.jd.bdp.whale.client.Consumer;

import com.jd.bdp.whale.client.BrokerInfo;
import com.jd.bdp.whale.client.Message;
import com.jd.bdp.whale.common.command.FetchFailedCmd;
import com.jd.bdp.whale.common.command.FetchMsgsReqCmd;
import com.jd.bdp.whale.common.util.DataByteArrayInputStream;
import com.jd.bdp.whale.communication.SocketClient;
import com.jd.bdp.whale.communication.exception.ReqTimeOutException;
import com.jd.bdp.whale.communication.message.MsgMarshallerFactory;
import com.jd.dd.glowworm.PB;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;

/* loaded from: input_file:com/jd/bdp/whale/client/Consumer/ProFetchManager.class */
public class ProFetchManager extends Fetcher {
    private ActiveConsumer consumer;
    private static Logger logger = LoggerFactory.getLogger(ProFetchManager.class);
    private static int RETRY_TIMES = 10;

    public ProFetchManager(ActiveConsumer activeConsumer) {
        this.queue = new FetchRequestQueue();
        this.consumer = activeConsumer;
    }

    @Override // com.jd.bdp.whale.client.Consumer.Fetcher
    public void startFetchManager() {
        this.started = true;
    }

    @Override // com.jd.bdp.whale.client.Consumer.Fetcher
    public void restartFetchManager(List<FetchRequest> list) throws Exception {
        if (!this.started) {
            throw new Exception("restart ProFetchManager failed,You need to stop fetchManager before.");
        }
        shutdownFethManager();
        this.queue = new FetchRequestQueue();
        Iterator<FetchRequest> it = list.iterator();
        while (it.hasNext()) {
            this.queue.offer(it.next());
        }
    }

    public Message[] processMessage() {
        Message[] messageArr = null;
        FetchRequestQueue fetchRequestQueue = this.queue;
        FetchRequest fetchRequest = null;
        try {
            fetchRequest = fetchRequestQueue.take();
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (!fetchRequest.getOnline().get()) {
            logger.debug("当前brokerId:{}已下线，返回空数组!", fetchRequest.getId());
            return new Message[0];
        }
        SocketClient connection = fetchRequest.getBrokerInfo().getConnection();
        int i = 0;
        while (true) {
            if (i >= RETRY_TIMES) {
                break;
            }
            try {
                com.jd.bdp.whale.communication.message.Message message = new com.jd.bdp.whale.communication.message.Message(15, PB.toPBBytes(new FetchMsgsReqCmd(this.consumer.getTopicName(), this.consumer.getGroup(), this.consumer.getClientId())), true);
                message.addOneProperty(FetchMsgsReqCmd.FETCH_CUST_SIGN, CustomBooleanEditor.VALUE_1);
                com.jd.bdp.whale.communication.message.Message sendMsg = connection.sendMsg(message);
                if (sendMsg.getMsgType() == MsgMarshallerFactory.TransportExceptionResponse_MsgType || sendMsg.getMsgType() == MsgMarshallerFactory.BusinExceptionResponse_MsgType) {
                    logger.error("MsgType=4:");
                    throw new RuntimeException("拉取消息失败");
                }
                if (sendMsg.getMsgType() == MsgMarshallerFactory.Response_MsgType) {
                    byte[] content = sendMsg.getContent();
                    DataByteArrayInputStream dataByteArrayInputStream = new DataByteArrayInputStream(content);
                    int length = content.length;
                    messageArr = length == 0 ? new Message[0] : rebuildMessage(dataByteArrayInputStream, length, sendMsg.getOneProperty(FetchMsgsReqCmd.FETCH_OFFSET), Integer.parseInt(sendMsg.getOneProperty(FetchMsgsReqCmd.FETCH_BROKER_ID)));
                }
            } catch (ReqTimeOutException e2) {
                logger.error("消费失败，重新消费，ReqTime : {}", (Throwable) e2);
            } catch (InterruptedException e3) {
                logger.error("消费失败，重新消费，InterruptedClass: {}, InterruptedMessage: {}, InterruptedException:{}", (Object[]) new String[]{e3.getClass().getName(), e3.getMessage(), Arrays.toString(e3.getStackTrace())});
            } catch (Throwable th) {
                th.printStackTrace();
                logger.error("消费失败，重新消费，ThrowableClass: {}, ThrowableMessage: {}, Throwable: {}", (Object[]) new String[]{th.getClass().getName(), th.getMessage(), Arrays.toString(th.getStackTrace())});
            }
            i++;
        }
        if (messageArr == null) {
            logger.error("消费{}次仍然失败，发送失败确认给broker", Integer.valueOf(RETRY_TIMES));
            warnAboutFetchFailed(connection);
            logger.error("确认完毕，返回空数组");
            messageArr = new Message[0];
        }
        if (fetchRequest != null && fetchRequest.getOnline().get()) {
            fetchRequestQueue.offer(fetchRequest);
        }
        return messageArr;
    }

    private Message[] rebuildMessage(DataByteArrayInputStream dataByteArrayInputStream, int i, String str, int i2) {
        ArrayList arrayList = new ArrayList();
        int i3 = 0;
        int i4 = 0;
        while (i3 < i) {
            int readInt = dataByteArrayInputStream.readInt();
            byte[] bArr = new byte[readInt];
            dataByteArrayInputStream.readFully(bArr);
            StringBuilder sb = new StringBuilder(i2 + "_" + str);
            sb.append("_").append(i4);
            arrayList.add(new Message(sb.toString(), bArr));
            i3 += readInt + 4;
            i4++;
        }
        return (Message[]) arrayList.toArray(new Message[i4]);
    }

    public Message[] processMessage(String str) {
        Message[] messageArr = null;
        String str2 = str.split("_")[0];
        BrokerInfo brokerInfo = null;
        Iterator<BrokerInfo> it = this.consumer.getTopicInfo().getBrokers().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            BrokerInfo next = it.next();
            if (next.getId().intValue() == Integer.parseInt(str2)) {
                brokerInfo = next;
                break;
            }
        }
        if (brokerInfo == null) {
            throw new RuntimeException("无法消费消息，brokerId为" + str2 + "已经下线");
        }
        SocketClient connection = brokerInfo.getConnection();
        com.jd.bdp.whale.communication.message.Message message = new com.jd.bdp.whale.communication.message.Message(15, PB.toPBBytes(new FetchMsgsReqCmd(this.consumer.getTopicName(), this.consumer.getGroup(), this.consumer.getClientId())), true);
        message.addOneProperty(FetchMsgsReqCmd.FETCH_CUST_SIGN, CustomBooleanEditor.VALUE_1);
        message.addOneProperty(FetchMsgsReqCmd.FETCH_BROKER_ID, String.valueOf(str2));
        message.addOneProperty(FetchMsgsReqCmd.FETCH_OFFSET, str.substring(str2.length() + 1, str.length()));
        int i = 0;
        while (true) {
            if (i >= RETRY_TIMES) {
                break;
            }
            try {
                com.jd.bdp.whale.communication.message.Message sendMsg = connection.sendMsg(message);
                if (sendMsg.getMsgType() == MsgMarshallerFactory.TransportExceptionResponse_MsgType || sendMsg.getMsgType() == MsgMarshallerFactory.BusinExceptionResponse_MsgType) {
                    logger.error("MsgType=4:");
                    throw new RuntimeException("拉取消息失败");
                }
                if (sendMsg.getMsgType() == MsgMarshallerFactory.Response_MsgType) {
                    byte[] content = sendMsg.getContent();
                    Message[] rebuildMessage = rebuildMessage(new DataByteArrayInputStream(content), content.length, sendMsg.getOneProperty(FetchMsgsReqCmd.FETCH_OFFSET), Integer.parseInt(sendMsg.getOneProperty(FetchMsgsReqCmd.FETCH_BROKER_ID)));
                    String[] split = str.split("_");
                    int parseInt = Integer.parseInt(split[split.length - 1]);
                    Message[] messageArr2 = new Message[rebuildMessage.length - parseInt];
                    System.arraycopy(rebuildMessage, parseInt, messageArr2, 0, rebuildMessage.length - parseInt);
                    return messageArr2;
                }
            } catch (ReqTimeOutException e) {
                logger.error("消费失败，重新消费，ReqTime : {}", (Throwable) e);
            } catch (InterruptedException e2) {
                logger.error("消费失败，重新消费，InterruptedClass: {}, InterruptedMessage: {}, InterruptedException:{}", (Object[]) new String[]{e2.getClass().getName(), e2.getMessage(), Arrays.toString(e2.getStackTrace())});
            } catch (Throwable th) {
                logger.error("消费失败，重新消费，ThrowableClass: {}, ThrowableMessage: {}, Throwable: {}", (Object[]) new String[]{th.getClass().getName(), th.getMessage(), Arrays.toString(th.getStackTrace())});
            }
            i++;
        }
        if (0 == 0) {
            logger.error("消费{}次仍然失败，发送失败确认给broker", Integer.valueOf(RETRY_TIMES));
            warnAboutFetchFailed(connection);
            logger.error("确认完毕，返回空数组");
            messageArr = new Message[0];
        }
        return messageArr;
    }

    private void warnAboutFetchFailed(SocketClient socketClient) {
        try {
            socketClient.sendMsg(new com.jd.bdp.whale.communication.message.Message(20, PB.toPBBytes(new FetchFailedCmd(this.consumer.getTopicName(), this.consumer.getGroup(), this.consumer.getClientId())), false));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("通知broker拉取失败，没成功.Exception:{}", (Throwable) e);
        }
    }
}
