package org.apache.kafka.raft;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.stream.Stream;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.server.common.KRaftVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClientPreVoteTest.class */
public class KafkaRaftClientPreVoteTest {
    @MethodSource({"kraftVersionHasFetchedCombinations"})
    @ParameterizedTest
    public void testHandlePreVoteRequestAsFollower(KRaftVersion kRaftVersion, boolean z) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(randomReplicaId + 3, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})), kRaftVersion).withElectedLeader(2, replicaKey3.id()).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        if (z) {
            build.pollUntilRequest();
            RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
            build.assertFetchRequestData(assertSentFetchRequest, 2, 0L, 0);
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(2, replicaKey3.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
        }
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey3.id()), !z);
        build.assertElectedLeader(2, replicaKey3.id());
        build.deliverRequest(build.preVoteRequest(2, replicaKey4, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey3.id()), !z);
        build.assertElectedLeader(2, replicaKey3.id());
        build.deliverRequest(build.preVoteRequest(2 + 1, replicaKey2, 2 + 1, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2 + 1, OptionalInt.of(-1), true);
        Assertions.assertEquals(build.currentEpoch(), 2 + 1);
        Assertions.assertTrue(build.client.quorum().isUnattachedNotVoted());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        VoterSet voterSet = VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3}));
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(voterSet, kRaftVersion).withVotedCandidate(2, replicaKey3).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.client.poll();
        build.assertSentFetchRequest();
        build.deliverRequest(build.beginEpochRequest(2, replicaKey3.id(), voterSet.listeners(replicaKey3.id())));
        build.pollUntilResponse();
        build.assertSentBeginQuorumEpochResponse(Errors.NONE);
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey3.id()), true);
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
        build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(2, replicaKey3.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey3.id()), false);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsCandidate(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withVotedCandidate(2, ReplicaKey.of(randomReplicaId, (Uuid) replicaKey.directoryId().get())).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        Assertions.assertTrue(build.client.quorum().isCandidate());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.assertVotedCandidate(2, replicaKey);
        Assertions.assertTrue(build.client.quorum().isCandidate());
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 2L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.assertVotedCandidate(2, replicaKey);
        Assertions.assertTrue(build.client.quorum().isCandidate());
        build.deliverRequest(build.preVoteRequest(2 + 1, replicaKey2, 2 + 1, 2L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2 + 1, OptionalInt.of(-1), true);
        Assertions.assertTrue(build.client.quorum().isUnattached());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsUnattachedObserver(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(randomReplicaId + 3, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey2, replicaKey3})), kRaftVersion).withUnknownLeader(2).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        Assertions.assertTrue(build.client.quorum().isObserver());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey4, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        Assertions.assertEquals(2, build.currentEpoch());
        Assertions.assertTrue(build.client.quorum().isUnattached());
        Assertions.assertTrue(build.client.quorum().isObserver());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsUnattachedVoted(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(randomReplicaId + 3, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey2, replicaKey3})), kRaftVersion).withVotedCandidate(2, replicaKey3).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        Assertions.assertTrue(build.client.quorum().isUnattachedAndVoted());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey4, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), true);
        Assertions.assertEquals(2, build.currentEpoch());
        Assertions.assertTrue(build.client.quorum().isUnattachedAndVoted());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsUnattachedWithLeader(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        ReplicaKey replicaKey4 = KafkaRaftClientTest.replicaKey(randomReplicaId + 3, true);
        ReplicaKey replicaKey5 = KafkaRaftClientTest.replicaKey(randomReplicaId + 4, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey2, replicaKey3})), kRaftVersion).withElectedLeader(2, replicaKey4.id()).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        Assertions.assertTrue(build.client.quorum().isUnattachedNotVoted());
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey4.id()), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey4.id()), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey4.id()), true);
        build.deliverRequest(build.preVoteRequest(2, replicaKey5, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey4.id()), true);
        Assertions.assertEquals(2, build.currentEpoch());
        Assertions.assertTrue(build.client.quorum().isUnattachedNotVoted());
    }

    @MethodSource({"kraftVersionHasFetchedCombinations"})
    @ParameterizedTest
    public void testHandlePreVoteRequestAsFollowerObserver(KRaftVersion kRaftVersion, boolean z) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey2, replicaKey3})), kRaftVersion).withElectedLeader(2, replicaKey2.id()).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.assertElectedLeader(2, replicaKey2.id());
        Assertions.assertTrue(build.client.quorum().isFollower());
        Assertions.assertTrue(build.client.quorum().isObserver());
        if (z) {
            build.pollUntilRequest();
            RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
            build.assertFetchRequestData(assertSentFetchRequest, 2, 0L, 0);
            build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(2, replicaKey2.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
        }
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey2.id()), !z);
        Assertions.assertTrue(build.client.quorum().isFollower());
        Assertions.assertTrue(build.client.quorum().isObserver());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandleInvalidPreVoteRequestWithOlderEpoch(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withUnknownLeader(2).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.deliverRequest(build.preVoteRequest(2 - 1, replicaKey2, 2 - 1, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, 2, OptionalInt.empty(), false);
        build.assertUnknownLeaderAndNoVotedCandidate(2);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testLeaderRejectPreVoteRequestOnSameEpoch(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withUnknownLeader(2).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.unattachedToLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.preVoteRequest(currentEpoch, replicaKey2, currentEpoch, 1L));
        build.client.poll();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch, OptionalInt.of(randomReplicaId), false);
        build.assertElectedLeader(currentEpoch, randomReplicaId);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testPreVoteRequestClusterIdValidation(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.unattachedToLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.preVoteRequest(currentEpoch, replicaKey2, currentEpoch, 0L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch, OptionalInt.of(randomReplicaId), false);
        build.deliverRequest(build.voteRequest(null, currentEpoch, replicaKey2, currentEpoch, 0L, true));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch, OptionalInt.of(randomReplicaId), false);
        build.deliverRequest(build.voteRequest("", currentEpoch, replicaKey2, currentEpoch, 0L, true));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
        build.deliverRequest(build.voteRequest("invalid-uuid", currentEpoch, replicaKey2, currentEpoch, 0L, true));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testInvalidVoterReplicaPreVoteRequest(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.unattachedToLeader();
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.voteRequest(build.clusterId.toString(), currentEpoch, replicaKey2, ReplicaKey.of(10, Uuid.randomUuid()), currentEpoch, 100L, true));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, currentEpoch, OptionalInt.of(randomReplicaId), false);
        build.deliverRequest(build.voteRequest(build.clusterId.toString(), currentEpoch, replicaKey2, ReplicaKey.of(0, Uuid.randomUuid()), currentEpoch, 100L, true));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, currentEpoch, OptionalInt.of(randomReplicaId), false);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testLeaderAcceptPreVoteFromObserver(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true)})), kRaftVersion).withUnknownLeader(4).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.unattachedToLeader();
        int currentEpoch = build.currentEpoch();
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        build.deliverRequest(build.preVoteRequest(currentEpoch - 1, replicaKey2, 0, 0L));
        build.client.poll();
        build.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, currentEpoch, OptionalInt.of(randomReplicaId), false);
        build.deliverRequest(build.preVoteRequest(currentEpoch, replicaKey2, 0, 0L));
        build.client.poll();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch, OptionalInt.of(randomReplicaId), false);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testHandlePreVoteRequestAsResigned(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.unattachedToLeader();
        build.client.quorum().transitionToResigned(Collections.emptyList());
        Assertions.assertTrue(build.client.quorum().isResigned());
        int currentEpoch = build.currentEpoch();
        build.deliverRequest(build.preVoteRequest(currentEpoch, replicaKey2, currentEpoch, build.log.endOffset().offset()));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch, OptionalInt.of(randomReplicaId), true);
        build.deliverRequest(build.preVoteRequest(currentEpoch + 1, replicaKey2, currentEpoch + 1, build.log.endOffset().offset()));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, currentEpoch + 1, OptionalInt.of(-1), true);
        Assertions.assertTrue(build.client.quorum().isUnattached());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testInvalidPreVoteRequest(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withElectedLeader(5, replicaKey2.id()).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        Assertions.assertEquals(5, build.currentEpoch());
        build.assertElectedLeader(5, replicaKey2.id());
        build.deliverRequest(build.preVoteRequest(5, replicaKey2, 0, -5L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INVALID_REQUEST, 5, OptionalInt.of(replicaKey2.id()), false);
        Assertions.assertEquals(5, build.currentEpoch());
        build.assertElectedLeader(5, replicaKey2.id());
        build.deliverRequest(build.preVoteRequest(5, replicaKey2, -1, 0L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INVALID_REQUEST, 5, OptionalInt.of(replicaKey2.id()), false);
        Assertions.assertEquals(5, build.currentEpoch());
        build.assertElectedLeader(5, replicaKey2.id());
        build.deliverRequest(build.preVoteRequest(5, replicaKey2, 5 + 1, 0L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.INVALID_REQUEST, 5, OptionalInt.of(replicaKey2.id()), false);
        Assertions.assertEquals(5, build.currentEpoch());
        build.assertElectedLeader(5, replicaKey2.id());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testFollowerGrantsPreVoteIfHasNotFetchedYet(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey2, replicaKey3})), kRaftVersion).withElectedLeader(2, replicaKey2.id()).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.assertElectedLeader(2, replicaKey2.id());
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey2.id()), true);
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = build.assertSentFetchRequest();
        build.assertFetchRequestData(assertSentFetchRequest, 2, 0L, 0);
        build.deliverResponse(assertSentFetchRequest.correlationId(), assertSentFetchRequest.destination(), build.fetchResponse(2, replicaKey2.id(), MemoryRecords.EMPTY, 0L, Errors.NONE));
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.deliverRequest(build.preVoteRequest(2, replicaKey3, 2, 1L));
        build.pollUntilResponse();
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.of(replicaKey2.id()), false);
        Assertions.assertTrue(build.client.quorum().isFollower());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testRejectPreVoteIfRemoteLogIsNotUpToDate(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true)})), kRaftVersion).withUnknownLeader(2).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).appendToLog(2, Arrays.asList("a", "b", "c")).build();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        Assertions.assertEquals(3L, build.log.endOffset().offset());
        build.deliverRequest(build.preVoteRequest(2 - 1, replicaKey2, 2 - 1, 0L));
        build.pollUntilResponse();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        build.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, 2, OptionalInt.empty(), false);
        build.deliverRequest(build.preVoteRequest(2, replicaKey2, 2 - 1, build.log.endOffset().offset() - 1));
        build.pollUntilResponse();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        build.assertSentVoteResponse(Errors.NONE, 2, OptionalInt.empty(), false);
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testPreVoteResponseIgnoredAfterBecomingFollower(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})), kRaftVersion).withUnknownLeader(5).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.assertUnknownLeaderAndNoVotedCandidate(5);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        Assertions.assertTrue(build.client.quorum().isProspective());
        List<RaftRequest.Outbound> collectPreVoteRequests = build.collectPreVoteRequests(5, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests.size());
        build.deliverRequest(build.beginEpochRequest(5, replicaKey3.id()));
        build.client.poll();
        build.assertElectedLeader(5, replicaKey3.id());
        build.deliverResponse(collectPreVoteRequests.get(0).correlationId(), collectPreVoteRequests.get(0).destination(), build.voteResponse(true, OptionalInt.empty(), 5));
        build.deliverResponse(collectPreVoteRequests.get(1).correlationId(), collectPreVoteRequests.get(1).destination(), build.voteResponse(true, OptionalInt.of(replicaKey3.id()), 5));
        build.client.poll();
        build.assertElectedLeader(5, replicaKey3.id());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testPreVoteNotSupportedByRemote(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true), KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true)})), kRaftVersion).withUnknownLeader(5).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.assertUnknownLeaderAndNoVotedCandidate(5);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        Assertions.assertEquals(5, build.currentEpoch());
        Assertions.assertTrue(build.client.quorum().isProspective());
        List<RaftRequest.Outbound> collectPreVoteRequests = build.collectPreVoteRequests(5, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests.size());
        build.deliverResponse(collectPreVoteRequests.get(0).correlationId(), collectPreVoteRequests.get(0).destination(), RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION));
        build.client.poll();
        Assertions.assertEquals(5 + 1, build.currentEpoch());
        build.client.quorum().isCandidate();
        build.deliverResponse(collectPreVoteRequests.get(1).correlationId(), collectPreVoteRequests.get(1).destination(), build.voteResponse(true, OptionalInt.empty(), 5));
        build.client.poll();
        Assertions.assertEquals(5 + 1, build.currentEpoch());
        build.client.quorum().isCandidate();
        build.collectVoteRequests(5 + 1, 0, 0L);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertEquals(5 + 1, build.currentEpoch());
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.pollUntilRequest();
        List<RaftRequest.Outbound> collectPreVoteRequests2 = build.collectPreVoteRequests(5 + 1, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests2.size());
        build.deliverResponse(collectPreVoteRequests2.get(0).correlationId(), collectPreVoteRequests2.get(0).destination(), build.voteResponse(true, OptionalInt.empty(), 5 + 1));
        build.client.poll();
        Assertions.assertEquals(5 + 2, build.currentEpoch());
        build.client.quorum().isCandidate();
        build.deliverResponse(collectPreVoteRequests2.get(1).correlationId(), collectPreVoteRequests2.get(1).destination(), RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION));
        build.client.poll();
        Assertions.assertEquals(5 + 2, build.currentEpoch());
        build.client.quorum().isCandidate();
    }

    @MethodSource({"kraftVersionRaftProtocolCombinations"})
    @ParameterizedTest
    public void testProspectiveReceivesBeginQuorumRequest(KRaftVersion kRaftVersion, RaftClientTestContext.RaftProtocol raftProtocol) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withUnknownLeader(5).withRaftProtocol(raftProtocol).build();
        build.assertUnknownLeaderAndNoVotedCandidate(5);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.deliverRequest(build.beginEpochRequest(5, replicaKey2.id()));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.assertElectedLeader(5, replicaKey2.id());
    }

    @MethodSource({"kraftVersionRaftProtocolCombinations"})
    @ParameterizedTest
    public void testProspectiveTransitionsToUnattachedOnElectionFailure(KRaftVersion kRaftVersion, RaftClientTestContext.RaftProtocol raftProtocol) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true)})), kRaftVersion).withUnknownLeader(5).withRaftProtocol(raftProtocol).build();
        build.assertUnknownLeaderAndNoVotedCandidate(5);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.assertSentPreVoteRequest(5, 0, 0L, 1);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentPreVoteRequest = build.assertSentPreVoteRequest(5, 0, 0L, 1);
        build.deliverResponse(assertSentPreVoteRequest.correlationId(), assertSentPreVoteRequest.destination(), build.voteResponse(false, OptionalInt.empty(), 5));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        build.assertSentPreVoteRequest(5, 0, 0L, 1);
    }

    @MethodSource({"kraftVersionRaftProtocolCombinations"})
    @ParameterizedTest
    public void testProspectiveWithLeaderTransitionsToFollower(KRaftVersion kRaftVersion, RaftClientTestContext.RaftProtocol raftProtocol) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        ReplicaKey replicaKey3 = KafkaRaftClientTest.replicaKey(randomReplicaId + 2, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, replicaKey3})), kRaftVersion).withElectedLeader(5, replicaKey2.id()).withRaftProtocol(raftProtocol).build();
        build.assertElectedLeader(5, replicaKey2.id());
        Assertions.assertTrue(build.client.quorum().isFollower());
        MockTime mockTime = build.time;
        Objects.requireNonNull(build);
        mockTime.sleep(50000L);
        build.pollUntilRequest();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.assertSentPreVoteRequest(5, 0, 0L, 2);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        build.assertSentFetchRequest();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.assertElectedLeader(5, replicaKey2.id());
        MockTime mockTime2 = build.time;
        Objects.requireNonNull(build);
        mockTime2.sleep(50000L);
        build.pollUntilRequest();
        List<RaftRequest.Outbound> collectPreVoteRequests = build.collectPreVoteRequests(5, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests.size());
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.assertElectedLeader(5, replicaKey2.id());
        build.deliverResponse(collectPreVoteRequests.get(0).correlationId(), collectPreVoteRequests.get(0).destination(), build.voteResponse(false, OptionalInt.empty(), 5));
        build.client.poll();
        build.deliverResponse(collectPreVoteRequests.get(1).correlationId(), collectPreVoteRequests.get(1).destination(), build.voteResponse(false, OptionalInt.empty(), 5));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.client.poll();
        build.assertSentFetchRequest();
        MockTime mockTime3 = build.time;
        Objects.requireNonNull(build);
        mockTime3.sleep(50000L);
        build.pollUntilRequest();
        List<RaftRequest.Outbound> collectPreVoteRequests2 = build.collectPreVoteRequests(5, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests2.size());
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.assertElectedLeader(5, replicaKey2.id());
        build.deliverResponse(collectPreVoteRequests2.get(0).correlationId(), collectPreVoteRequests2.get(0).destination(), build.voteResponse(Errors.FENCED_LEADER_EPOCH, OptionalInt.of(replicaKey3.id()), 5 + 1));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isFollower());
        build.assertElectedLeader(5 + 1, replicaKey3.id());
    }

    @EnumSource(KRaftVersion.class)
    @ParameterizedTest
    public void testProspectiveLosesElectionHasLeaderButMissingEndpoint(KRaftVersion kRaftVersion) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true);
        int i = randomReplicaId + 3;
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2})), kRaftVersion).withElectedLeader(2, i).withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL).build();
        build.assertElectedLeader(2, i);
        Assertions.assertTrue(build.client.quorum().isUnattached());
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isUnattached());
        Assertions.assertTrue(build.client.quorum().hasLeader());
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isProspective());
        Assertions.assertTrue(build.client.quorum().hasLeader());
    }

    @MethodSource({"kraftVersionRaftProtocolCombinations"})
    @ParameterizedTest
    public void testProspectiveWithoutLeaderTransitionsToFollower(KRaftVersion kRaftVersion, RaftClientTestContext.RaftProtocol raftProtocol) throws Exception {
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(KafkaRaftClientTest.randomReplicaId(), true);
        ReplicaKey replicaKey2 = KafkaRaftClientTest.replicaKey(replicaKey.id() + 1, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, replicaKey2, KafkaRaftClientTest.replicaKey(replicaKey.id() + 2, true)})), kRaftVersion).withUnknownLeader(5).withRaftProtocol(raftProtocol).build();
        build.assertUnknownLeaderAndNoVotedCandidate(5);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.pollUntilRequest();
        Assertions.assertTrue(build.client.quorum().isProspective());
        List<RaftRequest.Outbound> collectPreVoteRequests = build.collectPreVoteRequests(5, 0, 0L);
        Assertions.assertEquals(2, collectPreVoteRequests.size());
        build.deliverResponse(collectPreVoteRequests.get(0).correlationId(), collectPreVoteRequests.get(0).destination(), build.voteResponse(true, OptionalInt.of(replicaKey2.id()), 5));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isFollower());
        Assertions.assertEquals(OptionalInt.of(replicaKey2.id()), build.client.quorum().leaderId());
    }

    @MethodSource({"kraftVersionRaftProtocolCombinations"})
    @ParameterizedTest
    public void testPreVoteRequestTimeout(KRaftVersion kRaftVersion, RaftClientTestContext.RaftProtocol raftProtocol) throws Exception {
        int randomReplicaId = KafkaRaftClientTest.randomReplicaId();
        ReplicaKey replicaKey = KafkaRaftClientTest.replicaKey(randomReplicaId, true);
        RaftClientTestContext build = new RaftClientTestContext.Builder(replicaKey.id(), (Uuid) replicaKey.directoryId().get()).withStartingVoters(VoterSetTest.voterSet((Stream<ReplicaKey>) Stream.of((Object[]) new ReplicaKey[]{replicaKey, KafkaRaftClientTest.replicaKey(randomReplicaId + 1, true)})), kRaftVersion).withUnknownLeader(1).withRaftProtocol(raftProtocol).build();
        build.assertUnknownLeaderAndNoVotedCandidate(1);
        build.time.sleep(build.electionTimeoutMs() * 2);
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.pollUntilRequest();
        RaftRequest.Outbound assertSentPreVoteRequest = build.assertSentPreVoteRequest(1, 0, 0L, 1);
        build.time.sleep(build.requestTimeoutMs());
        build.client.poll();
        RaftRequest.Outbound assertSentPreVoteRequest2 = build.assertSentPreVoteRequest(1, 0, 0L, 1);
        build.deliverResponse(assertSentPreVoteRequest.correlationId(), assertSentPreVoteRequest.destination(), build.voteResponse(true, OptionalInt.empty(), 1));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isProspective());
        build.deliverResponse(assertSentPreVoteRequest2.correlationId(), assertSentPreVoteRequest2.destination(), build.voteResponse(true, OptionalInt.empty(), 1));
        build.client.poll();
        Assertions.assertTrue(build.client.quorum().isCandidate());
        build.assertVotedCandidate(1 + 1, replicaKey);
    }

    static Stream<Arguments> kraftVersionRaftProtocolCombinations() {
        return Stream.of((Object[]) KRaftVersion.values()).flatMap(kRaftVersion -> {
            return Stream.of((Object[]) RaftClientTestContext.RaftProtocol.values()).map(raftProtocol -> {
                return Arguments.of(new Object[]{kRaftVersion, raftProtocol});
            });
        });
    }

    static Stream<Arguments> kraftVersionHasFetchedCombinations() {
        return Stream.of((Object[]) KRaftVersion.values()).flatMap(kRaftVersion -> {
            return Stream.of((Object[]) new Boolean[]{true, false}).map(bool -> {
                return Arguments.of(new Object[]{kRaftVersion, bool});
            });
        });
    }
}
