package com.jd.bdp.whale.client.Consumer;

import com.jd.bdp.whale.client.MessageListener;
import com.jd.bdp.whale.common.command.FetchMsgsReqCmd;
import com.jd.bdp.whale.common.util.DataByteArrayInputStream;
import com.jd.bdp.whale.communication.SocketClient;
import com.jd.bdp.whale.communication.exception.ReqTimeOutException;
import com.jd.bdp.whale.communication.message.Message;
import com.jd.bdp.whale.communication.message.MsgMarshallerFactory;
import com.jd.dd.glowworm.PB;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/client/Consumer/FetchManager.class */
/**
 * Fetcher implementation that polls brokers for messages with a fixed pool of worker threads.
 *
 * <p>Workers repeatedly take a {@link FetchRequest} from the shared request queue, send a
 * {@link FetchMsgsReqCmd} over the request's broker connection, hand every message found in the
 * response to the {@link MessageListener}, and put the request back on the queue while the
 * request is still flagged online.
 *
 * <p>The outcome of the last fetch per request id is remembered in {@code lastCheckstatus} and
 * sent along with the next fetch command via {@code setLastFetchOk}.
 */
public class FetchManager extends Fetcher {
    private static final Logger logger = LoggerFactory.getLogger(FetchManager.class);

    /** Receives the raw payload of every fetched message. */
    private final MessageListener listener;
    /** Supplies topic name, group and client id for the fetch command. */
    private final DefaultConsumer consumer;
    /** Current worker pool; replaced as a whole by {@link #restartFetchManager(List)}. */
    private volatile FetchThread[] fetchThreads;
    /** Last fetch outcome keyed by request id; shared by all workers. */
    private final Map<Integer, Boolean> lastCheckstatus = new ConcurrentHashMap<Integer, Boolean>();
    /** Number of completed restarts; only used to build unique worker thread names. */
    private final AtomicInteger reloadcount = new AtomicInteger(0);
    /** Base unit (milliseconds) of the back-off applied after an empty response. */
    private final long delay = 10;

    /** Mutable boolean holder used to pass the "last fetch ok" state in and out of a fetch. */
    public class BoolValue {
        private boolean value;

        public BoolValue(boolean z) {
            this.value = z;
        }

        boolean getValue() {
            return this.value;
        }

        void setValue(boolean z) {
            this.value = z;
        }
    }

    /** Worker thread that processes fetch requests until {@link #shutdown()} is called. */
    public class FetchThread extends Thread {
        private volatile boolean runing = true;

        public FetchThread(String str) {
            setName(str);
        }

        /** Asks the worker to stop; the loop exits the next time it checks the flag. */
        public void shutdown() {
            this.runing = false;
        }

