package org.apache.iotdb.consensus.iot.client;

import java.util.concurrent.CompletableFuture;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/iot/client/DispatchLogHandler.class */
/**
 * Asynchronous Thrift callback for one {@link Batch} sent by a
 * {@link LogDispatcher.LogDispatcherThread}.
 *
 * <p>When the peer answers with a single retryable status, or when the call fails with an
 * exception, the same batch is re-sent after a back-off that doubles with every attempt and is
 * capped by the configured maximum wait time. Otherwise the batch is removed from the
 * dispatcher's sync status and the safely-deleted search index is refreshed.
 */
public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> {
    private static final Logger logger = LoggerFactory.getLogger(DispatchLogHandler.class);

    private final LogDispatcher.LogDispatcherThread thread;
    private final Batch batch;
    /** Wall-clock time at which this handler was created; used for the per-request metric. */
    private final long createTime = System.currentTimeMillis();
    /** Number of failed attempts so far; drives the exponential back-off. */
    private int retryCount;

    /**
     * @param logDispatcherThread dispatcher thread that owns the batch and performs re-sends
     * @param batch the batch whose delivery result this handler receives
     */
    public DispatchLogHandler(LogDispatcher.LogDispatcherThread logDispatcherThread, Batch batch) {
        this.thread = logDispatcherThread;
        this.batch = batch;
    }

    /**
     * Handles a response from the peer.
     *
     * <p>A response carrying exactly one status whose code is retryable (see
     * {@link #needRetry(int)}) schedules a retry; any other response completes the batch. The
     * {@code syncLogTimePerRequest} histogram is updated in both cases.
     *
     * @param tSyncLogEntriesRes the peer's response
     */
    public void onComplete(TSyncLogEntriesRes tSyncLogEntriesRes) {
        boolean retryable =
                tSyncLogEntriesRes.getStatuses().size() == 1
                        && needRetry(tSyncLogEntriesRes.getStatuses().get(0).getCode());
        if (retryable) {
            TSStatus status = tSyncLogEntriesRes.getStatuses().get(0);
            logger.warn(
                    "Can not send {} to peer {} for {} times because {}",
                    batch,
                    thread.getPeer(),
                    ++retryCount,
                    status.getMessage());
            sleepCorrespondingTimeAndRetryAsynchronous();
        } else {
            thread.getSyncStatus().removeBatch(batch);
            thread.updateSafelyDeletedSearchIndex();
        }

        // Guard the divisor: an empty batch must not raise ArithmeticException inside the
        // callback. NOTE(review): batches are presumably never empty here - confirm with caller.
        int entryCount = Math.max(1, batch.getLogEntries().size());
        long elapsedMs = System.currentTimeMillis() - createTime;
        MetricService.getInstance()
                .getOrCreateHistogram(
                        Metric.STAGE.toString(),
                        MetricLevel.IMPORTANT,
                        new String[] {
                            Tag.NAME.toString(),
                            Metric.IOT_CONSENSUS.toString(),
                            Tag.TYPE.toString(),
                            "syncLogTimePerRequest",
                            Tag.REGION.toString(),
                            thread.getPeer().getGroupId().toString()
                        })
                .update(elapsedMs / entryCount);
    }

    /**
     * Tells whether a status code returned by the peer should trigger a re-send.
     *
     * @param i status code taken from the peer's response
     * @return {@code true} for INTERNAL_SERVER_ERROR, SYSTEM_READ_ONLY and WRITE_PROCESS_REJECT
     */
    private boolean needRetry(int i) {
        return i == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
                || i == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
                || i == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
    }

    /**
     * Handles a failed call by logging the failure and scheduling a retry.
     *
     * @param exc the failure reported for the asynchronous call
     */
    public void onError(Exception exc) {
        // Placeholders match the argument order; the exception is passed as the trailing
        // argument so that SLF4J logs it together with its stack trace.
        logger.warn(
                "Can not send {} to peer {} for {} times because of an exception",
                batch,
                thread.getPeer(),
                ++retryCount,
                exc);
        sleepCorrespondingTimeAndRetryAsynchronous();
    }

    /**
     * Waits {@code basicRetryWaitTimeMs * 2^retryCount} milliseconds, capped at
     * {@code maxRetryWaitTimeMs}, on a {@link CompletableFuture} async task and then re-sends
     * the batch with this handler, unless the dispatcher thread has been stopped.
     */
    private void sleepCorrespondingTimeAndRetryAsynchronous() {
        CompletableFuture.runAsync(() -> {
            long waitMs =
                    Math.min(
                            (long) (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
                                    * Math.pow(2.0d, retryCount)),
                            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
            try {
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag; the stop check and re-send below still run.
                Thread.currentThread().interrupt();
                logger.warn("Unexpected interruption during retry pending batch");
            }
            if (thread.isStopped()) {
                logger.debug(
                        "LogDispatcherThread {} has been stopped, we will not retrying this Batch {} after {} times",
                        thread.getPeer(),
                        batch,
                        retryCount);
            } else {
                thread.sendBatchAsync(batch, this);
            }
        });
    }
}
