package org.apache.ratis.server.impl;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/ratis/server/impl/StateMachineUpdater.class */
public class StateMachineUpdater implements Runnable {
    static final Logger LOG = LoggerFactory.getLogger((Class<?>) StateMachineUpdater.class);
    private final String name;
    private final StateMachine stateMachine;
    private final RaftServerImpl server;
    private final RaftLog raftLog;
    private final Long autoSnapshotThreshold;
    private final boolean purgeUptoSnapshotIndex;
    private final Thread updater;
    private final AwaitForSignal awaitForSignal;
    private final RaftLogIndex appliedIndex;
    private final RaftLogIndex snapshotIndex;
    private final SnapshotRetentionPolicy snapshotRetentionPolicy;
    private final AtomicReference<Long> stopIndex = new AtomicReference<>();
    private volatile State state = State.RUNNING;
    private StateMachineMetrics stateMachineMetrics = null;
    private final Consumer<Object> infoIndexChange = obj -> {
        LOG.info("{}: {}", this.name, obj);
    };
    private final Consumer<Object> debugIndexChange = obj -> {
        LOG.debug("{}: {}", this.name, obj);
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/server/impl/StateMachineUpdater$State.class */
    public enum State {
        RUNNING,
        STOP,
        RELOAD,
        EXCEPTION
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StateMachineUpdater(StateMachine stateMachine, RaftServerImpl raftServerImpl, ServerState serverState, long j, RaftProperties raftProperties) {
        this.name = serverState.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
        this.stateMachine = stateMachine;
        this.server = raftServerImpl;
        this.raftLog = serverState.getLog();
        this.appliedIndex = new RaftLogIndex(StateMachineMetrics.STATEMACHINE_APPLIED_INDEX_GAUGE, j);
        this.snapshotIndex = new RaftLogIndex("snapshotIndex", j);
        this.autoSnapshotThreshold = RaftServerConfigKeys.Snapshot.autoTriggerEnabled(raftProperties) ? Long.valueOf(RaftServerConfigKeys.Snapshot.autoTriggerThreshold(raftProperties)) : null;
        final int retentionFileNum = RaftServerConfigKeys.Snapshot.retentionFileNum(raftProperties);
        this.snapshotRetentionPolicy = new SnapshotRetentionPolicy() { // from class: org.apache.ratis.server.impl.StateMachineUpdater.1
            @Override // org.apache.ratis.statemachine.SnapshotRetentionPolicy
            public int getNumSnapshotsRetained() {
                return retentionFileNum;
            }
        };
        this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(raftProperties);
        this.updater = new Daemon(this);
        this.awaitForSignal = new AwaitForSignal(this.name);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        initializeMetrics();
        this.updater.start();
    }

    private void initializeMetrics() {
        if (this.stateMachineMetrics == null) {
            this.stateMachineMetrics = StateMachineMetrics.getStateMachineMetrics(this.server, this.appliedIndex, this.stateMachine);
        }
    }

    private void stop() {
        this.state = State.STOP;
        try {
            this.stateMachine.close();
            this.stateMachineMetrics.unregister();
        } catch (Throwable th) {
            LOG.warn(this.name + ": Failed to close " + JavaUtils.getClassSimpleName(this.stateMachine.getClass()) + " " + this.stateMachine, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAndJoin() throws InterruptedException {
        if (this.state == State.EXCEPTION) {
            stop();
            return;
        }
        if (this.stopIndex.compareAndSet(null, Long.valueOf(this.raftLog.getLastCommittedIndex()))) {
            notifyUpdater();
            LOG.info("{}: set stopIndex = {}", this, this.stopIndex);
        }
        this.updater.join();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reloadStateMachine() {
        this.state = State.RELOAD;
        notifyUpdater();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyUpdater() {
        this.awaitForSignal.signal();
    }

    public String toString() {
        return this.name;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.state != State.STOP) {
            try {
                waitForCommit();
                if (this.state == State.RELOAD) {
                    reload();
                }
                MemoizedSupplier<List<CompletableFuture<Message>>> applyLog = applyLog();
                checkAndTakeSnapshot(applyLog);
                if (shouldStop()) {
                    checkAndTakeSnapshot(applyLog);
                    stop();
                }
            } catch (Throwable th) {
                if ((th instanceof InterruptedException) && this.state == State.STOP) {
                    LOG.info("{} was interrupted.  Exiting ...", this);
                } else {
                    this.state = State.EXCEPTION;
                    LOG.error(this + " caught a Throwable.", th);
                    this.server.close();
                }
            }
        }
    }

    private void waitForCommit() throws InterruptedException {
        long lastAppliedIndex = getLastAppliedIndex();
        while (lastAppliedIndex >= this.raftLog.getLastCommittedIndex() && this.state == State.RUNNING && !shouldStop() && !this.awaitForSignal.await(100L, TimeUnit.MILLISECONDS)) {
        }
    }

    private void reload() throws IOException {
        Preconditions.assertTrue(this.stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
        this.stateMachine.reinitialize();
        SnapshotInfo latestSnapshot = this.stateMachine.getLatestSnapshot();
        Objects.requireNonNull(latestSnapshot, "snapshot == null");
        long index = latestSnapshot.getIndex();
        this.snapshotIndex.setUnconditionally(index, this.infoIndexChange);
        this.appliedIndex.setUnconditionally(index, this.infoIndexChange);
        this.state = State.RUNNING;
    }

    private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException {
        MemoizedSupplier<List<CompletableFuture<Message>>> valueOf = MemoizedSupplier.valueOf(ArrayList::new);
        long lastCommittedIndex = this.raftLog.getLastCommittedIndex();
        while (true) {
            long lastAppliedIndex = getLastAppliedIndex();
            if (lastAppliedIndex >= lastCommittedIndex || this.state != State.RUNNING || shouldStop()) {
                break;
            }
            long j = lastAppliedIndex + 1;
            RaftProtos.LogEntryProto logEntryProto = this.raftLog.get(j);
            if (logEntryProto == null) {
                LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}", this, Long.valueOf(j), this.state);
                break;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("{}: applying nextIndex={}, nextLog={}", this, Long.valueOf(j), LogProtoUtils.toLogEntryString(logEntryProto));
            } else {
                LOG.debug("{}: applying nextIndex={}", this, Long.valueOf(j));
            }
            CompletableFuture<Message> applyLogToStateMachine = this.server.applyLogToStateMachine(logEntryProto);
            if (applyLogToStateMachine != null) {
                valueOf.get().add(applyLogToStateMachine);
            }
            Preconditions.assertTrue(this.appliedIndex.incrementAndGet(this.debugIndexChange) == j);
        }
        return valueOf;
    }

    private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> memoizedSupplier) throws ExecutionException, InterruptedException {
        if (shouldTakeSnapshot()) {
            if (memoizedSupplier.isInitialized()) {
                JavaUtils.allOf(memoizedSupplier.get()).get();
            }
            takeSnapshot();
        }
    }

    private void takeSnapshot() {
        long orElse;
        try {
            Timer.Context time = this.stateMachineMetrics.getTakeSnapshotTimer().time();
            long takeSnapshot = this.stateMachine.takeSnapshot();
            time.stop();
            this.server.getSnapshotRequestHandler().completeTakingSnapshot(takeSnapshot);
            long lastAppliedIndex = getLastAppliedIndex();
            if (takeSnapshot > lastAppliedIndex) {
                throw new StateMachineException("Bug in StateMachine: snapshot index = " + takeSnapshot + " > appliedIndex = " + lastAppliedIndex + "; StateMachine class=" + this.stateMachine.getClass().getName() + ", stateMachine=" + this.stateMachine);
            }
            this.stateMachine.getStateMachineStorage().cleanupOldSnapshots(this.snapshotRetentionPolicy);
            if (takeSnapshot >= 0) {
                LOG.info("{}: Took a snapshot at index {}", this.name, Long.valueOf(takeSnapshot));
                this.snapshotIndex.updateIncreasingly(takeSnapshot, this.infoIndexChange);
                if (this.purgeUptoSnapshotIndex) {
                    orElse = takeSnapshot;
                } else {
                    orElse = LongStream.concat(LongStream.of(takeSnapshot), this.server.getCommitInfos().stream().mapToLong((v0) -> {
                        return v0.getCommitIndex();
                    })).min().orElse(takeSnapshot);
                }
                this.raftLog.purge(orElse);
            }
        } catch (IOException e) {
            LOG.error(this.name + ": Failed to take snapshot", (Throwable) e);
        }
    }

    private boolean shouldStop() {
        return Optional.ofNullable(this.stopIndex.get()).filter(l -> {
            return l.longValue() <= getLastAppliedIndex();
        }).isPresent();
    }

    private boolean shouldTakeSnapshot() {
        if (this.state == State.RUNNING && this.server.getSnapshotRequestHandler().shouldTriggerTakingSnapshot()) {
            return true;
        }
        if (this.autoSnapshotThreshold == null) {
            return false;
        }
        return shouldStop() ? getLastAppliedIndex() - this.snapshotIndex.get() > 0 : this.state == State.RUNNING && getStateMachineLastAppliedIndex() - this.snapshotIndex.get() >= this.autoSnapshotThreshold.longValue();
    }

    private long getLastAppliedIndex() {
        return this.appliedIndex.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getStateMachineLastAppliedIndex() {
        return this.stateMachine.getLastAppliedTermIndex().getIndex();
    }
}
