package org.apache.iotdb.consensus.multileader.logdispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.class */
public class LogDispatcher {
    private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
    private final MultiLeaderServerImpl impl;
    private final List<LogDispatcherThread> threads;
    private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
    private ExecutorService executorService;

    /* loaded from: input_file:org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher$LogDispatcherThread.class */
    public class LogDispatcherThread implements Runnable {
        private final MultiLeaderConfig config;
        private final Peer peer;
        private final IndexController controller;
        private final SyncStatus syncStatus;
        private final BlockingQueue<IndexedConsensusRequest> pendingRequest;
        private final ConsensusReqReader reader;
        private final List<IndexedConsensusRequest> bufferedRequest = new LinkedList();
        private volatile boolean stopped = false;

        public LogDispatcherThread(Peer peer, MultiLeaderConfig multiLeaderConfig) {
            this.reader = (ConsensusReqReader) LogDispatcher.this.impl.getStateMachine().read(new GetConsensusReqReaderPlan());
            this.peer = peer;
            this.config = multiLeaderConfig;
            this.pendingRequest = new ArrayBlockingQueue(multiLeaderConfig.getReplication().getMaxPendingRequestNumPerNode());
            this.controller = new IndexController(LogDispatcher.this.impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false);
            this.syncStatus = new SyncStatus(this.controller, multiLeaderConfig);
        }

        public IndexController getController() {
            return this.controller;
        }

        public long getCurrentSyncIndex() {
            return this.controller.getCurrentIndex();
        }

        public Peer getPeer() {
            return this.peer;
        }

        public MultiLeaderConfig getConfig() {
            return this.config;
        }

        public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
            return this.pendingRequest;
        }

        public void stop() {
            this.stopped = true;
        }

        public boolean isStopped() {
            return this.stopped;
        }

        @Override // java.lang.Runnable
        public void run() {
            PendingBatch batch;
            LogDispatcher.this.logger.info("{}: Dispatcher for {} starts", LogDispatcher.this.impl.getThisNode(), this.peer);
            while (!Thread.interrupted() && !this.stopped) {
                try {
                    while (true) {
                        batch = getBatch();
                        if (batch.isEmpty()) {
                            this.bufferedRequest.add(this.pendingRequest.take());
                            if (this.pendingRequest.size() <= this.config.getReplication().getMaxRequestPerBatch()) {
                                Thread.sleep(this.config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
                            }
                        }
                    }
                    this.syncStatus.addNextBatch(batch);
                    sendBatchAsync(batch, new DispatchLogHandler(this, batch));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    LogDispatcher.this.logger.error("Unexpected error in logDispatcher for peer {}", this.peer, e2);
                }
            }
            LogDispatcher.this.logger.info("{}: Dispatcher for {} exits", LogDispatcher.this.impl.getThisNode(), this.peer);
        }

        public PendingBatch getBatch() {
            PendingBatch pendingBatch;
            ArrayList arrayList = new ArrayList();
            long nextSendingIndex = this.syncStatus.getNextSendingIndex();
            long currentIndex = LogDispatcher.this.impl.getController().getCurrentIndex() + 1;
            if (this.bufferedRequest.size() <= this.config.getReplication().getMaxRequestPerBatch()) {
                this.pendingRequest.drainTo(this.bufferedRequest, this.config.getReplication().getMaxRequestPerBatch() - this.bufferedRequest.size());
            }
            if (this.bufferedRequest.isEmpty()) {
                pendingBatch = new PendingBatch(nextSendingIndex, constructBatchFromWAL(nextSendingIndex, currentIndex, arrayList), arrayList);
                LogDispatcher.this.logger.debug("{} : accumulated a {} from wal", LogDispatcher.this.impl.getThisNode().getGroupId(), pendingBatch);
            } else {
                Iterator<IndexedConsensusRequest> it = this.bufferedRequest.iterator();
                IndexedConsensusRequest next = it.next();
                long constructBatchFromWAL = constructBatchFromWAL(nextSendingIndex, next.getSearchIndex(), arrayList);
                if (arrayList.size() == this.config.getReplication().getMaxRequestPerBatch()) {
                    PendingBatch pendingBatch2 = new PendingBatch(nextSendingIndex, constructBatchFromWAL, arrayList);
                    LogDispatcher.this.logger.debug("{} : accumulated a {} from wal", LogDispatcher.this.impl.getThisNode().getGroupId(), pendingBatch2);
                    return pendingBatch2;
                }
                constructBatchIndexedFromConsensusRequest(next, arrayList);
                long searchIndex = next.getSearchIndex();
                it.remove();
                while (it.hasNext() && arrayList.size() <= this.config.getReplication().getMaxRequestPerBatch()) {
                    IndexedConsensusRequest next2 = it.next();
                    if (next2.getSearchIndex() != next.getSearchIndex() + 1) {
                        long constructBatchFromWAL2 = constructBatchFromWAL(next.getSearchIndex(), next2.getSearchIndex(), arrayList);
                        if (arrayList.size() == this.config.getReplication().getMaxRequestPerBatch()) {
                            PendingBatch pendingBatch3 = new PendingBatch(nextSendingIndex, constructBatchFromWAL2, arrayList);
                            LogDispatcher.this.logger.debug("{} : accumulated a {} from queue and wal", LogDispatcher.this.impl.getThisNode().getGroupId(), pendingBatch3);
                            return pendingBatch3;
                        }
                    }
                    constructBatchIndexedFromConsensusRequest(next2, arrayList);
                    searchIndex = next2.getSearchIndex();
                    next = next2;
                    it.remove();
                }
                pendingBatch = new PendingBatch(nextSendingIndex, searchIndex, arrayList);
                LogDispatcher.this.logger.debug("{} : accumulated a {} from queue and wal", LogDispatcher.this.impl.getThisNode().getGroupId(), pendingBatch);
            }
            return pendingBatch;
        }

        public void sendBatchAsync(PendingBatch pendingBatch, DispatchLogHandler dispatchLogHandler) {
            try {
                ((AsyncMultiLeaderServiceClient) LogDispatcher.this.clientManager.borrowClient(this.peer.getEndpoint())).syncLog(new TSyncLogReq(this.peer.getGroupId().convertToTConsensusGroupId(), pendingBatch.getBatches()), dispatchLogHandler);
            } catch (IOException | TException e) {
                LogDispatcher.this.logger.error("Can not sync logs to peer {} because", this.peer, e);
            }
        }

        public SyncStatus getSyncStatus() {
            return this.syncStatus;
        }

        /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader, long] */
        private long constructBatchFromWAL(long j, long j2, List<TLogBatch> list) {
            while (j < j2 && list.size() < this.config.getReplication().getMaxRequestPerBatch()) {
                ?? r0 = this.reader;
                j++;
                IConsensusRequest req = r0.getReq(r0);
                if (req != null) {
                    list.add(new TLogBatch(req.serializeToByteBuffer()));
                }
            }
            return j - 1;
        }

