/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.raft.roles;

import com.google.common.base.Throwables;
import io.atomix.cluster.MemberId;
import io.atomix.raft.RaftError;
import io.atomix.raft.RaftException;
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.cluster.impl.RaftMemberContext;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.protocol.AppendResponse;
import io.atomix.raft.protocol.ConfigureRequest;
import io.atomix.raft.protocol.ConfigureResponse;
import io.atomix.raft.protocol.ForceConfigureRequest;
import io.atomix.raft.protocol.ForceConfigureResponse;
import io.atomix.raft.protocol.InternalAppendRequest;
import io.atomix.raft.protocol.JoinRequest;
import io.atomix.raft.protocol.JoinResponse;
import io.atomix.raft.protocol.LeaveRequest;
import io.atomix.raft.protocol.LeaveResponse;
import io.atomix.raft.protocol.PollRequest;
import io.atomix.raft.protocol.PollResponse;
import io.atomix.raft.protocol.RaftResponse;
import io.atomix.raft.protocol.ReconfigureRequest;
import io.atomix.raft.protocol.ReconfigureResponse;
import io.atomix.raft.protocol.TransferRequest;
import io.atomix.raft.protocol.TransferResponse;
import io.atomix.raft.protocol.VoteRequest;
import io.atomix.raft.protocol.VoteResponse;
import io.atomix.raft.roles.ActiveRole;
import io.atomix.raft.roles.LeaderAppender;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.atomix.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.raft.storage.log.entry.InitialEntry;
import io.atomix.raft.storage.log.entry.RaftLogEntry;
import io.atomix.raft.storage.log.entry.SerializedApplicationEntry;
import io.atomix.raft.storage.log.entry.UnserializedApplicationEntry;
import io.atomix.raft.storage.system.Configuration;
import io.atomix.raft.zeebe.EntryValidator;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.concurrent.Scheduled;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

