package org.apache.kafka.raft;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClientTest.class */
public class KafkaRaftClientTest {
    /**
     * Builds a context from {@code Builder(0, {0})} (a single-element voter set) and checks that
     * the context reports node 0 as elected leader in epoch 1.
     */
    @Test
    public void testInitializeSingleMemberQuorum() throws IOException {
        Set<Integer> voters = Collections.singleton(0);
        RaftClientTestContext context = new RaftClientTestContext.Builder(0, voters).build();
        context.assertElectedLeader(1, 0);
    }

    /**
     * Starts a single-voter context configured with {@code withElectedLeader(2, 0)} and checks
     * that, once built, the log end offset is 1 and the log's last fetched epoch, the current
     * leader/epoch pair and the elected-leader assertion all report epoch 3 with leader 0.
     */
    @Test
    public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
        int initialEpoch = 2;
        int expectedEpoch = initialEpoch + 1;

        RaftClientTestContext context = new RaftClientTestContext.Builder(0, Collections.singleton(0))
            .withElectedLeader(initialEpoch, 0)
            .build();

        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(expectedEpoch, context.log.lastFetchedEpoch());
        Assertions.assertEquals(
            new LeaderAndEpoch(OptionalInt.of(0), expectedEpoch),
            context.currentLeaderAndEpoch());
        context.assertElectedLeader(expectedEpoch, 0);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(2, 0)} and delivers a vote
     * request from node 1 for the same epoch. The expected vote response has no error, epoch 2,
     * leader 0, and the final flag {@code false} (vote not granted, going by the test name).
     */
    @Test
    public void testRejectVotesFromSameEpochAfterResigningLeadership() throws Exception {
        int localId = 0;
        int remoteId = 1;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, remoteId))
            // Stub the Random so that nextInt(10000) always yields 0.
            .updateRandom(random -> Mockito.doReturn(0).when(random).nextInt(10000))
            .withElectedLeader(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);

        context.deliverRequest(
            context.voteRequest(epoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withVotedCandidate(2, 0)} and delivers a vote
     * request from node 1 for the same epoch. The expected vote response has no error, epoch 2,
     * no leader, and the final flag {@code false}.
     */
    @Test
    public void testRejectVotesFromSameEpochAfterResigningCandidacy() throws Exception {
        int localId = 0;
        int remoteId = 1;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, remoteId))
            // Stub the Random so that nextInt(10000) always yields 0.
            .updateRandom(random -> Mockito.doReturn(0).when(random).nextInt(10000))
            .withVotedCandidate(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertVotedCandidate(epoch, localId);

        context.deliverRequest(
            context.voteRequest(epoch, remoteId, context.log.lastFetchedEpoch(), context.log.endOffset().offset));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(2, 0)}. The test checks that
     * {@code scheduleAppend} returns {@code Long.MAX_VALUE}, that an EndQuorumEpoch request is
     * sent to node 1, and that after the election timeout elapses node 0 is a voted candidate in
     * epoch 3 and has sent a vote request.
     */
    @Test
    public void testInitializeAsResignedLeaderFromStateStore() throws Exception {
        int localId = 0;
        int remoteId = 1;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, remoteId))
            // Stub the Random so that nextInt(10000) always yields 0.
            .updateRandom(random -> Mockito.doReturn(0).when(random).nextInt(10000))
            .withElectedLeader(epoch, localId)
            .build();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch, localId);

        context.client.poll();
        // The append is expected to be answered with Long.MAX_VALUE rather than a real offset.
        Assertions.assertEquals(Long.MAX_VALUE, context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));

        context.pollUntilRequest();
        context.deliverResponse(
            context.assertSentEndQuorumEpochRequest(epoch, remoteId),
            remoteId,
            context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();

        context.time.sleep(context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * Starts node 0 of voters {0, 1, 2} with {@code withElectedLeader(19, 0)}, a 10000 ms election
     * timeout and a 5000 ms request timeout. Two EndQuorumEpoch requests are expected; only the
     * first is answered. After 6000 ms exactly one EndQuorumEpoch request is expected again,
     * addressed to the destination of the unanswered one.
     */
    @Test
    public void testEndQuorumEpochRetriesWhileResigned() throws Exception {
        int localId = 0;
        int voter1 = 1;
        int voter2 = 2;
        int epoch = 19;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, voter1, voter2))
            .withElectionTimeoutMs(10000)
            .withRequestTimeoutMs(5000)
            .withElectedLeader(epoch, localId)
            .build();

        context.pollUntilRequest();
        List<RaftRequest.Outbound> initialRequests =
            context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2), Optional.empty());
        Assertions.assertEquals(2, initialRequests.size());

        // Answer only the first request; nothing further should be queued as a result.
        RaftRequest.Outbound answered = initialRequests.get(0);
        context.deliverResponse(
            answered.correlationId,
            answered.destinationId(),
            context.endEpochResponse(epoch, OptionalInt.of(localId)));
        context.client.poll();
        Assertions.assertEquals(Collections.emptyList(), context.channel.drainSendQueue());

        // Advance 6000 ms (beyond the configured 5000 ms request timeout) and expect a single
        // request to the node that never replied.
        int unansweredNodeId = initialRequests.get(1).destinationId();
        context.time.sleep(6000L);
        context.pollUntilRequest();
        List<RaftRequest.Outbound> retries =
            context.collectEndQuorumRequests(epoch, Utils.mkSet(unansweredNodeId), Optional.empty());
        Assertions.assertEquals(1, retries.size());
    }

    /**
     * Makes node 0 leader, delivers a fetch from node 1 at the log end offset with a final
     * argument of 1000, appends a batch and then calls {@code shutdown(1000)}. The fetch is
     * expected to be answered with {@code BROKER_NOT_AVAILABLE} and the node to be a resigned
     * leader; after a further 1000 ms the client is neither running nor shutting down.
     */
    @Test
    public void testResignWillCompleteFetchPurgatory() throws Exception {
        int localId = 0;
        int otherNodeId = 1;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId)).build();
        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();

        // Deliver a fetch positioned at the current end of the log.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, context.log.endOffset().offset, epoch, 1000));
        context.client.poll();

        // Append directly to the log, then begin shutting down.
        context.log.appendAsLeader(
            context.buildBatch(context.log.endOffset().offset, epoch, Arrays.asList("raft")),
            epoch);
        context.client.shutdown(1000);
        context.client.poll();

        context.assertSentFetchPartitionResponse(Errors.BROKER_NOT_AVAILABLE, epoch, OptionalInt.of(localId));
        context.assertResignedLeader(epoch, localId);

        // Let the 1000 ms passed to shutdown() elapse.
        context.time.sleep(1000L);
        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertFalse(context.client.isShuttingDown());
    }

    /**
     * Starts node 0 of voters {0, 1, 2} with {@code withVotedCandidate(2, 0)} and checks that the
     * node is a voted candidate for epoch 2 with an empty log, and that two vote requests are
     * collected after polling.
     */
    @Test
    public void testInitializeAsCandidateFromStateStore() throws Exception {
        int localId = 0;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, 1, 2))
            .withVotedCandidate(epoch, localId)
            .build();

        context.assertVotedCandidate(epoch, localId);
        Assertions.assertEquals(0L, context.log.endOffset().offset);

        context.pollUntilRequest();
        int voteRequestCount = context.collectVoteRequests(epoch, 0, 0L).size();
        Assertions.assertEquals(2, voteRequestCount);
    }

    /**
     * Starts node 0 of voters {0, 1} with no prior state, lets twice the election timeout pass so
     * it becomes a candidate in epoch 1, and delivers a granted vote from node 1. The node must
     * then be leader with log end offset and last flushed offset both 1, send a BeginQuorumEpoch
     * request, and have written a control batch whose record timestamp equals the clock time read
     * right after election.
     */
    @Test
    public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
        int localId = 0;
        int otherNodeId = 1;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId)).build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        int correlationId = context.assertSentVoteRequest(1, 0, 0L, 1);
        context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertElectedLeader(1, localId);

        long electionTimestamp = context.time.milliseconds();
        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(1L, context.log.lastFlushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, 1);

        // Inspect the first batch in the log: it must be a control batch.
        RecordBatch firstBatch = context.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(firstBatch.isControlBatch());

        Record firstRecord = firstBatch.iterator().next();
        Assertions.assertEquals(electionTimestamp, firstRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Arrays.asList(otherNodeId, localId),
            Arrays.asList(otherNodeId, localId),
            firstRecord.key(),
            firstRecord.value());
    }

    /**
     * Three-voter variant of the candidate-to-leader test: node 0 of voters {0, 1, 2} becomes a
     * candidate in epoch 1 and is elected after a single granted vote from node 1. The test then
     * checks the log offsets, the BeginQuorumEpoch request, and the leader-change control record.
     */
    @Test
    public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Exception {
        int localId = 0;
        int firstNodeId = 1;
        int secondNodeId = 2;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, firstNodeId, secondNodeId)).build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(1, localId);

        int correlationId = context.assertSentVoteRequest(1, 0, 0L, 2);
        context.deliverResponse(correlationId, firstNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertElectedLeader(1, localId);

        long electionTimestamp = context.time.milliseconds();
        Assertions.assertEquals(1L, context.log.endOffset().offset);
        Assertions.assertEquals(1L, context.log.lastFlushedOffset());

        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, 2);

        // Inspect the first batch in the log: it must be a control batch.
        RecordBatch firstBatch = context.log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(firstBatch.isControlBatch());

        Record firstRecord = firstBatch.iterator().next();
        Assertions.assertEquals(electionTimestamp, firstRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Arrays.asList(firstNodeId, secondNodeId, localId),
            Arrays.asList(firstNodeId, localId),
            firstRecord.key(),
            firstRecord.value());
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withVotedCandidate(2, 1)} and delivers a
     * BeginQuorumEpoch request for epoch 2 from node 1. Node 1 must then be the elected leader and
     * the response must carry no error, epoch 2 and leader 1.
     */
    @Test
    public void testHandleBeginQuorumRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int votedCandidateEpoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withVotedCandidate(votedCandidateEpoch, otherNodeId)
            .build();

        context.deliverRequest(context.beginEpochRequest(votedCandidateEpoch, otherNodeId));
        context.pollUntilResponse();

        context.assertElectedLeader(votedCandidateEpoch, otherNodeId);
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, votedCandidateEpoch, OptionalInt.of(otherNodeId));
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(2, 0)} and delivers a
     * BeginQuorumEpoch request for epoch 3 from node 1. Node 1 must then be the elected leader of
     * epoch 3.
     */
    @Test
    public void testHandleBeginQuorumResponse() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;
        int newerEpoch = leaderEpoch + 1;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withElectedLeader(leaderEpoch, localId)
            .build();

        context.deliverRequest(context.beginEpochRequest(newerEpoch, otherNodeId));
        context.pollUntilResponse();

        context.assertElectedLeader(newerEpoch, otherNodeId);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withUnknownLeader(4)} and a stubbed Random that
     * returns 85 from every {@code nextInt(int)} call. After the election timeout plus 85 ms the
     * node is a candidate in epoch 5. An EndQuorumEpoch request for epoch 3 must be answered with
     * {@code FENCED_LEADER_EPOCH} and must not change when the node next bumps its epoch: it stays
     * a candidate in epoch 5 through another election timeout plus 85 ms, and only moves to epoch
     * 6 after a further 100 ms.
     */
    @Test
    public void testEndQuorumIgnoredAsCandidateIfOlderEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        // Value returned by the stubbed Random; also added to the election timeout below.
        int jitterMs = 85;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .updateRandom(random -> Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()))
            .withUnknownLeader(epoch - 1)
            .build();

        // Let the election timer expire so the node becomes a candidate.
        context.time.sleep(context.electionTimeoutMs() + jitterMs);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // An EndQuorumEpoch from an older epoch is fenced.
        context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId, Collections.singletonList(localId)));
        context.client.poll();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty());

        // One millisecond short of the next expiry: still the same candidacy.
        context.time.sleep(context.electionTimeoutMs() + jitterMs - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // At the expiry itself the epoch is still unchanged.
        context.time.sleep(1L);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // NOTE(review): 100 ms looks like an inlined RaftClientTestContext backoff constant —
        // confirm against the test context.
        context.time.sleep(100L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Initializes node 0 as leader of voters {0, 1, 2} in epoch 7 and delivers an EndQuorumEpoch
     * request for epoch 5 from node 1. The request must be answered with
     * {@code FENCED_LEADER_EPOCH} (epoch 7, leader 0), and node 0 must still be leader of epoch 7
     * after 49999 ms have passed.
     */
    @Test
    public void testEndQuorumIgnoredAsLeaderIfOlderEpoch() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 7;

        RaftClientTestContext context =
            RaftClientTestContext.initializeAsLeader(localId, Utils.mkSet(localId, voter2, voter3), epoch);

        // An EndQuorumEpoch from an older epoch is fenced.
        context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(localId, voter3)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        // NOTE(review): 50000 looks like an inlined RaftClientTestContext timeout constant —
        // confirm against the test context.
        context.time.sleep(50000 - 1);
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * Starts node 0 of voters {0, 1, 2} with {@code withUnknownLeader(2)} and delivers an
     * EndQuorumEpoch request for epoch 2 from node 1 listing nodes [0, 2]. The response must carry
     * no error with leader 1, and after one more poll node 0 must be a voted candidate in epoch 3.
     */
    @Test
    public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, voter2, voter3))
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.endEpochRequest(epoch, voter2, Arrays.asList(localId, voter3)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2));

        // No clock advance here: a single poll is expected to be enough to start the election.
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Makes node 0 leader with a mocked {@link MemoryPool}, schedules one record, and then
     * delivers a BeginQuorumEpoch request for the next epoch from node 1. Once node 1 is the
     * elected leader, the buffer handed out by the pool must have been released back to it.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        int bufferSize = 8388608;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // The pool always hands out the same buffer so its release can be verified.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();
        Assertions.assertEquals(1L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));

        context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId));
        context.pollUntilResponse();
        context.assertElectedLeader(epoch + 1, otherNodeId);

        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * Makes node 0 leader with a mocked {@link MemoryPool}, schedules one record, and then
     * delivers a vote request for the next epoch from node 1 positioned at the current log end
     * offset. Once node 0 has voted for node 1, the buffer handed out by the pool must have been
     * released back to it.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingVoted() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        int bufferSize = 8388608;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // The pool always hands out the same buffer so its release can be verified.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();
        Assertions.assertEquals(1L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));

        context.deliverRequest(
            context.voteRequest(epoch + 1, otherNodeId, epoch, context.log.endOffset().offset));
        context.pollUntilResponse();
        context.assertVotedCandidate(epoch + 1, otherNodeId);

        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * Makes node 0 leader with a mocked {@link MemoryPool}, schedules one record, and then
     * delivers a vote request for the next epoch from node 1 positioned at offset 0. Node 0 must
     * end up with an unknown leader in the new epoch, and the buffer handed out by the pool must
     * have been released back to it.
     */
    @Test
    public void testAccumulatorClearedAfterBecomingUnattached() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;
        int bufferSize = 8388608;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // The pool always hands out the same buffer so its release can be verified.
        MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        Mockito.when(memoryPool.tryAllocate(bufferSize)).thenReturn(buffer);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withAppendLingerMs(lingerMs)
            .withMemoryPool(memoryPool)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());

        int epoch = context.currentEpoch();
        Assertions.assertEquals(1L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));

        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 0L));
        context.pollUntilResponse();
        context.assertUnknownLeader(epoch + 1);

        Mockito.verify(memoryPool).release(buffer);
    }

    /**
     * With an append linger of 50 ms, schedules one record on the leader and checks that a wakeup
     * is requested, that the message queue's last poll timeout is 50 ms and then 30 ms after 20 ms
     * have elapsed, and that the log end offset moves from 1 to 2 once the remaining 30 ms pass.
     */
    @Test
    public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withAppendLingerMs(lingerMs)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        Assertions.assertEquals(
            1L,
            context.client.scheduleAppend(context.currentEpoch(), Collections.singletonList("a")));
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // 20 ms in: 30 ms of the linger remain.
        context.time.sleep(20L);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(30L), context.messageQueue.lastPollTimeoutMs());

        // Linger fully elapsed: the record reaches the log.
        context.time.sleep(30L);
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
    }

    /**
     * With an append linger of 50 ms, schedules a first record, polls, lets the full 50 ms pass
     * and then schedules a second record. The second append must request a wakeup, and the next
     * poll must move the log end offset to 3.
     */
    @Test
    public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int lingerMs = 50;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withAppendLingerMs(lingerMs)
            .build();

        context.becomeLeader();
        Assertions.assertEquals(OptionalInt.of(localId), context.currentLeader());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        int epoch = context.currentEpoch();

        Assertions.assertEquals(1L, context.client.scheduleAppend(epoch, Collections.singletonList("a")));
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        // Polling clears the wakeup flag and waits for the linger time.
        context.client.poll();
        Assertions.assertFalse(context.messageQueue.wakeupRequested());
        Assertions.assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());

        // The linger time elapses before the second append arrives.
        context.time.sleep(lingerMs);
        Assertions.assertEquals(2L, context.client.scheduleAppend(epoch, Collections.singletonList("b")));
        Assertions.assertTrue(context.messageQueue.wakeupRequested());

        context.client.poll();
        Assertions.assertEquals(3L, context.log.endOffset().offset);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(2, 1)} and delivers an
     * EndQuorumEpoch request for epoch 2 from node 1 listing node 0. The response must carry no
     * error with leader 1, and after one more poll node 0 must be a voted candidate in epoch 3.
     */
    @Test
    public void testHandleEndQuorumRequest() throws Exception {
        int localId = 0;
        int oldLeaderId = 1;
        int leaderEpoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, oldLeaderId))
            .withElectedLeader(leaderEpoch, oldLeaderId)
            .build();

        context.deliverRequest(
            context.endEpochRequest(leaderEpoch, oldLeaderId, Collections.singletonList(localId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));

        context.client.poll();
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * Starts node 0 of voters {0, 1, 3} with {@code withElectedLeader(2, 1)} and delivers an
     * EndQuorumEpoch request for epoch 2 from node 1 that lists node 3 ahead of node 0. After
     * 1 ms node 0 is still expected to send a fetch request in epoch 2; only after a further
     * 50 ms does it send two vote requests and become a voted candidate in epoch 3.
     */
    @Test
    public void testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader() throws Exception {
        int localId = 0;
        int oldLeaderId = 1;
        int preferredNextLeaderId = 3;
        int leaderEpoch = 2;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, oldLeaderId, preferredNextLeaderId))
                .withElectedLeader(leaderEpoch, oldLeaderId)
                .build();

        context.deliverRequest(
            context.endEpochRequest(leaderEpoch, oldLeaderId, Arrays.asList(preferredNextLeaderId, localId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));

        // Shortly afterwards the node is still fetching in the old epoch.
        context.time.sleep(1L);
        context.pollUntilRequest();
        context.assertSentFetchRequest(leaderEpoch, 0L, 0);

        // NOTE(review): 50 ms looks like an inlined RaftClientTestContext backoff constant —
        // confirm against the test context.
        context.time.sleep(50L);
        context.pollUntilRequest();

        Assertions.assertEquals(2, context.collectVoteRequests(leaderEpoch + 1, 0, 0L).size());
        context.assertVotedCandidate(leaderEpoch + 1, localId);
    }

    /**
     * Node 0 of voters {0, 1} becomes a candidate in epoch 1 and sends a vote request. After the
     * request timeout a second vote request is sent. A granted response delivered under the first
     * correlation id leaves the node a candidate; the same response delivered under the second
     * correlation id makes it leader of epoch 1.
     */
    @Test
    public void testVoteRequestTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId)).build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        int firstCorrelationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // Let the first request time out; a retry is expected.
        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        int retryCorrelationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        // A response to the timed-out request has no effect.
        context.deliverResponse(firstCorrelationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // A response to the retried request completes the election.
        context.deliverResponse(retryCorrelationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
        context.client.poll();
        context.assertElectedLeader(epoch, localId);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withUnknownLeader(2)} and delivers
     * {@code voteRequest(2, 1, 1, 1L)}. The response must carry no error, epoch 2, no leader and
     * the final flag {@code true}; node 0 must then be a voted candidate for node 1 in epoch 2.
     */
    @Test
    public void testHandleValidVoteRequestAsFollower() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
        context.assertVotedCandidate(epoch, otherNodeId);
    }

    /**
     * Starts node 0 of voters {0, 1, 3} with {@code withElectedLeader(2, 3)} and delivers
     * {@code voteRequest(2, 1, 1, 1L)}. The response must carry no error, epoch 2, leader 3 and
     * the final flag {@code false}; node 3 must remain the elected leader.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithElectedLeader() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int electedLeaderId = 3;
        int epoch = 2;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId, electedLeaderId))
                .withElectedLeader(epoch, electedLeaderId)
                .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(electedLeaderId), false);
        context.assertElectedLeader(epoch, electedLeaderId);
    }

    /**
     * Starts node 0 of voters {0, 1, 3} with {@code withVotedCandidate(2, 3)} and delivers
     * {@code voteRequest(2, 1, 1, 1L)}. The response must carry no error, epoch 2, no leader and
     * the final flag {@code false}; the recorded vote for node 3 must be unchanged.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithVotedCandidate() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int votedCandidateId = 3;
        int epoch = 2;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId, votedCandidateId))
                .withVotedCandidate(epoch, votedCandidateId)
                .build();

        context.deliverRequest(context.voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(epoch, votedCandidateId);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withUnknownLeader(2)} and delivers a vote request
     * for epoch 1. The response must be {@code FENCED_LEADER_EPOCH} with epoch 2, no leader and
     * the final flag {@code false}; the node must still report an unknown leader in epoch 2.
     */
    @Test
    public void testHandleInvalidVoteRequestWithOlderEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withUnknownLeader(epoch)
            .build();

        context.deliverRequest(context.voteRequest(epoch - 1, otherNodeId, epoch - 2, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty(), false);
        context.assertUnknownLeader(epoch);
    }

    /**
     * Starts node 0 with voters {1, 2} (node 0 is not in the voter set) and
     * {@code withUnknownLeader(2)}, then delivers a vote request for epoch 3 from node 1. The
     * response must be {@code INCONSISTENT_VOTER_SET} with epoch 2, no leader and the final flag
     * {@code false}; the node must still report an unknown leader in epoch 2.
     */
    @Test
    public void testHandleInvalidVoteRequestAsObserver() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int otherNodeId2 = 2;
        int epoch = 2;

        RaftClientTestContext context =
            new RaftClientTestContext.Builder(localId, Utils.mkSet(otherNodeId, otherNodeId2))
                .withUnknownLeader(epoch)
                .build();

        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L));
        context.pollUntilResponse();

        context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.empty(), false);
        context.assertUnknownLeader(epoch);
    }

    /**
     * Initializes node 0 as leader of voters {0, 1} in epoch 2 and delivers a vote request from
     * node 1 for that same epoch. The response must carry no error, epoch 2, leader 0 and the
     * final flag {@code false}; node 0 must remain the elected leader.
     */
    @Test
    public void testLeaderIgnoreVoteRequestOnSameEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;

        RaftClientTestContext context =
            RaftClientTestContext.initializeAsLeader(localId, Utils.mkSet(localId, otherNodeId), leaderEpoch);

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
        context.assertElectedLeader(leaderEpoch, localId);
    }

    /**
     * Initializes node 0 as leader of voters {0, 1} in epoch 5 and walks the high watermark
     * forward with fetch requests from node 1. The listener must report no commit offset while
     * node 1 keeps fetching at offset 1, and must report the appended records ["a", "b", "c"] once
     * a fetch at offset 4 has moved the high watermark to 4.
     *
     * NOTE(review): the roles of the {@code fetchRequest} arguments are inferred from the call
     * sites — confirm against RaftClientTestContext.
     */
    @Test
    public void testListenerCommitCallbackAfterLeaderWrite() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;

        RaftClientTestContext context =
            RaftClientTestContext.initializeAsLeader(localId, Utils.mkSet(localId, otherNodeId), epoch);

        // No high watermark is known before any fetch arrives.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());
        Assertions.assertEquals(1L, context.log.endOffset().offset);

        // A fetch at offset 1 establishes the high watermark at 1.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        List<String> records = Arrays.asList("a", "b", "c");
        long lastOffset = context.client.scheduleAppend(epoch, records);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // Fetching again at offset 1 leaves both the high watermark and the listener unchanged.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 500));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // A fetch at offset 4 moves the high watermark past the appended records.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 4L, epoch, 500));
        context.pollUntil(() -> context.client.highWatermark().equals(OptionalLong.of(4L)));
        Assertions.assertEquals(records, context.listener.commitWithLastOffset(lastOffset));
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withVotedCandidate(2, 0)} and delivers a vote
     * request from node 1 for that same epoch. The response must carry no error, epoch 2, no
     * leader and the final flag {@code false}; node 0 must still be a voted candidate for itself.
     */
    @Test
    public void testCandidateIgnoreVoteRequestOnSameEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int leaderEpoch = 2;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withVotedCandidate(leaderEpoch, localId)
            .build();

        context.pollUntilRequest();

        context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1L));
        context.client.poll();

        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), false);
        context.assertVotedCandidate(leaderEpoch, localId);
    }

    /**
     * Node 0 of voters {0, 1} becomes a candidate in epoch 1 with a stubbed Random that returns 85
     * from every {@code nextInt(int)} call, and receives a vote response built with
     * {@code voteResponse(false, ...)}. It must stay a candidate in epoch 1 for 99 ms, and after
     * one more millisecond become a candidate in epoch 2 and send a new vote request.
     */
    @Test
    public void testRetryElection() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        int jitterMs = 85;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .updateRandom(random -> Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()))
            .build();
        context.assertUnknownLeader(0);

        context.time.sleep(2 * context.electionTimeoutMs());
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        // The other voter does not grant its vote.
        int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
        context.deliverResponse(correlationId, otherNodeId, context.voteResponse(false, Optional.empty(), 1));
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // NOTE(review): 100 looks like an inlined RaftClientTestContext backoff constant —
        // confirm against the test context. One millisecond short of it nothing changes.
        context.time.sleep(100 - 1);
        context.client.poll();
        context.assertVotedCandidate(epoch, localId);

        // The final millisecond triggers a new election in the next epoch.
        context.time.sleep(1L);
        context.client.poll();
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch + 1, localId);
        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(5, 1)} and an empty log, and
     * checks that the first request sent matches {@code assertSentFetchRequest(5, 0L, 0)}.
     */
    @Test
    public void testInitializeAsFollowerEmptyLog() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 0L, 0);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(5, 1)} and a log that already
     * holds one record appended with {@code appendToLog(3, ["foo"])}, and checks that the first
     * request sent matches {@code assertSentFetchRequest(5, 1L, 3)}.
     */
    @Test
    public void testInitializeAsFollowerNonEmptyLog() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
    }

    /**
     * Starts node 0 of voters {0, 1} with {@code withElectedLeader(5, 1)} and one record appended
     * with {@code appendToLog(3, ["foo"])}. After the initial fetch request, 50000 ms pass without
     * a response; node 0 must then send a vote request for epoch 6 and be a voted candidate for
     * itself.
     */
    @Test
    public void testVoterBecomeCandidateAfterFetchTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Utils.mkSet(localId, otherNodeId))
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Collections.singletonList("foo"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        context.assertSentFetchRequest(epoch, 1L, lastEpoch);

        // NOTE(review): 50000 ms looks like an inlined RaftClientTestContext fetch timeout —
        // confirm against the test context.
        context.time.sleep(50000L);
        context.pollUntilRequest();

        context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Starts node 0 with voters {1, 2} (node 0 is not in the voter set) and no prior state. The
     * first fetch request must go to one of the voters. A {@code FENCED_LEADER_EPOCH} fetch
     * response built with epoch 5 and leader 1 must make node 0 report leader 1 in epoch 5.
     */
    @Test
    public void testInitializeObserverNoPreviousState() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchRequest.destinationId()));
        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);

        // The fenced response carries the current leader and epoch.
        context.deliverResponse(
            fetchRequest.correlationId,
            fetchRequest.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Starts node 0 with the single voter {1} and no prior state. The first fetch is answered with
     * {@code UNKNOWN_SERVER_ERROR}; after 50 ms a second fetch with the same request data is
     * expected. A {@code FENCED_LEADER_EPOCH} response to that one, built with epoch 5 and leader
     * 1, must make node 0 report leader 1 in epoch 5.
     */
    @Test
    public void testObserverQuorumDiscoveryFailure() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destinationId()));
        context.assertFetchRequestData(firstFetch, 0, 0L, 0);

        // The first attempt fails with a generic server error.
        context.deliverResponse(
            firstFetch.correlationId,
            firstFetch.destinationId(),
            context.fetchResponse(-1, -1, MemoryRecords.EMPTY, -1L, Errors.UNKNOWN_SERVER_ERROR));
        context.client.poll();

        // NOTE(review): 50 ms looks like an inlined RaftClientTestContext retry backoff —
        // confirm against the test context.
        context.time.sleep(50L);
        context.pollUntilRequest();

        RaftRequest.Outbound retryFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(retryFetch.destinationId()));
        context.assertFetchRequestData(retryFetch, 0, 0L, 0);

        // The fenced response carries the current leader and epoch.
        context.deliverResponse(
            retryFetch.correlationId,
            retryFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Starts node 0 with voters {1, 2} and no prior state. After a {@code FENCED_LEADER_EPOCH}
     * fetch response has made it report leader 1 in epoch 5, 50000 ms pass. The next fetch request
     * must be addressed to one of the voters and carry epoch 5.
     */
    @Test
    public void testObserverSendDiscoveryFetchAfterFetchTimeout() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.pollUntilRequest();
        RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destinationId()));
        context.assertFetchRequestData(firstFetch, 0, 0L, 0);

        // The fenced response carries the current leader and epoch.
        context.deliverResponse(
            firstFetch.correlationId,
            firstFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(epoch, leaderId);

        // NOTE(review): 50000 ms looks like an inlined RaftClientTestContext fetch timeout —
        // confirm against the test context.
        context.time.sleep(50000L);
        context.pollUntilRequest();

        RaftRequest.Outbound nextFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(nextFetch.destinationId()));
        context.assertFetchRequestData(nextFetch, epoch, 0L, 0);
    }

    /**
     * Sends a series of malformed fetch requests to a leader (node 0, epoch 5) and checks
     * the error code returned for each one. Every response is expected to report epoch 5
     * and leader 0.
     */
    @Test
    public void testInvalidFetchRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Negative third argument (presumably the fetch offset).
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Negative fourth argument (presumably the last fetched epoch).
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, -1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Fourth argument larger than the current epoch.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, epoch + 1, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Request epoch ahead of the leader's epoch: the only case with a different error.
        context.deliverRequest(context.fetchRequest(epoch + 1, otherNodeId, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        // Negative fifth argument (presumably the max wait time).
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, 0, -1));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }

    /**
     * Checks how a leader treats the cluster id carried by a fetch request.
     *
     * <p>A {@code null} cluster id is not reported as inconsistent: the request is answered
     * with {@code INVALID_REQUEST} (every request in this test also passes {@code -5L} as
     * its fourth argument). An empty or unrecognised cluster id is answered with
     * {@code INCONSISTENT_CLUSTER_ID}.
     */
    @Test
    public void testFetchRequestClusterIdValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Missing cluster id.
        context.deliverRequest(context.fetchRequest(epoch, null, otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Empty cluster id.
        context.deliverRequest(context.fetchRequest(epoch, "", otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);

        // Cluster id that does not match.
        context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid", otherNodeId, -5L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }

    /**
     * Checks that Vote, BeginQuorumEpoch and EndQuorumEpoch requests naming node 2, which is
     * not in the voter set {0, 1}, are each answered with {@code INCONSISTENT_VOTER_SET}
     * while still reporting epoch 5 and leader 0.
     */
    @Test
    public void testVoterOnlyRequestValidation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int nonVoterId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.voteRequest(epoch, nonVoterId, 0, 0L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId), false);

        context.deliverRequest(context.beginEpochRequest(epoch, nonVoterId));
        context.client.poll();
        context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));

        context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId)));
        context.client.poll();
        context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));
    }

    /**
     * Delivers three malformed vote requests for epoch 6 to a node that follows leader 1 in
     * epoch 5. Each must be answered with {@code INVALID_REQUEST} and must leave the elected
     * leader state (epoch 5, leader 1) unchanged.
     */
    @Test
    public void testInvalidVoteRequest() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Negative fourth argument.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 0, -5L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);

        // Negative third argument.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, -1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);

        // Third argument equal to the requested epoch itself.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch + 1, 0L));
        context.pollUntilResponse();
        context.assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(otherNodeId), false);
        context.assertElectedLeader(epoch, otherNodeId);
    }

    /**
     * Checks that a fetch request with a 500 ms wait is not answered on the first poll, and
     * that after the mock clock advances by 500 ms the leader replies with
     * {@code Errors.NONE} and zero bytes of records.
     */
    @Test
    public void testPurgatoryFetchTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int maxWaitTimeMs = 500;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, maxWaitTimeMs));
        context.client.poll();
        // Nothing may be sent yet.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        context.time.sleep(maxWaitTimeMs);
        context.client.poll();

        int responseSizeInBytes = context
            .assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId))
            .sizeInBytes();
        Assertions.assertEquals(0, responseSizeInBytes);
    }

    /**
     * Checks that a fetch request left unanswered by the first poll is answered once the
     * leader appends records: the response must contain exactly the appended values
     * "a", "b" and "c".
     */
    @Test
    public void testPurgatoryFetchSatisfiedByWrite() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 500));
        context.client.poll();
        // No response is sent while there is nothing to return.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        String[] appendedValues = {"a", "b", "c"};
        context.client.scheduleAppend(epoch, Arrays.asList(appendedValues));
        context.client.poll();

        RaftClientTestContext.assertMatchingRecords(
            appendedValues,
            context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)));
    }

    /**
     * Checks that a fetch request still unanswered on the leader is completed when the
     * leader steps down. After a BeginQuorumEpoch request from node 2 for epoch 6 is
     * handled, the fetch must be answered with {@code NOT_LEADER_OR_FOLLOWER}, zero bytes
     * of records, and the new epoch and leader.
     */
    @Test
    public void testPurgatoryFetchCompletedByFollowerTransition() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.fetchRequest(epoch, voter2, 1L, epoch, 500));
        context.client.poll();
        // Other traffic may be queued, but no fetch response may have been sent yet.
        boolean fetchResponseSent = context.channel.drainSendQueue().stream()
            .anyMatch(sent -> sent.data() instanceof FetchResponseData);
        Assertions.assertFalse(fetchResponseSent);

        context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
        context.pollUntilResponse();
        context.assertElectedLeader(epoch + 1, voter3);
        context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch + 1, OptionalInt.of(voter3));

        int responseSizeInBytes = context
            .assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch + 1, OptionalInt.of(voter3))
            .sizeInBytes();
        Assertions.assertEquals(0, responseSizeInBytes);
    }

    /**
     * Checks that a fetch response arriving after the node has become a candidate is
     * discarded.
     *
     * <p>The node follows leader 1 in epoch 5 and sends a fetch. The mock clock then
     * advances by 50000 ms and the node is expected to have voted for itself in epoch 6.
     * When the response to the earlier fetch is delivered afterwards, the log end offset
     * must still be 0 and the candidate state must be unchanged.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingCandidate() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // NOTE(review): 50000 ms is presumably the context's fetch timeout, inlined by the
        // compiler -- confirm against RaftClientTestContext.
        context.time.sleep(50000L);
        context.client.poll();
        context.assertVotedCandidate(epoch + 1, localId);

        // The stale response from the old leader arrives only now.
        context.deliverResponse(
            fetchCorrelationId,
            otherNodeId,
            context.fetchResponse(
                epoch, otherNodeId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE));
        context.client.poll();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertVotedCandidate(epoch + 1, localId);
    }

    /**
     * Checks that a fetch response from a previous leader is discarded once the node
     * follows a different leader. A fetch is sent to leader 1 (epoch 5), node 2 then
     * announces itself as leader of epoch 6, and the late response from node 1 must leave
     * the log end offset at 0 and the elected leader at node 2.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, voter2)
            .build();
        context.assertElectedLeader(epoch, voter2);

        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // A new leader is announced before the old leader's response arrives.
        context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
        context.client.poll();
        context.assertElectedLeader(epoch + 1, voter3);

        context.deliverResponse(
            fetchCorrelationId,
            voter2,
            context.fetchResponse(
                epoch, voter2, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE));
        context.client.poll();

        Assertions.assertEquals(0L, context.log.endOffset().offset);
        context.assertElectedLeader(epoch + 1, voter3);
    }

    /**
     * Checks that vote responses delivered after the node has become a follower do not
     * alter its state. The node becomes a candidate in epoch 5 and sends two vote requests.
     * Node 2 is then announced as leader of epoch 5, and the two rejecting vote responses
     * delivered afterwards must leave the elected leader at node 2.
     */
    @Test
    public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception {
        int localId = 0;
        int voter2 = localId + 1;
        int voter3 = localId + 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .build();
        context.assertUnknownLeader(epoch - 1);

        // Let twice the election timeout elapse so the node starts an election.
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());

        // Another voter wins the election before any vote response comes back.
        context.deliverRequest(context.beginEpochRequest(epoch, voter3));
        context.client.poll();
        context.assertElectedLeader(epoch, voter3);

        RaftRequest.Outbound firstVoteRequest = voteRequests.get(0);
        RaftRequest.Outbound secondVoteRequest = voteRequests.get(1);
        context.deliverResponse(
            firstVoteRequest.correlationId, voter2, context.voteResponse(false, Optional.empty(), epoch));
        context.deliverResponse(
            secondVoteRequest.correlationId, voter3, context.voteResponse(false, Optional.of(voter3), epoch));

        context.client.poll();
        context.assertElectedLeader(epoch, voter3);
    }

    /**
     * Checks that an observer turns to another voter after the known leader answers a
     * fetch with {@code BROKER_NOT_AVAILABLE}.
     *
     * <p>The observer (node 0, not in voter set {1, 2}) first discovers leader 1 in epoch 5
     * and fetches from it. After the error response, the next fetch must go to a voter
     * other than node 1. That voter's response names node 1 as leader of epoch 5 again,
     * and the observer must end up with that leader.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        context.pollUntilRequest();
        RaftRequest.Outbound fetchToLeader = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, fetchToLeader.destinationId());
        context.assertFetchRequestData(fetchToLeader, epoch, 0L, 0);

        context.deliverResponse(
            fetchToLeader.correlationId,
            fetchToLeader.destinationId(),
            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1L, Errors.BROKER_NOT_AVAILABLE));
        context.pollUntilRequest();

        // The retry must go to a voter other than the leader that just failed.
        RaftRequest.Outbound rediscoveryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, rediscoveryFetch.destinationId());
        Assertions.assertTrue(voters.contains(rediscoveryFetch.destinationId()));
        context.assertFetchRequestData(rediscoveryFetch, epoch, 0L, 0);

        // The destination was just asserted not to be the leader, so the responder is
        // always a non-leader and answers NOT_LEADER_OR_FOLLOWER.
        context.deliverResponse(
            rediscoveryFetch.correlationId,
            rediscoveryFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NOT_LEADER_OR_FOLLOWER));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Checks that an observer turns to another voter when a fetch to the known leader gets
     * no response for {@code requestTimeoutMs()}.
     *
     * <p>After that time has elapsed on the mock clock, the next fetch must go to a voter
     * other than node 1. That voter answers {@code FENCED_LEADER_EPOCH} naming node 1 as
     * leader of epoch 5, and the observer must end up with that leader.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception {
        int localId = 0;
        int leaderId = 1;
        int otherNodeId = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        context.discoverLeaderAsObserver(leaderId, epoch);

        context.pollUntilRequest();
        RaftRequest.Outbound fetchToLeader = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, fetchToLeader.destinationId());
        context.assertFetchRequestData(fetchToLeader, epoch, 0L, 0);

        // No response is delivered; only the clock moves.
        context.time.sleep(context.requestTimeoutMs());
        context.pollUntilRequest();

        RaftRequest.Outbound rediscoveryFetch = context.assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, rediscoveryFetch.destinationId());
        Assertions.assertTrue(voters.contains(rediscoveryFetch.destinationId()));
        context.assertFetchRequestData(rediscoveryFetch, epoch, 0L, 0);

        context.deliverResponse(
            rediscoveryFetch.correlationId,
            rediscoveryFetch.destinationId(),
            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();

        context.assertElectedLeader(epoch, leaderId);
    }

    /**
     * Walks a leader (node 0, epoch 1) through a graceful shutdown.
     *
     * <p>After {@code shutdown(5000)} the client must report both "shutting down" and
     * "running" and must send an EndQuorumEpoch request to node 1. While shutting down it
     * still grants a vote for epoch 2. Once a BeginQuorumEpoch request for epoch 2 has been
     * delivered, polling must eventually stop the client and complete the shutdown future
     * with a {@code null} result.
     */
    @Test
    public void testLeaderGracefulShutdown() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Only completion state and the (null) result are inspected, so a wildcard suffices.
        CompletableFuture<?> shutdownFuture = context.client.shutdown(5000);
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        // The leader announces the end of its epoch before stopping.
        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isShuttingDown());
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(1, otherNodeId);

        // Vote requests are still handled during shutdown.
        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L));
        context.client.poll();
        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);

        // A new leader announces itself; shutdown is expected to finish afterwards.
        context.deliverRequest(context.beginEpochRequest(2, otherNodeId));
        TestUtils.waitForCondition(() -> {
            context.client.poll();
            return !context.client.isRunning();
        }, 5000L, "Client failed to shutdown before expiration of timeout");

        Assertions.assertFalse(context.client.isShuttingDown());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Checks the EndQuorumEpoch requests a shutting-down leader sends when its followers
     * have fetched up to different offsets. Node 1 fetches at offset 1 and node 2 at
     * offset 3; the requests are then collected for voters {2, 1} with the expected
     * successor list [2, 1].
     */
    @Test
    public void testEndQuorumEpochSentBasedOnFetchOffset() throws Exception {
        int localId = 0;
        int closeFollower = 2;
        int laggingFollower = 1;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, closeFollower, laggingFollower);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // The lagging follower fetches first, at offset 1.
        context.deliverRequest(context.fetchRequest(epoch, laggingFollower, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(1L, epoch);

        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
        context.client.poll();

        // The close follower fetches at offset 3, after the append.
        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        context.client.shutdown(context.electionTimeoutMs() * 2);
        Assertions.assertTrue(context.client.isRunning());

        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());

        context.collectEndQuorumRequests(
            epoch,
            Utils.mkSet(closeFollower, laggingFollower),
            Optional.of(Arrays.asList(closeFollower, laggingFollower)));
    }

    /**
     * Checks the DescribeQuorum response of a leader after fetches from two voters (nodes 1
     * and 2) and from node 3, which is not in the voter set. The response is expected to
     * list the log end offsets of replicas 0, 1 and 2 in its first replica list and that of
     * replica 3 in its second.
     */
    @Test
    public void testDescribeQuorum() throws Exception {
        int localId = 0;
        int closeFollower = 2;
        int laggingFollower = 1;
        int observerId = 3;
        int epoch = 1;
        Set<Integer> voters = Utils.mkSet(localId, closeFollower, laggingFollower);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.deliverRequest(context.fetchRequest(epoch, laggingFollower, 1L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(1L, epoch);

        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
        context.client.poll();

        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        // Fetch from node 3, which is outside the voter set.
        context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
        context.pollUntilResponse();
        context.assertSentFetchPartitionResponse(3L, epoch);

        context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
        context.pollUntilResponse();

        context.assertSentDescribeQuorumResponse(
            localId,
            epoch,
            3L,
            Arrays.asList(
                new DescribeQuorumResponseData.ReplicaState().setReplicaId(localId).setLogEndOffset(3L),
                new DescribeQuorumResponseData.ReplicaState().setReplicaId(laggingFollower).setLogEndOffset(1L),
                new DescribeQuorumResponseData.ReplicaState().setReplicaId(closeFollower).setLogEndOffset(3L)),
            Collections.singletonList(
                new DescribeQuorumResponseData.ReplicaState().setReplicaId(observerId).setLogEndOffset(0L)));
    }

    /**
     * Checks that a leader's graceful shutdown gives up after the timeout passed to
     * {@code shutdown}. The leader sends EndQuorumEpoch to node 1; once 5000 ms elapse on
     * the mock clock, the client must have stopped and the shutdown future must have
     * failed with a {@link TimeoutException}.
     */
    @Test
    public void testLeaderGracefulShutdownTimeout() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 1;
        int shutdownTimeoutMs = 5000;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        // Only the completion state is inspected, so a wildcard type suffices.
        CompletableFuture<?> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        context.pollUntilRequest();
        Assertions.assertTrue(context.client.isRunning());
        context.assertSentEndQuorumEpochRequest(1, otherNodeId);

        // Let the whole shutdown timeout pass without further input.
        context.time.sleep(shutdownTimeoutMs);
        context.client.poll();

        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isCompletedExceptionally());
        TestUtils.assertFutureThrows(shutdownFuture, TimeoutException.class);
    }

    /**
     * Checks that a follower finishes a graceful shutdown on the first poll after
     * {@code shutdown} is called: the client must still be running right after the call,
     * and after one poll it must have stopped with the future completed with {@code null}.
     */
    @Test
    public void testFollowerGracefulShutdown() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);
        context.client.poll();

        // Only completion state and the (null) result are inspected, so a wildcard suffices.
        CompletableFuture<?> shutdownFuture = context.client.shutdown(5000);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Checks that an observer (node 0, not in voter set {1, 2}) with no known leader
     * finishes a graceful shutdown on the first poll after {@code shutdown} is called, and
     * that the future completes with {@code null}.
     */
    @Test
    public void testObserverGracefulShutdown() throws Exception {
        int localId = 0;
        int voter1 = 1;
        int voter2 = 2;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(voter1, voter2);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch)
            .build();
        context.client.poll();
        context.assertUnknownLeader(epoch);

        // Only completion state and the (null) result are inspected, so a wildcard suffices.
        CompletableFuture<?> shutdownFuture = context.client.shutdown(5000);
        Assertions.assertTrue(context.client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Checks graceful shutdown in a quorum whose only voter is the local node. The node is
     * leader of epoch 1 and has sent nothing; after {@code shutdown(5000)} it is still
     * running, and a single poll stops it.
     */
    @Test
    public void testGracefulShutdownSingleMemberQuorum() throws IOException {
        int localId = 0;
        Set<Integer> voters = Collections.singleton(localId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

        context.assertElectedLeader(1, localId);
        context.client.poll();
        // With no other voters there is nobody to send requests to.
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());

        context.client.shutdown(5000);
        // The stop only takes effect on the next poll.
        Assertions.assertTrue(context.client.isRunning());

        context.client.poll();
        Assertions.assertFalse(context.client.isRunning());
    }

    /**
     * Checks that a follower appends and flushes records received in a fetch response.
     * After a batch of two records ("a", "b") is delivered from leader 1, both the log end
     * offset and the last flushed offset must be 2.
     */
    @Test
    public void testFollowerReplication() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        context.deliverResponse(
            fetchCorrelationId,
            otherNodeId,
            context.fetchResponse(
                epoch, otherNodeId, context.buildBatch(0L, 3, Arrays.asList("a", "b")), 0L, Errors.NONE));
        context.client.poll();

        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(2L, context.log.lastFlushedOffset());
    }

    /**
     * Checks how a follower handles fetch responses that carry no records, in three rounds:
     * <ol>
     *   <li>empty response at offset 0: log end offset 0, high watermark 0;</li>
     *   <li>batch of two records with the 4th fetchResponse argument at 0: log end offset
     *       2, high watermark still 0;</li>
     *   <li>empty response with that argument at 2: log end offset stays 2, high watermark
     *       becomes 2.</li>
     * </ol>
     */
    @Test
    public void testEmptyRecordSetInFetchResponse() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        // Round 1: empty response while the log is empty.
        context.pollUntilRequest();
        int firstCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
        context.deliverResponse(
            firstCorrelationId,
            otherNodeId,
            context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 0L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(0L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 2: two records arrive.
        context.pollUntilRequest();
        int secondCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
        context.deliverResponse(
            secondCorrelationId,
            otherNodeId,
            context.fetchResponse(
                epoch, otherNodeId, context.buildBatch(0L, epoch, Arrays.asList("a", "b")), 0L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());

        // Round 3: empty response that only moves the high watermark.
        context.pollUntilRequest();
        int thirdCorrelationId = context.assertSentFetchRequest(epoch, 2L, epoch);
        context.deliverResponse(
            thirdCorrelationId,
            otherNodeId,
            context.fetchResponse(epoch, otherNodeId, MemoryRecords.EMPTY, 2L, Errors.NONE));
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), context.client.highWatermark());
    }

    /**
     * Checks that once a follower has fetched from a new leader, the leader sends nothing
     * further after {@code requestTimeoutMs()} elapses. The node wins the election for
     * epoch 5 and sends BeginQuorumEpoch to node 1; node 1 then fetches successfully, and
     * after the request timeout the send queue must be empty.
     */
    @Test
    public void testFetchShouldBeTreatedAsLeaderAcknowledgement() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // nextInt(10000) is stubbed to return 0 -- presumably to remove the random
        // component of the election timeout; confirm against RaftClientTestContext.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(random -> ((Random) Mockito.doReturn(0).when(random)).nextInt(10000))
            .withUnknownLeader(epoch - 1)
            .build();

        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        context.pollUntilRequest();
        context.assertSentBeginQuorumEpochRequest(epoch, otherNodeId);

        // The follower fetches before answering BeginQuorumEpoch.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, 0, 500));
        context.client.poll();
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        context.time.sleep(context.requestTimeoutMs());
        context.client.poll();
        Assertions.assertEquals(0, context.channel.drainSendQueue().size());
    }

    /**
     * Checks appends on a quorum whose only voter is the local node.
     *
     * <p>The high watermark is 1 right after election and 4 after three records are
     * appended. A fetch from offset 0 must then return two batches: a control batch whose
     * single record is verified as a leader change message and is timestamped with the
     * clock value read before election was asserted, followed by a batch in epoch 1
     * holding the three appended values in order.
     */
    @Test
    public void testLeaderAppendSingleMemberQuorum() throws Exception {
        int localId = 0;
        Set<Integer> voters = Collections.singleton(localId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        long now = context.time.milliseconds();

        context.assertElectedLeader(1, localId);
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        String[] appendedValues = {"a", "b", "c"};

        // A poll with nothing scheduled must not move the high watermark.
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(1L), context.client.highWatermark());

        context.client.scheduleAppend(context.currentEpoch(), Arrays.asList(appendedValues));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(4L), context.client.highWatermark());

        // Read everything back through a fetch from offset 0.
        context.deliverRequest(context.fetchRequest(1, 1, 0L, 0, 500));
        context.pollUntilResponse();

        List<MutableRecordBatch> batches = Utils.toList(
            context.assertSentFetchPartitionResponse(Errors.NONE, 1, OptionalInt.of(localId)).batchIterator());
        Assertions.assertEquals(2, batches.size());

        MutableRecordBatch leaderChangeBatch = batches.get(0);
        Assertions.assertTrue(leaderChangeBatch.isControlBatch());

        List<Record> leaderChangeRecords = Utils.toList(leaderChangeBatch.iterator());
        Assertions.assertEquals(1, leaderChangeRecords.size());

        Record leaderChangeRecord = leaderChangeRecords.get(0);
        Assertions.assertEquals(now, leaderChangeRecord.timestamp());
        RaftClientTestContext.verifyLeaderChangeMessage(
            localId,
            Collections.singletonList(localId),
            Collections.singletonList(localId),
            leaderChangeRecord.key(),
            leaderChangeRecord.value());

        MutableRecordBatch dataBatch = batches.get(1);
        Assertions.assertEquals(1, dataBatch.partitionLeaderEpoch());

        List<Record> dataRecords = Utils.toList(dataBatch.iterator());
        Assertions.assertEquals(3, dataRecords.size());
        for (int i = 0; i < appendedValues.length; i++) {
            Assertions.assertEquals(appendedValues[i], Utils.utf8(dataRecords.get(i).value()));
        }
    }

    /**
     * Checks that a follower shortens its log when the leader reports divergence. The
     * follower starts with three records (offsets 0-2, epoch 3) and fetches at offset 3.
     * After the diverging response is delivered its log end offset must be 2, and its next
     * fetch must start at offset 2.
     */
    @Test
    public void testFollowerLogReconciliation() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .appendToLog(lastEpoch, Arrays.asList("foo", "bar"))
            .appendToLog(lastEpoch, Arrays.asList("baz"))
            .build();
        context.assertElectedLeader(epoch, otherNodeId);
        Assertions.assertEquals(3L, context.log.endOffset().offset);

        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 3L, lastEpoch);

        context.deliverResponse(
            fetchCorrelationId,
            otherNodeId,
            context.divergingFetchResponse(epoch, otherNodeId, 2L, lastEpoch, 1L));
        context.client.poll();
        Assertions.assertEquals(2L, context.log.endOffset().offset);

        // The follow-up fetch resumes from the shortened log end.
        context.client.poll();
        context.assertSentFetchRequest(epoch, 2L, lastEpoch);
    }

    /**
     * Checks the metrics registered for a single-voter quorum: that every expected metric
     * exists, that the state metrics reflect a leader in epoch 1 with high watermark and
     * log end offset 1, that both offsets move to 4 after three records are appended, and
     * that exactly one metric remains registered after the client is closed.
     */
    @Test
    public void testMetrics() throws Exception {
        int localId = 0;
        Set<Integer> voters = Collections.singleton(localId);
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
        Metrics metrics = context.metrics;

        // Presence of the state metrics.
        Assertions.assertNotNull(getMetric(metrics, "current-state"));
        Assertions.assertNotNull(getMetric(metrics, "current-leader"));
        Assertions.assertNotNull(getMetric(metrics, "current-vote"));
        Assertions.assertNotNull(getMetric(metrics, "current-epoch"));
        Assertions.assertNotNull(getMetric(metrics, "high-watermark"));
        Assertions.assertNotNull(getMetric(metrics, "log-end-offset"));
        Assertions.assertNotNull(getMetric(metrics, "log-end-epoch"));
        Assertions.assertNotNull(getMetric(metrics, "number-unknown-voter-connections"));

        // Presence of the latency and rate metrics.
        Assertions.assertNotNull(getMetric(metrics, "poll-idle-ratio-avg"));
        Assertions.assertNotNull(getMetric(metrics, "commit-latency-avg"));
        Assertions.assertNotNull(getMetric(metrics, "commit-latency-max"));
        Assertions.assertNotNull(getMetric(metrics, "election-latency-avg"));
        Assertions.assertNotNull(getMetric(metrics, "election-latency-max"));
        Assertions.assertNotNull(getMetric(metrics, "fetch-records-rate"));
        Assertions.assertNotNull(getMetric(metrics, "append-records-rate"));

        // Values right after election.
        Assertions.assertEquals("leader", getMetric(metrics, "current-state").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "current-leader").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "current-vote").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "current-epoch").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "log-end-epoch").metricValue());

        context.client.scheduleAppend(1, Arrays.asList("a", "b", "c"));
        context.client.poll();

        // Values after three records have been appended.
        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "log-end-epoch").metricValue());

        context.client.close();

        // NOTE(review): which single metric survives close() is not visible here.
        Assertions.assertEquals(1, metrics.metrics().size());
    }

    /**
     * Checks that a {@code CLUSTER_AUTHORIZATION_FAILED} error code in a Fetch response
     * makes the next {@code poll()} throw {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInFetch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        context.assertElectedLeader(epoch, otherNodeId);

        context.pollUntilRequest();
        int fetchCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);

        // The error is set at the top level of the response.
        FetchResponseData response = new FetchResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(fetchCorrelationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Checks that a {@code CLUSTER_AUTHORIZATION_FAILED} error code in a BeginQuorumEpoch
     * response makes the next {@code poll()} throw {@link ClusterAuthorizationException}.
     * The node first wins the election for epoch 5 so that it sends BeginQuorumEpoch to
     * node 1.
     */
    @Test
    public void testClusterAuthorizationFailedInBeginQuorumEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        // nextInt(10000) is stubbed to return 0 -- presumably to remove the random
        // component of the election timeout; confirm against RaftClientTestContext.
        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .updateRandom(random -> {
                ((Random) Mockito.doReturn(0).when(random)).nextInt(10000);
            })
            .withUnknownLeader(epoch - 1)
            .build();

        context.time.sleep(context.electionTimeoutMs());
        context.expectAndGrantVotes(epoch);

        context.pollUntilRequest();
        int beginEpochCorrelationId = context.assertSentBeginQuorumEpochRequest(epoch, otherNodeId);

        BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(beginEpochCorrelationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Checks that a {@code CLUSTER_AUTHORIZATION_FAILED} error code in a Vote response
     * makes the next {@code poll()} throw {@link ClusterAuthorizationException}. The node
     * becomes a candidate in epoch 5 after twice the election timeout has elapsed.
     */
    @Test
    public void testClusterAuthorizationFailedInVote() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withUnknownLeader(epoch - 1)
            .build();

        context.time.sleep(context.electionTimeoutMs() * 2);
        context.pollUntilRequest();
        context.assertVotedCandidate(epoch, localId);

        int voteCorrelationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);

        VoteResponseData response = new VoteResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(voteCorrelationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Checks that a {@code CLUSTER_AUTHORIZATION_FAILED} error code in an EndQuorumEpoch
     * response makes the next {@code poll()} throw {@link ClusterAuthorizationException}.
     * The EndQuorumEpoch request is produced by starting a shutdown on a leader in epoch 2.
     */
    @Test
    public void testClusterAuthorizationFailedInEndQuorumEpoch() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 2;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        context.client.shutdown(5000);
        context.pollUntilRequest();

        int endEpochCorrelationId = context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);

        EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        context.deliverResponse(endEpochCorrelationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, context.client::poll);
    }

    /**
     * Checks that the listener already reports claimed epoch 5 as soon as
     * {@code initializeAsLeader} returns for a node set up as leader of epoch 5 with no
     * records appended beforehand.
     */
    @Test
    public void testHandleClaimFiresImmediatelyOnEmptyLog() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
    }

    /**
     * Checks that a new leader with a non-empty log does not report a claimed epoch until
     * the earlier batches have been handed to the listener.
     *
     * <p>The log starts with three batches of three records each (base offsets 0, 3 and
     * 6). After the node becomes leader the log end offset is 10. A follower fetch at
     * offset 3 changes nothing for the listener. A fetch at offset 10 delivers all three
     * batches (last commit offset 8) while the claimed epoch is still empty. Only the
     * following poll reports claimed epoch 5.
     */
    @Test
    public void testHandleClaimCallbackFiresAfterHighWatermarkReachesEpochStartOffset() throws Exception {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 5;
        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

        List<String> batch1 = Arrays.asList("1", "2", "3");
        List<String> batch2 = Arrays.asList("4", "5", "6");
        List<String> batch3 = Arrays.asList("7", "8", "9");

        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, batch1)
            .appendToLog(1, batch2)
            .appendToLog(2, batch3)
            .withUnknownLeader(epoch - 1)
            .build();

        context.becomeLeader();
        context.client.poll();

        // Nine pre-existing records plus one more written on becoming leader.
        Assertions.assertEquals(10L, context.log.endOffset().offset);
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // The follower has caught up only to offset 3: nothing reaches the listener.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 3L, 1, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(OptionalLong.empty(), context.listener.lastCommitOffset());

        // The follower has caught up to the log end: the batches are delivered first.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(3, context.listener.numCommittedBatches());
        Assertions.assertEquals(batch1, context.listener.commitWithBaseOffset(0L));
        Assertions.assertEquals(batch2, context.listener.commitWithBaseOffset(3L));
        Assertions.assertEquals(batch3, context.listener.commitWithBaseOffset(6L));
        Assertions.assertEquals(OptionalLong.of(8L), context.listener.lastCommitOffset());

        // The claim is reported on the next poll.
        context.client.poll();
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
    }

    /**
     * Verifies that a listener registered after the high watermark has already
     * advanced is brought up to the last committed offset on the next poll,
     * while the originally registered listener keeps its claimed epoch.
     */
    @Test
    public void testLateRegisteredListenerCatchesUp() throws Exception {
        final int localId = 0;
        final int otherNodeId = 1;
        final int epoch = 5;
        final Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});

        final List<String> batch1 = Arrays.asList("1", "2", "3");
        final List<String> batch2 = Arrays.asList("4", "5", "6");
        final List<String> batch3 = Arrays.asList("7", "8", "9");

        final RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(1, batch1)
            .appendToLog(1, batch2)
            .appendToLog(2, batch3)
            .withUnknownLeader(epoch - 1)
            .build();

        context.becomeLeader();
        context.client.poll();
        Assertions.assertEquals(10L, context.log.endOffset().offset);

        // A fetch at the log end offset lets the high watermark reach 10; a
        // further poll lets the first listener claim the epoch.
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        context.client.poll();
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());

        // Register a second listener; a single poll must bring it up to
        // commit offset 8.
        final RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
        context.client.register(secondListener);
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());

        // The first listener is unaffected: same claimed epoch, claimed at
        // start offset 9.
        Assertions.assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
        Assertions.assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
    }

    /**
     * Verifies, from a follower's perspective, that fetched records are handed
     * to the listener only once a fetch response reports a high watermark
     * covering them: the first batch becomes visible after the second
     * response, and the follower never claims an epoch.
     */
    @Test
    public void testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances() throws Exception {
        final int localId = 0;
        final int otherNodeId = 1;
        final int epoch = 5;
        final Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});

        final RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .withElectedLeader(epoch, otherNodeId)
            .build();
        Assertions.assertEquals(OptionalLong.empty(), context.client.highWatermark());

        // First fetch request: sent to one of the voters, starting at offset 0.
        context.pollUntilRequest();
        final RaftRequest.Outbound firstFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destinationId()));
        context.assertFetchRequestData(firstFetch, epoch, 0L, 0);

        // The response carries three records but a high watermark of 0.
        final List<String> records1 = Arrays.asList("a", "b", "c");
        final MemoryRecords batch1 = context.buildBatch(0L, 3, records1);
        context.deliverResponse(
            firstFetch.correlationId,
            firstFetch.destinationId(),
            context.fetchResponse(epoch, otherNodeId, batch1, 0L, Errors.NONE));
        context.client.poll();

        // Nothing has been delivered to the listener yet.
        Assertions.assertEquals(OptionalLong.of(0L), context.client.highWatermark());
        Assertions.assertEquals(0, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());

        // Second fetch request: continues from offset 3.
        context.pollUntilRequest();
        final RaftRequest.Outbound secondFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(secondFetch.destinationId()));
        context.assertFetchRequestData(secondFetch, epoch, 3L, 3);

        // This response reports a high watermark of 3, covering the first batch.
        final List<String> records2 = Arrays.asList("d", "e", "f");
        final MemoryRecords batch2 = context.buildBatch(3L, 3, records2);
        context.deliverResponse(
            secondFetch.correlationId,
            secondFetch.destinationId(),
            context.fetchResponse(epoch, otherNodeId, batch2, 3L, Errors.NONE));
        context.client.poll();

        // Only the first batch has been handed to the listener.
        Assertions.assertEquals(OptionalLong.of(3L), context.client.highWatermark());
        Assertions.assertEquals(1, context.listener.numCommittedBatches());
        Assertions.assertEquals(OptionalLong.of(2L), context.listener.lastCommitOffset());
        Assertions.assertEquals(records1, context.listener.lastCommit().records());
        Assertions.assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
    }

    /**
     * Verifies that a listener registered after the node has voted for another
     * candidate still receives the committed data, without the node leaving
     * the voted state and without the listener claiming an epoch.
     */
    @Test
    public void testHandleCommitCallbackFiresInVotedState() throws Exception {
        final int localId = 0;
        final int otherNodeId = 1;
        final int epoch = 7;
        final Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});

        final RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .build();

        // Become leader and take a fetch at offset 10 so that the high
        // watermark is established.
        context.becomeLeader();
        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 500));
        context.client.poll();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // A vote request for the next epoch from the other node is granted;
        // the high watermark is retained.
        final int candidateEpoch = epoch + 1;
        context.deliverRequest(context.voteRequest(candidateEpoch, otherNodeId, epoch, 10L));
        context.pollUntilResponse();
        context.assertVotedCandidate(candidateEpoch, otherNodeId);
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // Register a new listener and poll once; the vote must be unchanged.
        final RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
        context.client.register(secondListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, otherNodeId);

        // The new listener has caught up to commit offset 8 and claims nothing.
        Assertions.assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
    }

    /**
     * Verifies that a listener registered while the node is a candidate still
     * receives the committed data, without the node leaving the candidate
     * state and without the listener claiming an epoch.
     */
    @Test
    public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
        final int localId = 0;
        final int otherNodeId = 1;
        final int epoch = 7;
        final Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});

        final RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
            .appendToLog(2, Arrays.asList("a", "b", "c"))
            .appendToLog(4, Arrays.asList("d", "e", "f"))
            .appendToLog(4, Arrays.asList("g", "h", "i"))
            .withUnknownLeader(epoch - 1)
            .build();

        // Become leader and take a fetch at offset 10 so that the high
        // watermark is established.
        context.becomeLeader();
        Assertions.assertEquals(10L, context.log.endOffset().offset);

        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
        context.pollUntilResponse();
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());
        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));

        // A vote request for the next epoch leaves the node with no known
        // leader in that epoch; the high watermark is retained.
        final int unattachedEpoch = epoch + 1;
        context.deliverRequest(context.voteRequest(unattachedEpoch, otherNodeId, epoch, 9L));
        context.pollUntilResponse();
        context.assertUnknownLeader(unattachedEpoch);
        Assertions.assertEquals(OptionalLong.of(10L), context.client.highWatermark());

        // After twice the election timeout the node has voted for itself in
        // the following epoch.
        final int candidateEpoch = epoch + 2;
        context.time.sleep(context.electionTimeoutMs() * 2);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        // Register a new listener and poll once; the candidacy must be unchanged.
        final RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
        context.client.register(secondListener);
        context.client.poll();
        context.assertVotedCandidate(candidateEpoch, localId);

        // The new listener has caught up to commit offset 8 and claims nothing.
        Assertions.assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
        Assertions.assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
    }

    /**
     * Verifies fetching by a client built without a local id: the first fetch
     * goes to one of the voters and learns the leader from a
     * FENCED_LEADER_EPOCH response, and the second fetch goes to that leader
     * and appends the returned records to the local log.
     */
    @Test
    public void testObserverFetchWithNoLocalId() throws Exception {
        final Set<Integer> voters = Utils.mkSet(new Integer[]{1, 2});
        final RaftClientTestContext context =
            new RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();

        // First fetch: addressed to some voter, at epoch 0 and offset 0.
        context.pollUntilRequest();
        final RaftRequest.Outbound discoveryFetch = context.assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(discoveryFetch.destinationId()));
        context.assertFetchRequestData(discoveryFetch, 0, 0L, 0);

        final int leaderEpoch = 5;
        final int leaderId = 1;

        // The fenced response names the current leader and epoch.
        context.deliverResponse(
            discoveryFetch.correlationId,
            discoveryFetch.destinationId(),
            context.fetchResponse(leaderEpoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        context.client.poll();
        context.assertElectedLeader(leaderEpoch, leaderId);

        // Second fetch: addressed to the discovered leader with the new epoch.
        context.pollUntilRequest();
        final RaftRequest.Outbound leaderFetch = context.assertSentFetchRequest();
        Assertions.assertEquals(leaderId, leaderFetch.destinationId());
        context.assertFetchRequestData(leaderFetch, leaderEpoch, 0L, 0);

        final List<String> records = Arrays.asList("a", "b", "c");
        final MemoryRecords batch = context.buildBatch(0L, 3, records);
        context.deliverResponse(
            leaderFetch.correlationId,
            leaderFetch.destinationId(),
            context.fetchResponse(leaderEpoch, leaderId, batch, 0L, Errors.NONE));
        context.client.poll();

        // The three records are now in the local log, with last fetched epoch 3.
        Assertions.assertEquals(3L, context.log.endOffset().offset);
        Assertions.assertEquals(3, context.log.lastFetchedEpoch());
    }

    /**
     * Looks up a metric by name within the {@code "raft-metrics"} group.
     *
     * @param metrics the registry to search
     * @param str the metric name
     * @return whatever the registry's metric map holds under that name
     */
    private static KafkaMetric getMetric(Metrics metrics, String str) {
        final String group = "raft-metrics";
        return metrics.metrics().get(metrics.metricName(str, group));
    }
}