        /**
         * Main loop: take a request, fetch its messages, record the outcome and re-queue it.
         *
         * <p>The request variable is scoped to a single iteration so that a failed
         * {@code take()} can never lead to a null dereference or to a request from a previous
         * iteration being processed (and queued) a second time.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.runing) {
                // Capture the queue once: a restart swaps FetchManager.this.queue, and the
                // request must go back to the queue it was taken from.
                FetchRequestQueue sourceQueue = FetchManager.this.queue;
                BoolValue lastFetchOk = new BoolValue(true);
                FetchRequest fetchRequest = null;
                boolean attempted = false;
                boolean interrupted = false;
                try {
                    fetchRequest = sourceQueue.take();
                    if (fetchRequest != null && fetchRequest.getOnline().get()) {
                        attempted = true;
                        // Start from the outcome recorded for this request, if there is one.
                        Boolean previous = FetchManager.this.lastCheckstatus.get(fetchRequest.getId());
                        if (previous != null) {
                            lastFetchOk.setValue(previous.booleanValue());
                        }
                        processMessage(fetchRequest, lastFetchOk);
                    }
                } catch (ReqTimeOutException e) {
                    lastFetchOk.setValue(false);
                    logger.debug("接收消息请求超时， " + e.getMessage(), e);
                } catch (InterruptedException e) {
                    lastFetchOk.setValue(false);
                    interrupted = true;
                    logger.error("接收消息Interrupted", e);
                } catch (Throwable th) {
                    // NOTE(review): the fetch status is left untouched here, as before;
                    // confirm whether a failed fetch should be reported as not-ok instead.
                    logger.error("接收消息Throwable", th);
                } finally {
                    if (attempted) {
                        FetchManager.this.lastCheckstatus.put(fetchRequest.getId(), Boolean.valueOf(lastFetchOk.getValue()));
                        if (fetchRequest.getOnline().get()) {
                            sourceQueue.offer(fetchRequest);
                        }
                    }
                }
                if (interrupted) {
                    // Interrupts are only issued by shutdownFethManager(); restore the flag
                    // and stop instead of blocking on take() again.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Sends one fetch command for the given request and dispatches the returned messages.
         *
         * <p>The response content is read as a sequence of frames, each a 4-byte length
         * followed by that many payload bytes. An empty response increases the request's
         * empty-count (capped at 10) and sleeps for {@code delay * count * 10} milliseconds,
         * where {@code count} is the value before the increment.
         *
         * @param fetchRequest request whose broker connection is used
         * @param boolValue in: outcome of the previous fetch, sent to the broker; out: set to
         *     {@code false} on an exception response, {@code true} after a normal response
         * @throws Exception if sending, decoding or listener dispatch fails, if a frame length
         *     is inconsistent with the response size, or if the back-off sleep is interrupted
         */
        public void processMessage(FetchRequest fetchRequest, BoolValue boolValue) throws Exception {
            SocketClient connection = fetchRequest.getBrokerInfo().getConnection();
            FetchMsgsReqCmd command = new FetchMsgsReqCmd(
                    FetchManager.this.consumer.getTopicName(),
                    FetchManager.this.consumer.getGroup(),
                    FetchManager.this.consumer.getClientId());
            command.setLastFetchOk(boolValue.getValue());
            Message response = connection.sendMsg(new Message(15, PB.toPBBytes(command), true));

            if (response.getMsgType() == MsgMarshallerFactory.TransportExceptionResponse_MsgType
                    || response.getMsgType() == MsgMarshallerFactory.BusinExceptionResponse_MsgType) {
                boolValue.setValue(false);
                logger.error("MsgType=4:" + boolValue.getValue());
                return;
            }
            if (response.getMsgType() != MsgMarshallerFactory.Response_MsgType) {
                return;
            }

            byte[] content = response.getContent();
            int length = content.length;
            int emptyCount = fetchRequest.getCount();
            if (length == 0) {
                if (emptyCount < 10) {
                    fetchRequest.setCount(emptyCount + 1);
                }
                Thread.sleep(FetchManager.this.delay * emptyCount * 10);
            } else {
                fetchRequest.setCount(0);
                DataByteArrayInputStream in = new DataByteArrayInputStream(content);
                int offset = 0;
                while (offset < length) {
                    if (length - offset < 4) {
                        throw new IllegalStateException("Malformed fetch response: truncated frame header at offset "
                                + offset + " of " + length);
                    }
                    int payloadLength = in.readInt();
                    // Reject lengths that cannot fit in the remaining bytes before allocating.
                    if (payloadLength < 0 || payloadLength > length - offset - 4) {
                        throw new IllegalStateException("Malformed fetch response: frame length " + payloadLength
                                + " at offset " + offset + " of " + length);
                    }
                    byte[] payload = new byte[payloadLength];
                    in.readFully(payload);
                    FetchManager.this.listener.recieveMessages(payload);
                    offset += payloadLength + 4;
                }
            }
            boolValue.setValue(true);
        }
    }

    /**
     * Creates the manager with an empty request queue and one (not yet started) worker per
     * available processor.
     *
     * @param messageListener listener that receives every fetched message payload
     * @param defaultConsumer consumer providing topic, group and client id
     */
    public FetchManager(MessageListener messageListener, DefaultConsumer defaultConsumer) {
        this.listener = messageListener;
        this.consumer = defaultConsumer;
        this.queue = new FetchRequestQueue();
        FetchThread[] workers = new FetchThread[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new FetchThread("fetchThread-" + i);
        }
        this.fetchThreads = workers;
    }

    /** Starts all worker threads and marks the manager as started. */
    @Override // com.jd.bdp.whale.client.Consumer.Fetcher
    public void startFetchManager() {
        for (FetchThread fetchThread : this.fetchThreads) {
            fetchThread.start();
        }
        this.started = true;
    }

    /**
     * Stops the current workers and replaces them, together with the request queue, by a
     * fresh set seeded with the given requests.
     *
     * @param list requests to place on the new queue; must not be {@code null}
     * @throws Exception if the manager has not been started before
     */
    @Override // com.jd.bdp.whale.client.Consumer.Fetcher
    public synchronized void restartFetchManager(List<FetchRequest> list) throws Exception {
        if (!this.started) {
            throw new IllegalStateException("reStart FetchManager fail,You need started fetchManager before.");
        }
        // Validate before tearing anything down so a bad argument cannot leave the manager
        // without any running workers.
        if (list == null) {
            throw new NullPointerException("list");
        }
        shutdownFethManager();

        int workerCount = this.fetchThreads.length;
        int generation = this.reloadcount.get();
        FetchThread[] replacements = new FetchThread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            replacements[i] = new FetchThread("newFetchThread-" + generation + "-" + i);
        }

        FetchRequestQueue freshQueue = new FetchRequestQueue();
        this.queue = freshQueue;
        for (FetchRequest request : list) {
            freshQueue.offer(request);
        }

        this.fetchThreads = replacements;
        for (FetchThread worker : replacements) {
            worker.start();
        }
        this.reloadcount.incrementAndGet();
    }

    /**
     * Signals every current worker to stop.
     *
     * <p>The stop flag is cleared before the interrupt is sent so that a worker woken from a
     * blocking call always observes the flag and cannot go back to waiting.
     */
    @Override // com.jd.bdp.whale.client.Consumer.Fetcher
    public void shutdownFethManager() {
        for (FetchThread fetchThread : this.fetchThreads) {
            fetchThread.shutdown();
            fetchThread.interrupt();
        }
    }
}