public final class LeaderRole
extends ActiveRole
implements ZeebeLogAppender {
    private static final int MAX_APPEND_ATTEMPTS = 5;
    private final LeaderAppender appender = new LeaderAppender(this);
    private Scheduled appendTimer;
    private long configuring;
    private CompletableFuture<Void> commitInitialEntriesFuture;
    private ApplicationEntry lastZbEntry = null;

    public LeaderRole(RaftContext context) {
        super(context);
    }

    @Override
    public synchronized CompletableFuture<RaftRole> start() {
        this.raft.getRaftRoleMetrics().setElectionLatency(System.currentTimeMillis() - this.raft.getLastHeartbeat());
        this.takeLeadership();
        this.appendInitialEntries();
        this.commitInitialEntriesFuture = this.commitInitialEntries();
        this.lastZbEntry = this.findLastZeebeEntry();
        if (this.jointConsensus() || this.forcedConfiguration()) {
            this.raft.getThreadContext().execute(() -> {
                Collection<RaftMember> currentMembers = this.raft.getCluster().getConfiguration().newMembers();
                this.configure(currentMembers, List.of());
            });
        }
        return ((CompletableFuture)super.start().thenRun(this::startTimers)).thenApply(v -> this);
    }

    @Override
    public synchronized CompletableFuture<Void> stop() {
        this.raft.resetLastHeartbeat();
        this.raft.getCluster().getReplicationTargets().forEach(RaftMemberContext::closeReplicationContext);
        return ((CompletableFuture)((CompletableFuture)super.stop().thenRun(this.appender::close)).thenRun(this::cancelTimers)).thenRun(this::stepDown);
    }

    @Override
    public RaftServer.Role role() {
        return RaftServer.Role.LEADER;
    }

    @Override
    public CompletableFuture<ReconfigureResponse> onReconfigure(ReconfigureRequest request) {
        Collection<RaftMember> updatedMembers;
        this.raft.checkThread();
        this.logRequest(request);
        if (this.initializing()) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.CONFIGURATION_ERROR, "Not ready to make configuration changes")).build()));
        }
        if (this.configuring() || this.jointConsensus()) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.CONFIGURATION_ERROR, "Another configuration change is in progress")).build()));
        }
        Configuration configuration = this.raft.getCluster().getConfiguration();
        if (request.index() > 0L && request.index() < configuration.index() || request.term() != configuration.term()) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.CONFIGURATION_ERROR, "Stale configuration")).build()));
        }
        Collection<RaftMember> currentMembers = this.raft.getCluster().getMembers();
        if (this.equalMembership(currentMembers, updatedMembers = request.members())) {
            return CompletableFuture.completedFuture(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.OK)).withIndex(configuration.index())).withTerm(configuration.term())).withTime(configuration.time())).withMembers(currentMembers)).build()));
        }
        CompletableFuture<ReconfigureResponse> future = new CompletableFuture<ReconfigureResponse>();
        this.configure(updatedMembers, currentMembers).whenComplete((jointConsensusIndex, jointConsensusError) -> {
            if (jointConsensusError == null) {
                this.configure(updatedMembers, List.of()).whenComplete((leftJointConsensusIndex, leftJointConsensusError) -> {
                    if (leftJointConsensusError == null) {
                        future.complete(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.OK)).withIndex((long)leftJointConsensusIndex)).withTerm(configuration.term())).withTime(configuration.time())).withMembers(updatedMembers)).build()));
                    } else {
                        future.complete(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, leftJointConsensusError.getMessage())).build()));
                    }
                });
            } else {
                future.complete(this.logResponse(((ReconfigureResponse.Builder)((ReconfigureResponse.Builder)ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, jointConsensusError.getMessage())).build()));
            }
        });
        return future;
    }

    @Override
    public CompletableFuture<ForceConfigureResponse> onForceConfigure(ForceConfigureRequest request) {
        this.raft.transition(RaftServer.Role.FOLLOWER);
        return super.onForceConfigure(request);
    }

    @Override
    public CompletableFuture<JoinResponse> onJoin(JoinRequest request) {
        this.raft.checkThread();
        Configuration currentConfiguration = this.raft.getCluster().getConfiguration();
        return this.onReconfigure(ReconfigureRequest.builder().withIndex(currentConfiguration.index()).withTerm(currentConfiguration.term()).withMembers(currentConfiguration.newMembers()).withMember(request.joiningMember()).from((String)((Object)this.raft.getCluster().getLocalMember().memberId().id())).build()).handle((reconfigureResponse, throwable) -> {
            if (throwable != null) {
                return ((JoinResponse.Builder)((JoinResponse.Builder)JoinResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, throwable.getMessage())).build();
            }
            if (reconfigureResponse.status() == RaftResponse.Status.OK) {
                return ((JoinResponse.Builder)JoinResponse.builder().withStatus(RaftResponse.Status.OK)).build();
            }
            return ((JoinResponse.Builder)((JoinResponse.Builder)JoinResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(reconfigureResponse.error())).build();
        });
    }

    @Override
    public CompletableFuture<LeaveResponse> onLeave(LeaveRequest request) {
        this.raft.checkThread();
        Configuration currentConfiguration = this.raft.getCluster().getConfiguration();
        List<RaftMember> updatedMembers = currentConfiguration.newMembers().stream().filter(member -> !member.memberId().equals(request.leavingMember().memberId())).toList();
        return this.onReconfigure(ReconfigureRequest.builder().withIndex(currentConfiguration.index()).withTerm(currentConfiguration.term()).withMembers(updatedMembers).from((String)((Object)this.raft.getCluster().getLocalMember().memberId().id())).build()).handle((reconfigureResponse, throwable) -> {
            if (throwable != null) {
                return ((LeaveResponse.Builder)((LeaveResponse.Builder)LeaveResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, throwable.getMessage())).build();
            }
            if (reconfigureResponse.status() == RaftResponse.Status.OK) {
                return ((LeaveResponse.Builder)LeaveResponse.builder().withStatus(RaftResponse.Status.OK)).build();
            }
            return ((LeaveResponse.Builder)((LeaveResponse.Builder)LeaveResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(reconfigureResponse.error())).build();
        });
    }

    private boolean equalMembership(Collection<RaftMember> currentMembers, Collection<RaftMember> updatedMembers) {
        record MemberIdAndType(MemberId memberId, RaftMember.Type type) {
        }
        Set currentMembersWithTypes = currentMembers.stream().map(member -> new MemberIdAndType(member.memberId(), member.getType())).collect(Collectors.toSet());
        Set updatedMembersWithTypes = updatedMembers.stream().map(member -> new MemberIdAndType(member.memberId(), member.getType())).collect(Collectors.toSet());
        return currentMembersWithTypes.equals(updatedMembersWithTypes);
    }

    private ApplicationEntry findLastZeebeEntry() {
        try (RaftLogReader reader = this.raft.getLog().openUncommittedReader();){
            IndexedRaftLogEntry lastEntry;
            reader.seekToAsqn(Long.MAX_VALUE);
            if (reader.hasNext() && (lastEntry = (IndexedRaftLogEntry)reader.next()) != null && lastEntry.isApplicationEntry()) {
                ApplicationEntry applicationEntry = lastEntry.getApplicationEntry();
                return applicationEntry;
            }
            ApplicationEntry applicationEntry = null;
            return applicationEntry;
        }
    }

    private void cancelTimers() {
        if (this.appendTimer != null) {
            this.log.trace("Cancelling append timer");
            this.appendTimer.cancel();
        }
    }

    private void stepDown() {
        if (this.raft.getLeader() != null && this.raft.getLeader().equals(this.raft.getCluster().getLocalMember())) {
            this.raft.setLeader(null);
        }
    }

    private void takeLeadership() {
        this.raft.setLeader(this.raft.getCluster().getLocalMember().memberId());
        this.raft.getCluster().reset();
        this.raft.getCluster().getReplicationTargets().forEach(member -> member.openReplicationContext(this.raft.getLog()));
    }

    private void appendInitialEntries() {
        long term = this.raft.getTerm();
        this.appendEntry(new RaftLogEntry(term, new InitialEntry()));
    }

    private CompletableFuture<Void> commitInitialEntries() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.appender.appendEntries(this.appender.getIndex()).whenComplete((resultIndex, error) -> {
            this.raft.checkThread();
            if (this.isRunning()) {
                if (error == null) {
                    future.complete(null);
                } else {
                    this.log.info("Failed to commit the initial entry, stepping down");
                    this.raft.setLeader(null);
                    this.raft.transition(RaftServer.Role.FOLLOWER);
                }
            }
        });
        return future;
    }

    private void startTimers() {
        this.log.trace("Starting append timer on fix rate of {}", (Object)this.raft.getHeartbeatInterval());
        this.appendTimer = this.raft.getThreadContext().schedule(Duration.ZERO, this.raft.getHeartbeatInterval(), this::appendMembers);
    }

    private void appendMembers() {
        this.raft.checkThread();
        if (this.isRunning()) {
            this.appender.appendEntries();
        }
    }

    private boolean configuring() {
        return this.configuring > 0L;
    }

    private boolean initializing() {
        return this.appender.getIndex() == 0L || this.raft.getCommitIndex() < this.appender.getIndex();
    }

    private boolean jointConsensus() {
        return this.raft.getCluster().inJointConsensus();
    }

    private boolean forcedConfiguration() {
        return this.raft.getCluster().getConfiguration().force();
    }

    private CompletableFuture<Long> configure(Collection<RaftMember> newMembers, Collection<RaftMember> oldMembers) {
        IndexedRaftLogEntry entry;
        this.raft.checkThread();
        long term = this.raft.getTerm();
        ConfigurationEntry configurationEntry = new ConfigurationEntry(System.currentTimeMillis(), newMembers, oldMembers);
        try {
            entry = this.appendEntry(new RaftLogEntry(term, configurationEntry));
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        this.configuring = entry.index();
        this.raft.getCluster().configure(new Configuration(entry.index(), entry.term(), configurationEntry.timestamp(), configurationEntry.newMembers(), configurationEntry.oldMembers()));
        return this.appender.appendEntries(entry.index()).whenCompleteAsync((index, error) -> {
            this.configuring = 0L;
        }, (Executor)this.raft.getThreadContext());
    }

    @Override
    public CompletableFuture<ConfigureResponse> onConfigure(ConfigureRequest request) {
        if (this.updateTermAndLeader(request.term(), request.leader())) {
            this.raft.transition(RaftServer.Role.FOLLOWER);
        }
        return super.onConfigure(request);
    }

    @Override
    public CompletableFuture<TransferResponse> onTransfer(TransferRequest request) {
        this.logRequest(request);
        if (!this.raft.getCluster().isMember(request.member())) {
            return CompletableFuture.completedFuture(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.ILLEGAL_MEMBER_STATE)).build()));
        }
        CompletableFuture<TransferResponse> future = new CompletableFuture<TransferResponse>();
        this.appender.appendEntries(this.raft.getLog().getLastIndex()).whenComplete((result, error) -> {
            if (this.isRunning()) {
                if (error == null) {
                    this.log.info("Transferring leadership to {}", (Object)request.member());
                    this.raft.transition(RaftServer.Role.FOLLOWER);
                    future.complete(this.logResponse(((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.OK)).build()));
                } else if (error instanceof CompletionException && error.getCause() instanceof RaftException) {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(((RaftException)error.getCause()).getType(), error.getMessage())).build()));
                } else if (error instanceof RaftException) {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(((RaftException)error).getType(), error.getMessage())).build()));
                } else {
                    future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.PROTOCOL_ERROR, error.getMessage())).build()));
                }
            } else {
                future.complete(this.logResponse(((TransferResponse.Builder)((TransferResponse.Builder)TransferResponse.builder().withStatus(RaftResponse.Status.ERROR)).withError(RaftError.Type.ILLEGAL_MEMBER_STATE)).build()));
            }
        });
        return future;
    }

    @Override
    public CompletableFuture<AppendResponse> onAppend(InternalAppendRequest request) {
        this.raft.checkThread();
        if (this.updateTermAndLeader(request.term(), request.leader())) {
            CompletableFuture<AppendResponse> future = super.onAppend(request);
            this.raft.transition(RaftServer.Role.FOLLOWER);
            return future;
        }
        if (request.term() < this.raft.getTerm()) {
            this.logRequest(request);
            return CompletableFuture.completedFuture(this.logResponse(((AppendResponse.Builder)AppendResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withSucceeded(false).withLastLogIndex(this.raft.getLog().getLastIndex()).withLastSnapshotIndex(this.raft.getCurrentSnapshotIndex()).build()));
        }
        this.raft.setLeader(request.leader());
        this.raft.transition(RaftServer.Role.FOLLOWER);
        return super.onAppend(request);
    }

    @Override
    public CompletableFuture<PollResponse> onPoll(PollRequest request) {
        this.logRequest(request);
        RaftMemberContext member = this.raft.getCluster().getMemberContext(request.candidate());
        if (member != null) {
            member.resetFailureCount();
        }
        return CompletableFuture.completedFuture(this.logResponse(((PollResponse.Builder)PollResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withAccepted(false).build()));
    }

    @Override
    public CompletableFuture<VoteResponse> onVote(VoteRequest request) {
        if (this.updateTermAndLeader(request.term(), null)) {
            this.log.info("Received greater term from {}", (Object)request.candidate());
            this.raft.transition(RaftServer.Role.FOLLOWER);
            return super.onVote(request);
        }
        this.logRequest(request);
        return CompletableFuture.completedFuture(this.logResponse(((VoteResponse.Builder)VoteResponse.builder().withStatus(RaftResponse.Status.OK)).withTerm(this.raft.getTerm()).withVoted(false).build()));
    }

    private IndexedRaftLogEntry appendEntry(RaftLogEntry entry) {
        try {
            return this.appendWithRetry(entry);
        }
        catch (Exception e) {
            this.log.error("Failed to append to local log, stepping down", (Throwable)e);
            this.raft.transition(RaftServer.Role.FOLLOWER);
            throw e;
        }
    }

    private IndexedRaftLogEntry appendWithRetry(RaftLogEntry entry) {
        int retries = 0;
        Throwable lastError = null;
        while (retries <= 5) {
            try {
                return this.append(entry);
            }
            catch (JournalException.OutOfDiskSpace e) {
                if (!this.raft.getLogCompactor().compactIgnoringReplicationThreshold()) {
                    throw e;
                }
                lastError = e;
                this.log.warn("Out of disk space while appending entry {}, compacted and retrying... (try {} out of {})", new Object[]{entry, ++retries, 5, e});
            }
            catch (JournalException | UncheckedIOException e) {
                lastError = e;
                this.log.warn("Error on appending entry {}, retrying... (try {} out of {})", new Object[]{entry, ++retries, 5, e});
            }
        }
        this.log.warn("Failed to append to local log after {} retries", (Object)retries, (Object)lastError);
        throw lastError;
    }

    private IndexedRaftLogEntry append(RaftLogEntry entry) {
        IndexedRaftLogEntry indexedEntry = this.raft.getLog().append(entry);
        this.raft.getReplicationMetrics().setAppendIndex(indexedEntry.index());
        this.log.trace("Appended {}", (Object)indexedEntry);
        this.appender.observeNonCommittedEntries(this.raft.getCommitIndex());
        return indexedEntry;
    }

    @Override
    public void appendEntry(ApplicationEntry entry, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.getThreadContext().execute(() -> this.safeAppendEntry(entry, appendListener));
    }

    @Override
    public void appendEntry(long lowestPosition, long highestPosition, ByteBuffer data, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.getThreadContext().execute(() -> this.safeAppendEntry(new SerializedApplicationEntry(lowestPosition, highestPosition, data), appendListener));
    }

    @Override
    public void appendEntry(long lowestPosition, long highestPosition, BufferWriter data, ZeebeLogAppender.AppendListener appendListener) {
        this.raft.getThreadContext().execute(() -> this.safeAppendEntry(new UnserializedApplicationEntry(lowestPosition, highestPosition, data), appendListener));
    }

    private void safeAppendEntry(ApplicationEntry entry, ZeebeLogAppender.AppendListener appendListener) {
        IndexedRaftLogEntry indexed;
        this.raft.checkThread();
        if (!this.isRunning()) {
            appendListener.onWriteError(new RaftException.NoLeader("LeaderRole is closed and cannot be used as appender", new Object[0]));
            return;
        }
        EntryValidator.ValidationResult result = this.raft.getEntryValidator().validateEntry(this.lastZbEntry, entry);
        if (result.failed()) {
            appendListener.onWriteError(new IllegalStateException(result.errorMessage()));
            this.raft.transition(RaftServer.Role.FOLLOWER);
            return;
        }
        try {
            indexed = this.appendEntry(new RaftLogEntry(this.raft.getTerm(), entry));
        }
        catch (Exception e) {
            appendListener.onWriteError(Throwables.getRootCause((Throwable)e));
            return;
        }
        if (indexed.isApplicationEntry()) {
            this.lastZbEntry = indexed.getApplicationEntry();
        }
        appendListener.onWrite(indexed);
        this.replicate(indexed, appendListener);
    }

    private void replicate(IndexedRaftLogEntry indexed, ZeebeLogAppender.AppendListener appendListener) {
        long committedPosition;
        this.raft.checkThread();
        CompletableFuture<Long> appendEntriesFuture = this.appender.appendEntries(indexed.index());
        long l = committedPosition = indexed.isApplicationEntry() ? indexed.getApplicationEntry().highestPosition() : -1L;
        if (indexed.isApplicationEntry()) {
            appendEntriesFuture.whenCompleteAsync((commitIndex, commitError) -> {
                if (this.isRunning() && commitError == null) {
                    this.raft.notifyApplicationEntryCommittedPositionListeners(committedPosition);
                }
            }, (Executor)this.raft.getThreadContext());
        }
        appendEntriesFuture.whenCompleteAsync((commitIndex, commitError) -> {
            if (commitError == null) {
                if (this.isRunning()) {
                    appendListener.onCommit((long)commitIndex);
                }
            } else {
                long index = -1L;
                if (commitError instanceof RaftException.AppendFailureException) {
                    RaftException.AppendFailureException appendFailureException = (RaftException.AppendFailureException)commitError;
                    index = appendFailureException.getIndex();
                }
                appendListener.onCommitError(index, (Throwable)commitError);
                this.log.warn("Failed to replicate entry: {}", commitIndex, commitError);
            }
        }, (Executor)this.raft.getThreadContext());
    }

    public synchronized void onInitialEntriesCommitted(Runnable runnable) {
        this.commitInitialEntriesFuture.whenComplete((v, error) -> {
            if (error == null) {
                runnable.run();
            }
        });
    }
}

