package org.apache.iotdb.consensus.iot.logdispatcher;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesReq;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.class */
public class LogDispatcher {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) LogDispatcher.class);
    // Initial index constant; not referenced by the decompiled code visible in this file
    // (the constructor passes the literal 0L instead).
    private static final long DEFAULT_INITIAL_SYNC_INDEX = 0;
    // Owning consensus server; source of the local node, configuration, storage dir and search index.
    private final IoTConsensusServerImpl impl;
    // One dispatcher per remote peer. Mutated by add/removeLogDispatcherThread under this object's monitor.
    private final List<LogDispatcherThread> threads;
    // String form of the local node's endpoint; sent as the first argument of every TSyncLogEntriesReq.
    private final String selfPeerId;
    // Supplies the async clients used by the dispatchers to reach remote peers.
    private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
    // Created lazily by initLogSyncThreadPool(); stays null while there is no dispatcher to run.
    private ExecutorService executorService;
    // Set to true at the end of stop(); add/removeLogDispatcherThread become no-ops afterwards.
    private boolean stopped = false;
    // Running totals of dispatched entries, split by where each entry was taken from (see LogDispatcherThread.run()).
    private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
    private final AtomicLong logEntriesFromQueue = new AtomicLong(0);

    /* loaded from: input_file:org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher$LogDispatcherThread.class */
    public class LogDispatcherThread implements Runnable {
        // Timeout, in seconds, for each blocking poll on pendingEntries in run().
        private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
        // Index passed to reader.getReqIterator(...); also used as the "+1" step when computing successor indexes.
        private static final long START_INDEX = 1;
        private final IoTConsensusConfig config;
        // Remote peer this dispatcher sends batches to.
        private final Peer peer;
        // Tracks the index reported by getCurrentSyncIndex() for this peer.
        private final IndexController controller;
        // Supplies the next index to send (getNextSendingIndex) and receives each batch before it is sent.
        private final SyncStatus syncStatus;
        // Bounded queue filled by offer(); capacity is getReplication().getMaxQueueLength().
        private final BlockingQueue<IndexedConsensusRequest> pendingEntries;
        // WAL reader obtained from the state machine; source of walEntryIterator.
        private final ConsensusReqReader reader;
        private final ConsensusReqReader.ReqIterator walEntryIterator;
        // Metric set registered in run() and removed in stop().
        private final LogDispatcherThreadMetrics metrics;
        // Entries drained or polled from pendingEntries and not yet placed into a batch.
        private final List<IndexedConsensusRequest> bufferedEntries = new LinkedList();
        // Accounts for the serialized size of entries held in pendingEntries / bufferedEntries.
        private final IoTConsensusMemoryManager iotConsensusMemoryManager = IoTConsensusMemoryManager.getInstance();
        // Written by stop(), read by the loop in run(); volatile so the write is visible across threads.
        private volatile boolean stopped = false;

        /**
         * Creates the dispatcher for one remote peer.
         *
         * @param peer the remote peer that will receive the batches
         * @param ioTConsensusConfig configuration; its replication section sizes the pending queue
         *     and provides the checkpoint gap
         * @param j index value forwarded to the {@link IndexController} constructor
         *     (presumably the initial sync index for this peer; confirm against IndexController)
         */
        public LogDispatcherThread(Peer peer, IoTConsensusConfig ioTConsensusConfig, long j) {
            this.reader = (ConsensusReqReader) LogDispatcher.this.impl.getStateMachine().read(new GetConsensusReqReaderPlan());
            this.peer = peer;
            this.config = ioTConsensusConfig;

            int queueCapacity = ioTConsensusConfig.getReplication().getMaxQueueLength();
            this.pendingEntries = new ArrayBlockingQueue(queueCapacity);

            this.controller =
                    new IndexController(
                            LogDispatcher.this.impl.getStorageDir(),
                            peer,
                            j,
                            ioTConsensusConfig.getReplication().getCheckpointGap());
            // The bound method reference captures (and null-checks) the enclosing server instance.
            this.syncStatus = new SyncStatus(this.controller, ioTConsensusConfig, LogDispatcher.this.impl::getSearchIndex);
            this.walEntryIterator = this.reader.getReqIterator(START_INDEX);
            // Kept last: the metrics object receives a reference to this dispatcher.
            this.metrics = new LogDispatcherThreadMetrics(this);
        }

        /** Returns the index controller owned by this dispatcher. */
        public IndexController getController() {
            return controller;
        }

        /** Returns the current index reported by this dispatcher's {@link IndexController}. */
        public long getCurrentSyncIndex() {
            IndexController indexController = controller;
            return indexController.getCurrentIndex();
        }

        /** Returns the remote peer this dispatcher sends to. */
        public Peer getPeer() {
            return peer;
        }

        /** Returns the configuration this dispatcher was created with. */
        public IoTConsensusConfig getConfig() {
            return config;
        }

        /** Returns the number of requests currently waiting in the pending queue. */
        public int getPendingEntriesSize() {
            return pendingEntries.size();
        }

        /** Returns the number of requests held in the buffer, i.e. not yet placed into a batch. */
        public int getBufferRequestSize() {
            return bufferedEntries.size();
        }

        /**
         * Tries to enqueue a request for this peer without blocking.
         *
         * <p>Memory for the request is reserved first. The reservation is kept only while the request
         * sits in the queue or buffer: it is released later by {@code getBatch()} or {@code stop()}.
         * It must therefore be released here only when the request was NOT enqueued; releasing it
         * on success as well would free the same bytes twice.
         *
         * @param indexedConsensusRequest the request to enqueue
         * @return {@code true} if the request was accepted; {@code false} if the memory reservation
         *     was refused or the queue is full
         */
        public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
            if (!this.iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
                return false;
            }
            boolean accepted = false;
            try {
                accepted = this.pendingEntries.offer(indexedConsensusRequest);
                return accepted;
            } finally {
                // Covers both a full queue and an exception thrown by the queue.
                if (!accepted) {
                    this.iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
                }
            }
        }

        /** Returns the serialized size of {@code indexedConsensusRequest} to the memory manager. */
        private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
            iotConsensusMemoryManager.free(
                    indexedConsensusRequest.getSerializedSize(),
                    true);
        }

        /**
         * Marks this dispatcher as stopped and releases what it holds: the memory accounted for
         * queued and buffered requests, the sync status, and the registered metric set.
         * The pending queue is cleared; the buffer is only measured, not cleared.
         */
        public void stop() {
            stopped = true;

            // Memory held by requests still waiting in the queue.
            long queuedBytes = 0;
            for (IndexedConsensusRequest queued : pendingEntries) {
                queuedBytes += queued.getSerializedSize();
            }
            pendingEntries.clear();
            iotConsensusMemoryManager.free(queuedBytes, true);

            // Memory held by requests already moved to the buffer.
            long bufferedBytes = 0;
            for (IndexedConsensusRequest buffered : bufferedEntries) {
                bufferedBytes += buffered.getSerializedSize();
            }
            iotConsensusMemoryManager.free(bufferedBytes, true);

            syncStatus.free();
            MetricService.getInstance().removeMetricSet(metrics);
        }

        /**
         * Delegates to {@link IndexController#cleanupVersionFiles()}.
         *
         * @throws IOException propagated from the index controller
         */
        public void cleanup() throws IOException {
            IndexController indexController = controller;
            indexController.cleanupVersionFiles();
        }

        /** Returns {@code true} once {@link #stop()} has been called on this dispatcher. */
        public boolean isStopped() {
            return stopped;
        }

        /**
         * Dispatcher main loop. Until the thread is interrupted or {@link #stop()} is called, it
         * repeatedly builds the next batch, waits on the pending queue while there is nothing to
         * send, hands the batch to the sync status, updates the WAL/queue counters, and sends the
         * batch asynchronously.
         *
         * <p>The wait loop must end as soon as a non-empty batch is available: {@code getBatch()}
         * removes the entries it returns from the buffer, so a batch that is built and then dropped
         * would be lost from the in-memory path.
         */
        @Override // java.lang.Runnable
        public void run() {
            LogDispatcher.logger.info("{}: Dispatcher for {} starts", LogDispatcher.this.impl.getThisNode(), this.peer);
            MetricService.getInstance().addMetricSet(this.metrics);
            while (!Thread.interrupted() && !this.stopped) {
                try {
                    long startNanos = System.nanoTime();
                    Batch batch;
                    while ((batch = getBatch()).isEmpty()) {
                        // Nothing to send yet: block (with a timeout) until a request is offered.
                        IndexedConsensusRequest polled = this.pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
                        if (polled != null) {
                            this.bufferedEntries.add(polled);
                        }
                        // stop() does not necessarily interrupt this thread, so observe the flag while idle.
                        if (this.stopped) {
                            break;
                        }
                    }
                    if (batch.isEmpty()) {
                        // Only reachable when stopped while idle; the outer condition then ends the loop.
                        continue;
                    }
                    MetricService.getInstance().getOrCreateHistogram(Metric.STAGE.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), Metric.IOT_CONSENSUS.toString(), Tag.TYPE.toString(), "constructBatch", Tag.REGION.toString(), this.peer.getGroupId().toString()).update((System.nanoTime() - startNanos) / batch.getLogEntries().size());
                    this.syncStatus.addNextBatch(batch);
                    LogDispatcher.this.logEntriesFromWAL.addAndGet(batch.getLogEntriesNumFromWAL());
                    LogDispatcher.this.logEntriesFromQueue.addAndGet(batch.getLogEntries().size() - batch.getLogEntriesNumFromWAL());
                    sendBatchAsync(batch, new DispatchLogHandler(this, batch));
                } catch (InterruptedException e) {
                    // Restore the flag; the outer loop condition sees it and exits.
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    LogDispatcher.logger.error("Unexpected error in logDispatcher for peer {}", this.peer, e2);
                }
            }
            LogDispatcher.logger.info("{}: Dispatcher for {} exits", LogDispatcher.this.impl.getThisNode(), this.peer);
        }

        /**
         * Pushes the server's current safely-deleted search index to the WAL reader, then signals
         * the server if {@code unblockWrite()} reports true.
         */
        public void updateSafelyDeletedSearchIndex() {
            IoTConsensusServerImpl server = LogDispatcher.this.impl;
            reader.setSafelyDeletedSearchIndex(server.getCurrentSafelyDeletedSearchIndex());
            if (!server.unblockWrite()) {
                return;
            }
            server.signal();
        }

        /**
         * Builds the next batch to send, starting at {@code syncStatus.getNextSendingIndex()}.
         *
         * <p>Entries are taken from the in-memory buffer where available; any range of search
         * indexes missing from the buffer is filled from the WAL via
         * {@link #constructBatchFromWAL(long, long, Batch)}. Every buffered entry that is consumed
         * or discarded is removed from the buffer and its reserved memory is released.
         *
         * @return the built batch (its index is always built before returning); callers test
         *     {@code isEmpty()} to see whether anything was accumulated
         */
        public Batch getBatch() {
            long searchIndex;
            long nextSendingIndex = this.syncStatus.getNextSendingIndex();
            // Under the server's index lock: snapshot (current search index + 1) as the exclusive
            // upper bound for WAL reads, and move queued requests into the buffer.
            synchronized (LogDispatcher.this.impl.getIndexObject()) {
                searchIndex = LogDispatcher.this.impl.getSearchIndex() + START_INDEX;
                LogDispatcher.logger.debug("{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}", LogDispatcher.this.impl.getThisNode().getGroupId(), Long.valueOf(nextSendingIndex), Long.valueOf(searchIndex), Integer.valueOf(getPendingEntriesSize()), Integer.valueOf(this.bufferedEntries.size()));
                // Drain at most (max entries per batch - already buffered) requests.
                this.pendingEntries.drainTo(this.bufferedEntries, this.config.getReplication().getMaxLogEntriesNumPerBatch() - this.bufferedEntries.size());
            }
            // Drop leading buffered entries whose index is below the next index to send.
            Iterator<IndexedConsensusRequest> it = this.bufferedEntries.iterator();
            while (it.hasNext()) {
                IndexedConsensusRequest next = it.next();
                if (next.getSearchIndex() >= nextSendingIndex) {
                    break;
                }
                it.remove();
                releaseReservedMemory(next);
            }
            Batch batch = new Batch(this.config);
            if (this.bufferedEntries.isEmpty()) {
                // Nothing usable in memory: read [nextSendingIndex, searchIndex) from the WAL.
                constructBatchFromWAL(nextSendingIndex, searchIndex, batch);
                batch.buildIndex();
                LogDispatcher.logger.debug("{} : accumulated a {} from wal when empty", LogDispatcher.this.impl.getThisNode().getGroupId(), batch);
            } else {
                Iterator<IndexedConsensusRequest> it2 = this.bufferedEntries.iterator();
                // next2 tracks the most recently examined buffered entry.
                IndexedConsensusRequest next2 = it2.next();
                if (nextSendingIndex != next2.getSearchIndex()) {
                    // Gap before the first buffered entry: fill it from the WAL first.
                    constructBatchFromWAL(nextSendingIndex, next2.getSearchIndex(), batch);
                    if (!batch.canAccumulate()) {
                        // Batch is full from WAL alone; the buffered entry stays in the buffer.
                        batch.buildIndex();
                        LogDispatcher.logger.debug("{} : accumulated a {} from wal", LogDispatcher.this.impl.getThisNode().getGroupId(), batch);
                        return batch;
                    }
                }
                constructBatchIndexedFromConsensusRequest(next2, batch);
                it2.remove();
                releaseReservedMemory(next2);
                if (!batch.canAccumulate()) {
                    batch.buildIndex();
                    LogDispatcher.logger.debug("{} : accumulated a {} from queue", LogDispatcher.this.impl.getThisNode().getGroupId(), batch);
                    return batch;
                }
                // Keep appending buffered entries, filling any index gap between neighbours from the WAL.
                while (it2.hasNext() && batch.canAccumulate()) {
                    IndexedConsensusRequest next3 = it2.next();
                    if (next3.getSearchIndex() != next2.getSearchIndex() + START_INDEX) {
                        constructBatchFromWAL(next2.getSearchIndex() + START_INDEX, next3.getSearchIndex(), batch);
                        if (!batch.canAccumulate()) {
                            // Full after the gap fill; next3 is left in the buffer.
                            batch.buildIndex();
                            LogDispatcher.logger.debug("gap {} : accumulated a {} from queue and wal when gap", LogDispatcher.this.impl.getThisNode().getGroupId(), batch);
                            return batch;
                        }
                    }
                    constructBatchIndexedFromConsensusRequest(next3, batch);
                    next2 = next3;
                    it2.remove();
                    releaseReservedMemory(next3);
                }
                batch.buildIndex();
                LogDispatcher.logger.debug("{} : accumulated a {} from queue and wal", LogDispatcher.this.impl.getThisNode().getGroupId(), batch);
            }
            return batch;
        }

        /**
         * Sends {@code batch} to the peer through an asynchronous client.
         *
         * <p>Any exception raised while borrowing the client, building the request or issuing the
         * call is logged and forwarded to {@code dispatchLogHandler.onError}.
         *
         * @param batch the batch whose log entries are sent
         * @param dispatchLogHandler callback passed to the async call; also receives local failures
         */
        public void sendBatchAsync(Batch batch, DispatchLogHandler dispatchLogHandler) {
            try {
                AsyncIoTConsensusServiceClient client =
                        (AsyncIoTConsensusServiceClient) LogDispatcher.this.clientManager.borrowClient(peer.getEndpoint());
                TSyncLogEntriesReq request =
                        new TSyncLogEntriesReq(
                                LogDispatcher.this.selfPeerId,
                                peer.getGroupId().convertToTConsensusGroupId(),
                                batch.getLogEntries());
                LogDispatcher.logger.debug(
                        "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
                        Long.valueOf(batch.getStartIndex()),
                        Long.valueOf(batch.getEndIndex()),
                        peer.getGroupId().convertToTConsensusGroupId());
                client.syncLogEntries(request, dispatchLogHandler);
            } catch (Exception failure) {
                LogDispatcher.logger.error("Can not sync logs to peer {} because", peer, failure);
                dispatchLogHandler.onError(failure);
            }
        }

        /** Returns the sync status object that tracks batches for this peer. */
        public SyncStatus getSyncStatus() {
            return syncStatus;
        }

        /**
         * Appends WAL entries to {@code batch}, walking search indexes from {@code j} (inclusive)
         * towards {@code j2} (exclusive) for as long as the batch can accumulate more.
         *
         * <p>An entry with a smaller index than the one being looked for is logged and skipped. An
         * entry with a larger index is logged; it is still added unless its index is at or beyond
         * {@code j2}, in which case the method returns.
         *
         * @param j first search index to read
         * @param j2 exclusive upper bound of the search indexes to read
         * @param batch the batch that receives the entries (each flagged as coming from the WAL)
         */
        private void constructBatchFromWAL(long j, long j2, Batch batch) {
            // String.format runs eagerly, so skip it entirely when debug logging is disabled.
            if (LogDispatcher.logger.isDebugEnabled()) {
                LogDispatcher.logger.debug(String.format("DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d", Integer.valueOf(this.peer.getGroupId().getId()), this.peer.getEndpoint().getIp(), Long.valueOf(j), Long.valueOf(j2)));
            }
            // j3 is the search index expected next.
            long j3 = j;
            this.walEntryIterator.skipTo(j3);
            while (j3 < j2 && batch.canAccumulate()) {
                LogDispatcher.logger.debug("construct from WAL for one Entry, index : {}", Long.valueOf(j3));
                try {
                    this.walEntryIterator.waitForNextReady();
                } catch (InterruptedException e) {
                    // The interrupt flag is restored; the read below is still attempted.
                    Thread.currentThread().interrupt();
                    LogDispatcher.logger.warn("wait for next WAL entry is interrupted");
                }
                IndexedConsensusRequest next = this.walEntryIterator.next();
                if (next.getSearchIndex() < j3) {
                    // Older than expected: skip it and keep looking for j3.
                    LogDispatcher.logger.warn("search for one Entry which index is {}, but find a smaller one, index : {}", Long.valueOf(j3), Long.valueOf(next.getSearchIndex()));
                } else {
                    if (next.getSearchIndex() > j3) {
                        LogDispatcher.logger.warn("search for one Entry which index is {}, but find a larger one, index : {}", Long.valueOf(j3), Long.valueOf(next.getSearchIndex()));
                        if (next.getSearchIndex() >= j2) {
                            // Beyond the requested range: nothing more to add.
                            return;
                        }
                    }
                    j3 = next.getSearchIndex() + START_INDEX;
                    next.buildSerializedRequests();
                    batch.addTLogEntry(new TLogEntry(next.getSerializedRequests(), next.getSearchIndex(), true));
                }
            }
        }

        /**
         * Adds one buffered request to {@code batch} as a log entry flagged as not coming from the WAL.
         */
        private void constructBatchIndexedFromConsensusRequest(IndexedConsensusRequest indexedConsensusRequest, Batch batch) {
            TLogEntry queueEntry =
                    new TLogEntry(
                            indexedConsensusRequest.getSerializedRequests(),
                            indexedConsensusRequest.getSearchIndex(),
                            false);
            batch.addTLogEntry(queueEntry);
        }
    }

    /**
     * Creates one dispatcher per configured peer other than the local node, and creates the
     * thread pool only when at least one such dispatcher exists.
     *
     * @param ioTConsensusServerImpl the owning consensus server
     * @param iClientManager provider of async clients for remote peers
     */
    public LogDispatcher(IoTConsensusServerImpl ioTConsensusServerImpl, IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> iClientManager) {
        this.impl = ioTConsensusServerImpl;
        this.selfPeerId = ioTConsensusServerImpl.getThisNode().getEndpoint().toString();
        this.clientManager = iClientManager;
        this.threads =
                ioTConsensusServerImpl.getConfiguration().stream()
                        .filter(candidate -> !Objects.equals(candidate, ioTConsensusServerImpl.getThisNode()))
                        .map(remote -> new LogDispatcherThread(remote, ioTConsensusServerImpl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
                        .collect(Collectors.toList());
        if (!this.threads.isEmpty()) {
            initLogSyncThreadPool();
        }
    }

    /** Creates the cached thread pool for the dispatchers, named after the local node's group id. */
    private void initLogSyncThreadPool() {
        String poolName = "LogDispatcher-" + impl.getThisNode().getGroupId();
        executorService = IoTDBThreadPoolFactory.newCachedThreadPool(poolName);
    }

    /**
     * Submits every dispatcher to the thread pool. Does nothing when there are no dispatchers,
     * in which case the pool may not have been created.
     */
    public synchronized void start() {
        if (this.threads.isEmpty()) {
            return;
        }
        // The bound method reference null-checks the executor when it is created.
        this.threads.forEach(this.executorService::submit);
    }

    /**
     * Stops every dispatcher, shuts the thread pool down and waits up to 10 seconds for it to
     * terminate. The dispatcher is marked stopped in all cases, including when it has no threads.
     */
    public synchronized void stop() {
        if (threads.isEmpty()) {
            stopped = true;
            return;
        }
        for (LogDispatcherThread dispatcher : threads) {
            dispatcher.stop();
        }
        executorService.shutdownNow();
        int timeoutInSeconds = 10;
        try {
            boolean terminated = executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS);
            if (!terminated) {
                logger.error("Unable to shutdown LogDispatcher service after {} seconds", (Object) timeoutInSeconds);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            logger.error("Unexpected Interruption when closing LogDispatcher service ");
        }
        stopped = true;
    }

    /**
     * Creates a dispatcher for {@code peer}, registers it and submits it to the thread pool,
     * creating the pool first if it does not exist yet. No-op once this dispatcher is stopped.
     *
     * @param peer the peer to start dispatching to
     * @param j index value forwarded to the {@link LogDispatcherThread} constructor
     */
    public synchronized void addLogDispatcherThread(Peer peer, long j) {
        if (!stopped) {
            LogDispatcherThread dispatcher = new LogDispatcherThread(peer, impl.getConfig(), j);
            threads.add(dispatcher);
            if (executorService == null) {
                initLogSyncThreadPool();
            }
            executorService.submit(dispatcher);
        }
    }

    /**
     * Stops, cleans up and unregisters the first dispatcher whose peer equals {@code peer}.
     * No-op once this dispatcher is stopped or when no dispatcher matches.
     *
     * @param peer the peer whose dispatcher is removed
     * @throws IOException if the dispatcher's cleanup fails; the dispatcher then stays registered
     */
    public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
        if (stopped) {
            return;
        }
        for (int idx = 0; idx < threads.size(); idx++) {
            LogDispatcherThread candidate = threads.get(idx);
            if (candidate.peer.equals(peer)) {
                candidate.stop();
                candidate.cleanup();
                threads.remove(idx);
                return;
            }
        }
    }

    /**
     * Returns the smallest current sync index across all dispatchers, or an empty
     * {@link OptionalLong} when there are none.
     */
    public synchronized OptionalLong getMinSyncIndex() {
        return threads.stream()
                .mapToLong(LogDispatcherThread::getCurrentSyncIndex)
                .min();
    }

    /**
     * Offers {@code indexedConsensusRequest} to every dispatcher's queue. A dispatcher that
     * rejects the request is only logged at debug level; the request is not retried here.
     *
     * @param indexedConsensusRequest the request to dispatch; it is serialized once before being offered
     */
    public void offer(IndexedConsensusRequest indexedConsensusRequest) {
        if (this.threads.isEmpty()) {
            return;
        }
        // Serialize outside the monitor so the lock only covers the enqueue loop.
        indexedConsensusRequest.buildSerializedRequests();
        synchronized (this) {
            for (LogDispatcherThread dispatcher : this.threads) {
                logger.debug(
                        "{}->{}: Push a log to the queue, where the queue length is {}",
                        this.impl.getThisNode().getGroupId(),
                        dispatcher.getPeer().getEndpoint().getIp(),
                        Integer.valueOf(dispatcher.getPendingEntriesSize()));
                if (!dispatcher.offer(indexedConsensusRequest)) {
                    logger.debug(
                            "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
                            this.impl.getThisNode().getGroupId(),
                            dispatcher.getPeer(),
                            Long.valueOf(indexedConsensusRequest.getSearchIndex()));
                }
            }
        }
    }

    /** Returns the running total of dispatched log entries that were read from the WAL. */
    public long getLogEntriesFromWAL() {
        return logEntriesFromWAL.get();
    }

    /** Returns the running total of dispatched log entries that were taken from the in-memory queue. */
    public long getLogEntriesFromQueue() {
        return logEntriesFromQueue.get();
    }
}