        private void constructBatchIndexedFromConsensusRequest(IndexedConsensusRequest indexedConsensusRequest, List<TLogBatch> list) {
            list.add(new TLogBatch(indexedConsensusRequest.serializeToByteBuffer()));
        }
    }

    public LogDispatcher(MultiLeaderServerImpl multiLeaderServerImpl, IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> iClientManager) {
        this.impl = multiLeaderServerImpl;
        this.clientManager = iClientManager;
        this.threads = (List) multiLeaderServerImpl.getConfiguration().stream().filter(peer -> {
            return !Objects.equals(peer, multiLeaderServerImpl.getThisNode());
        }).map(peer2 -> {
            return new LogDispatcherThread(peer2, multiLeaderServerImpl.getConfig());
        }).collect(Collectors.toList());
        if (this.threads.isEmpty()) {
            return;
        }
        this.executorService = IoTDBThreadPoolFactory.newFixedThreadPool(this.threads.size(), "LogDispatcher-" + multiLeaderServerImpl.getThisNode().getGroupId());
    }

    public void start() {
        if (this.threads.isEmpty()) {
            return;
        }
        List<LogDispatcherThread> list = this.threads;
        ExecutorService executorService = this.executorService;
        executorService.getClass();
        list.forEach((v1) -> {
            r1.submit(v1);
        });
    }

    public void stop() {
        if (this.threads.isEmpty()) {
            return;
        }
        this.threads.forEach((v0) -> {
            v0.stop();
        });
        this.executorService.shutdownNow();
        try {
            if (!this.executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                this.logger.error("Unable to shutdown LogDispatcher service after {} seconds", 10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.error("Unexpected Interruption when closing LogDispatcher service ");
        }
    }

    public OptionalLong getMinSyncIndex() {
        return this.threads.stream().mapToLong((v0) -> {
            return v0.getCurrentSyncIndex();
        }).min();
    }

    public void offer(IndexedConsensusRequest indexedConsensusRequest) {
        this.threads.forEach(logDispatcherThread -> {
            this.logger.debug("{}: Push a log to the queue, where the queue length is {}", this.impl.getThisNode().getGroupId(), Integer.valueOf(logDispatcherThread.getPendingRequest().size()));
            if (logDispatcherThread.getPendingRequest().offer(indexedConsensusRequest)) {
                return;
            }
            this.logger.debug("{}: Log queue of {} is full, ignore the log to this node", this.impl.getThisNode().getGroupId(), logDispatcherThread.getPeer());
        });
    }
}
