package org.apache.kafka.raft;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClientTest.class */
public class KafkaRaftClientTest {
    /** Topic partition that the mock log below is created for. */
    private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
    // The numeric settings below have the same values as the literals that buildClient(...)
    // passes to QuorumState (0, 10000, 50000) and KafkaRaftClient (100, 50, 5000, 0).
    // NOTE(review): the names suggest millisecond timeouts/backoffs; confirm the unit against KafkaRaftClient.
    private final int localId = 0;
    private final int electionTimeoutMs = 10000;
    private final int electionBackoffMaxMs = 100;
    private final int fetchTimeoutMs = 50000;
    private final int retryBackoffMs = 50;
    private final int requestTimeoutMs = 5000;
    private final int fetchMaxWaitMs = 0;
    // Mock collaborators that buildClient(...) hands to the client under test.
    private final MockTime time = new MockTime();
    private final MockLog log = new MockLog(METADATA_PARTITION);
    private final MockNetworkChannel channel = new MockNetworkChannel();
    // Mockito spy over a fixed-seed Random so that tests can stub nextInt(...) when they need a known value.
    private final Random random = (Random) Mockito.spy(new Random(1));
    // Election state store; tests seed it before building a client and cleanUp() clears it.
    private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();

    /**
     * Runs after every test and clears the quorum state store.
     *
     * @throws IOException if clearing the store fails
     */
    @AfterEach
    public void cleanUp() throws IOException {
        quorumStateStore.clear();
    }

    /**
     * Builds the fake network address used for a node: host "localhost" and
     * port 9990 plus the given id.
     *
     * @param i node id
     * @return the socket address for that node
     */
    private InetSocketAddress mockAddress(int i) {
        final int port = 9990 + i;
        return new InetSocketAddress("localhost", port);
    }

    /**
     * Builds an initialized client for the given voter set using a fresh
     * {@link Metrics} instance backed by the mock clock.
     *
     * @param set ids of the voters in the quorum
     * @return the initialized client
     * @throws IOException propagated from the two-argument overload
     */
    private KafkaRaftClient buildClient(Set<Integer> set) throws IOException {
        Metrics freshMetrics = new Metrics(time);
        return buildClient(set, freshMetrics);
    }

    /**
     * Creates and initializes a {@link KafkaRaftClient} for the given voter set, wired to
     * this test's mock channel, mock log, state store, mock clock and spied random source.
     *
     * <p>The numeric settings come from this class's configuration fields instead of
     * repeating their values as literals.
     *
     * @param set ids of the voters; each id is mapped to a fake address via {@code mockAddress}
     * @param metrics metrics registry handed to the client
     * @return the client, after {@code initialize()} has been called on it
     * @throws IOException declared by the original signature; presumably raised when reading
     *         or writing the quorum state store (confirm against QuorumState / initialize)
     */
    private KafkaRaftClient buildClient(Set<Integer> set, Metrics metrics) throws IOException {
        LogContext logContext = new LogContext();
        QuorumState quorum = new QuorumState(localId, set, electionTimeoutMs, fetchTimeoutMs,
            quorumStateStore, time, logContext, random);
        // Typed map plus a method reference replaces the raw (Map) cast and block lambda.
        Map<Integer, InetSocketAddress> voterAddresses = set.stream()
            .collect(Collectors.toMap(Function.identity(), this::mockAddress));
        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics,
            new MockFuturePurgatory(time), new MockFuturePurgatory(time), voterAddresses,
            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random);
        client.initialize();
        return client;
    }

    /**
     * With node 0 as the only voter, building the client must leave an elected-leader
     * state (epoch 1, leader 0) in the state store.
     */
    @Test
    public void testInitializeSingleMemberQuorum() throws IOException {
        Set<Integer> voters = Collections.singleton(0);
        buildClient(voters);

        ElectionState expected = ElectionState.withElectedLeader(1, 0, voters);
        Assertions.assertEquals(expected, quorumStateStore.readElectionState());
    }

    /**
     * A single-voter quorum that starts from a persisted leader state at epoch 2 must come up
     * as leader at epoch 3: one entry is appended to the log, the log's last fetched epoch is 3,
     * and both the client and the state store report node 0 as leader of epoch 3.
     */
    @Test
    public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
        final int persistedEpoch = 2;
        final int expectedEpoch = persistedEpoch + 1;
        Set<Integer> voters = Collections.singleton(0);
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(persistedEpoch, 0, voters));

        KafkaRaftClient client = buildClient(voters);

        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertEquals(expectedEpoch, log.lastFetchedEpoch());
        LeaderAndEpoch expectedLeader = new LeaderAndEpoch(OptionalInt.of(0), expectedEpoch);
        Assertions.assertEquals(expectedLeader, client.currentLeaderAndEpoch());
        ElectionState expectedState = ElectionState.withElectedLeader(expectedEpoch, 0, voters);
        Assertions.assertEquals(expectedState, quorumStateStore.readElectionState());
    }

    /**
     * In a two-voter quorum, a persisted leader state (epoch 2, leader 0) is not resumed:
     * after initialization the log is still empty and the store holds an unknown-leader state
     * for epoch 2. Once 10000 ms pass, the client sends a vote request for epoch 3.
     */
    @Test
    public void testInitializeAsLeaderFromStateStore() throws Exception {
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        // Pin the random value drawn with bound 10000 to zero.
        Mockito.doReturn(0).when(random).nextInt(10000);
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 0, voters));

        KafkaRaftClient client = buildClient(voters);

        Assertions.assertEquals(0L, log.endOffset().offset);
        Assertions.assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());

        time.sleep(10000L);
        pollUntilSend(client);
        assertSentVoteRequest(epoch + 1, 0, 0L);
    }

    /**
     * Starting from a persisted voted-candidate state (epoch 2, voted for node 0) in a
     * three-voter quorum, the log stays empty and a single poll produces two vote requests
     * for epoch 2.
     */
    @Test
    public void testInitializeAsCandidateFromStateStore() throws Exception {
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 2});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, 0, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(0L, log.endOffset().offset);

        client.poll();
        Assertions.assertEquals(2, collectVoteRequests(epoch, 0, 0L).size());
    }

    /**
     * Walks a fresh two-voter client from unknown leader (epoch 0) through candidacy (epoch 1)
     * to leadership once node 1 grants its vote. Then verifies that a BeginQuorumEpoch request
     * is sent and that the first log batch is a control batch carrying the leader change
     * message, timestamped with the clock value read right after the election.
     */
    @Test
    public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());

        time.sleep(20000L);
        pollUntilSend(client);
        Assertions.assertEquals(ElectionState.withVotedCandidate(1, 0, voters), quorumStateStore.readElectionState());

        // Node 1 grants the vote.
        int requestId = assertSentVoteRequest(1, 0, 0L);
        deliverResponse(requestId, 1, voteResponse(true, Optional.empty(), 1));
        client.poll();
        Assertions.assertEquals(ElectionState.withElectedLeader(1, 0, voters), quorumStateStore.readElectionState());

        long electionTimestamp = time.milliseconds();
        Assertions.assertEquals(1L, log.endOffset().offset);

        client.poll();
        assertSentBeginQuorumEpochRequest(1);

        RecordBatch firstBatch = (RecordBatch) log.read(0L, Isolation.UNCOMMITTED).records.batches().iterator().next();
        Assertions.assertTrue(firstBatch.isControlBatch());
        Record firstRecord = (Record) firstBatch.iterator().next();
        Assertions.assertEquals(electionTimestamp, firstRecord.timestamp());
        verifyLeaderChangeMessage(0, Collections.singletonList(1), firstRecord.key(), firstRecord.value());
    }

    /**
     * After voting for node 1 in epoch 2, a BeginQuorumEpoch request from node 1 for that
     * epoch must move the stored state to elected leader (epoch 2, leader 1) and be answered
     * without error.
     */
    @Test
    public void testHandleBeginQuorumRequest() throws Exception {
        final int epoch = 2;
        final int remoteId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, remoteId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(beginEpochRequest(epoch, remoteId));
        client.poll();

        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, remoteId, voters), quorumStateStore.readElectionState());
        assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(remoteId));
    }

    /**
     * Starting from a persisted leader state (epoch 2, leader 0), a BeginQuorumEpoch request
     * from node 1 for epoch 3 must result in a stored elected-leader state for
     * (epoch 3, leader 1).
     */
    @Test
    public void testHandleBeginQuorumResponse() throws Exception {
        final int epoch = 2;
        final int nextEpoch = epoch + 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 0, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(beginEpochRequest(nextEpoch, 1));
        client.poll();

        Assertions.assertEquals(ElectionState.withElectedLeader(nextEpoch, 1, voters), quorumStateStore.readElectionState());
    }

    /**
     * A node that already voted for itself in epoch 2 answers an EndQuorumEpoch request
     * without error but keeps its voted-candidate state: after 10084 ms, and again 1 ms later,
     * the stored state is unchanged. Only after a further 100 ms does it move to a vote for
     * itself in epoch 3. Every {@code nextInt} call on the spied random returns 85.
     */
    @Test
    public void testEndQuorumIgnoredIfAlreadyCandidate() throws Exception {
        final int epoch = 2;
        final int stubbedRandomValue = 85;
        Mockito.doReturn(stubbedRandomValue).when(random).nextInt(Mockito.anyInt());
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, 0, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(endEpochRequest(epoch, OptionalInt.empty(), 1, Collections.singletonList(0)));
        client.poll();
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty());

        // One millisecond short of 10000 plus the stubbed random value: nothing changes yet.
        time.sleep((10000 + stubbedRandomValue) - 1);
        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch, 0, voters), quorumStateStore.readElectionState());

        time.sleep(1L);
        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch, 0, voters), quorumStateStore.readElectionState());

        time.sleep(100L);
        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch + 1, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * A leader of epoch 2 answers an EndQuorumEpoch request from node 1 by naming itself
     * (node 0) as leader, and is still the elected leader of epoch 2 after 49999 ms.
     */
    @Test
    public void testEndQuorumIgnoredIfAlreadyLeader() throws Exception {
        final int epoch = 2;
        Mockito.doReturn(85).when(random).nextInt(Mockito.anyInt());
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 2});
        KafkaRaftClient leader = initializeAsLeader(voters, epoch);

        deliverRequest(endEpochRequest(epoch, OptionalInt.empty(), 1, Arrays.asList(0, 2)));
        leader.poll();
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(0));

        time.sleep(49999L);
        leader.poll();
        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * Having voted for node 1 in epoch 2, the client acknowledges an EndQuorumEpoch request
     * from node 1 and, after 100 ms, has voted for itself in epoch 3.
     */
    @Test
    public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandidate() throws Exception {
        final int epoch = 2;
        final int remoteId = 1;
        Mockito.doReturn(85).when(random).nextInt(Mockito.anyInt());
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, remoteId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(endEpochRequest(epoch, OptionalInt.empty(), remoteId, Collections.singletonList(0)));
        client.poll();
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty());

        time.sleep(100L);
        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch + 1, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * With no known leader in epoch 2, an EndQuorumEpoch request from node 1 is acknowledged
     * and the very next poll, with no clock advance, moves the client to a vote for itself
     * in epoch 3.
     */
    @Test
    public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached() throws Exception {
        final int epoch = 2;
        final int remoteId = 1;
        Mockito.doReturn(85).when(random).nextInt(Mockito.anyInt());
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 2});
        quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(endEpochRequest(epoch, OptionalInt.of(remoteId), remoteId, Arrays.asList(0, 2)));
        client.poll();
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(remoteId));

        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch + 1, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * Exercises immediate local reads on a leader of epoch 2. Records appended by the leader
     * are visible to UNCOMMITTED reads right away, while COMMITTED reads return them only
     * after node 1 fetches at offset 3 and the high watermark advances to 3.
     */
    @Test
    public void testLocalReadFromLeader() throws Exception {
        final int epoch = 2;
        final SimpleRecord[] nothing = new SimpleRecord[0];
        KafkaRaftClient leader = initializeAsLeader(Utils.mkSet(new Integer[]{0, 1}), epoch);
        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.empty(), leader.highWatermark());

        // The first fetch from node 1 at offset 1 establishes the high watermark.
        deliverRequest(fetchRequest(epoch, 1, 1L, epoch, 0));
        leader.poll();
        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(1L), leader.highWatermark());
        assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(0));

        SimpleRecord[] appended = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        leader.append(MemoryRecords.withRecords(CompressionType.NONE, appended), AckMode.LEADER, (long) Integer.MAX_VALUE);
        leader.poll();
        Assertions.assertEquals(3L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(1L), leader.highWatermark());

        validateLocalRead(leader, new OffsetAndEpoch(1L, epoch), Isolation.COMMITTED, nothing);
        validateLocalRead(leader, new OffsetAndEpoch(1L, epoch), Isolation.UNCOMMITTED, appended);
        validateLocalRead(leader, new OffsetAndEpoch(3L, epoch), Isolation.COMMITTED, nothing);
        validateLocalRead(leader, new OffsetAndEpoch(3L, epoch), Isolation.UNCOMMITTED, nothing);

        // Node 1 catches up to offset 3; the appended records become committed.
        deliverRequest(fetchRequest(epoch, 1, 3L, epoch, 0));
        leader.poll();
        Assertions.assertEquals(OptionalLong.of(3L), leader.highWatermark());
        assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(0));
        validateLocalRead(leader, new OffsetAndEpoch(1L, epoch), Isolation.COMMITTED, appended);
    }

    /**
     * Exercises reads with a 500 ms wait on a leader of epoch 2. An UNCOMMITTED read at the
     * log end completes once the leader appends records, while a COMMITTED read at the high
     * watermark completes only after node 1 fetches at offset 3 and the high watermark
     * advances to 3.
     */
    @Test
    public void testDelayedLocalReadFromLeader() throws Exception {
        final int epoch = 2;
        KafkaRaftClient leader = initializeAsLeader(Utils.mkSet(new Integer[]{0, 1}), epoch);
        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.empty(), leader.highWatermark());

        // The first fetch from node 1 at offset 1 establishes the high watermark.
        deliverRequest(fetchRequest(epoch, 1, 1L, epoch, 0));
        leader.poll();
        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(1L), leader.highWatermark());
        assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(0));

        CompletableFuture uncommittedRead = leader.read(new OffsetAndEpoch(1L, epoch), Isolation.UNCOMMITTED, 500L);
        Assertions.assertFalse(uncommittedRead.isDone());

        CompletableFuture committedRead = leader.read(new OffsetAndEpoch(1L, epoch), Isolation.COMMITTED, 500L);
        // Check the future that was just created; the original re-checked the UNCOMMITTED one here.
        Assertions.assertFalse(committedRead.isDone());

        SimpleRecord[] appended = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        leader.append(MemoryRecords.withRecords(CompressionType.NONE, appended), AckMode.LEADER, 2147483647L);
        leader.poll();
        Assertions.assertEquals(3L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(1L), leader.highWatermark());

        // The append satisfies the UNCOMMITTED read only; the high watermark has not moved.
        Assertions.assertTrue(uncommittedRead.isDone());
        assertMatchingRecords(appended, (Records) uncommittedRead.get());
        Assertions.assertFalse(committedRead.isDone());

        deliverRequest(fetchRequest(epoch, 1, 3L, epoch, 0));
        leader.poll();
        Assertions.assertEquals(OptionalLong.of(3L), leader.highWatermark());
        assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(0));

        Assertions.assertTrue(committedRead.isDone());
        assertMatchingRecords(appended, (Records) committedRead.get());
    }

    /**
     * Exercises immediate local reads on a follower of node 1 in epoch 2 across three fetch
     * rounds. The first delivers two records with high watermark 2, the second delivers three
     * more while the high watermark stays at 2, and the third is empty but raises the high
     * watermark to 5. COMMITTED reads expose records only up to the high watermark, while
     * UNCOMMITTED reads expose everything in the log.
     */
    @Test
    public void testLocalReadFromFollower() throws Exception {
        final int leaderId = 1;
        final int epoch = 2;
        final SimpleRecord[] nothing = new SimpleRecord[0];
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient follower = buildClient(voters);

        SimpleRecord[] firstBatch = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(0L, 0), firstBatch, 2L);
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), follower.highWatermark());
        validateLocalRead(follower, new OffsetAndEpoch(0L, 0), Isolation.COMMITTED, firstBatch);
        validateLocalRead(follower, new OffsetAndEpoch(0L, 0), Isolation.UNCOMMITTED, firstBatch);
        validateLocalRead(follower, new OffsetAndEpoch(2L, epoch), Isolation.COMMITTED, nothing);
        validateLocalRead(follower, new OffsetAndEpoch(2L, epoch), Isolation.UNCOMMITTED, nothing);

        SimpleRecord[] secondBatch = {
            new SimpleRecord("c".getBytes()),
            new SimpleRecord("d".getBytes()),
            new SimpleRecord("e".getBytes())
        };
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(2L, epoch), secondBatch, 2L);
        Assertions.assertEquals(5L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), follower.highWatermark());
        validateLocalRead(follower, new OffsetAndEpoch(2L, epoch), Isolation.COMMITTED, nothing);
        validateLocalRead(follower, new OffsetAndEpoch(2L, epoch), Isolation.UNCOMMITTED, secondBatch);
        validateLocalRead(follower, new OffsetAndEpoch(5L, epoch), Isolation.COMMITTED, nothing);
        validateLocalRead(follower, new OffsetAndEpoch(5L, epoch), Isolation.UNCOMMITTED, nothing);

        // An empty fetch that only moves the high watermark forward.
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(5L, epoch), new SimpleRecord[0], 5L);
        Assertions.assertEquals(5L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(5L), follower.highWatermark());
        validateLocalRead(follower, new OffsetAndEpoch(2L, epoch), Isolation.COMMITTED, secondBatch);
        validateLocalRead(follower, new OffsetAndEpoch(5L, epoch), Isolation.COMMITTED, nothing);
    }

    /**
     * On a follower, a COMMITTED read with a 500 ms wait issued while the high watermark is 0
     * stays pending. It completes with the two fetched records once a later fetch response
     * raises the high watermark to 2.
     */
    @Test
    public void testDelayedLocalReadFromFollowerToHighWatermark() throws Exception {
        final int leaderId = 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient follower = buildClient(voters);

        SimpleRecord[] fetched = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(0L, 0), fetched, 0L);
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), follower.highWatermark());

        CompletableFuture pendingRead = follower.read(new OffsetAndEpoch(0L, 0), Isolation.COMMITTED, 500L);
        Assertions.assertFalse(pendingRead.isDone());

        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(2L, epoch), new SimpleRecord[0], 2L);
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), follower.highWatermark());

        Assertions.assertTrue(pendingRead.isDone());
        assertMatchingRecords(fetched, (Records) pendingRead.get());
    }

    /**
     * On a follower, a COMMITTED read with a 500 ms wait issued while the high watermark is 0
     * fails with {@link TimeoutException} once 500 ms pass and the client is polled without
     * the high watermark advancing.
     */
    @Test
    public void testDelayedLocalReadFromFollowerToHighWatermarkTimeout() throws Exception {
        final int leaderId = 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient follower = buildClient(voters);

        SimpleRecord[] fetched = new SimpleRecord[]{new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(0L, 0), fetched, 0L);
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), follower.highWatermark());

        CompletableFuture pendingRead = follower.read(new OffsetAndEpoch(0L, 0), Isolation.COMMITTED, 500L);
        Assertions.assertFalse(pendingRead.isDone());

        time.sleep(500L);
        follower.poll();
        Assertions.assertTrue(pendingRead.isDone());
        TestUtils.assertFutureThrows(pendingRead, TimeoutException.class);
    }

    /**
     * After a follower has fetched two records in epoch 2, a read at (offset 1, epoch 1)
     * completes immediately and fails with {@code LogTruncationException}.
     */
    @Test
    public void testLocalReadLogTruncationError() throws Exception {
        final int leaderId = 1;
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient follower = buildClient(voters);

        SimpleRecord[] fetched = new SimpleRecord[]{new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        fetchFromLeader(follower, leaderId, epoch, new OffsetAndEpoch(0L, 0), fetched, 2L);
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), follower.highWatermark());

        CompletableFuture failedRead = follower.read(new OffsetAndEpoch(1L, 1), Isolation.COMMITTED, 0L);
        Assertions.assertTrue(failedRead.isDone());
        TestUtils.assertFutureThrows(failedRead, LogTruncationException.class);
    }

    /**
     * A leader of epoch 2 appends two records (log end 3), then accepts node 1 as leader of
     * epoch 3. A pending UNCOMMITTED read at (offset 3, epoch 2) must fail with
     * {@code LogTruncationException} when the response to the first fetch causes the local
     * log to shrink back to end offset 1.
     */
    @Test
    public void testDelayedLocalReadLogTruncationErrorAfterUncleanElection() throws Exception {
        final int epoch = 2;
        final int newEpoch = 3;
        final int newLeaderId = 1;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{0, 1}), epoch);

        SimpleRecord[] appended = new SimpleRecord[]{new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        client.append(MemoryRecords.withRecords(CompressionType.NONE, appended), AckMode.LEADER, 2147483647L);
        client.poll();
        Assertions.assertEquals(3L, log.endOffset().offset);

        // Node 1 announces itself as leader of the next epoch.
        deliverRequest(beginEpochRequest(newEpoch, newLeaderId));
        client.poll();
        assertSentBeginQuorumEpochResponse(Errors.NONE, newEpoch, OptionalInt.of(newLeaderId));

        CompletableFuture pendingRead = client.read(new OffsetAndEpoch(3L, epoch), Isolation.UNCOMMITTED, 500L);
        Assertions.assertFalse(pendingRead.isDone());

        pollUntilSend(client);
        int requestId = assertSentFetchRequest(newEpoch, 3L, epoch);
        deliverResponse(requestId, newLeaderId, outOfRangeFetchRecordsResponse(newEpoch, newLeaderId, 1L, epoch, 0L));
        client.poll();

        Assertions.assertEquals(1L, log.endOffset().offset);
        Assertions.assertTrue(pendingRead.isDone());
        TestUtils.assertFutureThrows(pendingRead, LogTruncationException.class);
    }

    /**
     * Issues a read with a wait of 0 and asserts that it is already complete and returns
     * exactly the expected records.
     *
     * @param kafkaRaftClient client to read from
     * @param offsetAndEpoch position passed to {@code read}
     * @param isolation isolation level passed to {@code read}
     * @param simpleRecordArr records the read is expected to return, in order
     */
    private void validateLocalRead(KafkaRaftClient kafkaRaftClient, OffsetAndEpoch offsetAndEpoch, Isolation isolation, SimpleRecord[] simpleRecordArr) throws Exception {
        final long noWait = 0L;
        CompletableFuture readFuture = kafkaRaftClient.read(offsetAndEpoch, isolation, noWait);
        Assertions.assertTrue(readFuture.isDone());
        Records actual = (Records) readFuture.get();
        assertMatchingRecords(simpleRecordArr, actual);
    }

    /**
     * Asserts that {@code records} holds exactly the expected records: same count, and each
     * actual record equal to its expected counterpart once wrapped in a {@code SimpleRecord}.
     *
     * @param simpleRecordArr expected records, in order
     * @param records actual records to check
     */
    private void assertMatchingRecords(SimpleRecord[] simpleRecordArr, Records records) {
        List actualRecords = Utils.toList(records.records());
        Assertions.assertEquals(simpleRecordArr.length, actualRecords.size());

        int position = 0;
        for (SimpleRecord expected : simpleRecordArr) {
            Record actual = (Record) actualRecords.get(position);
            position++;
            String failureMessage = "Record at offset " + actual.offset() + " does not match expected";
            Assertions.assertEquals(expected, new SimpleRecord(actual), failureMessage);
        }
    }

    /**
     * Drives one fetch round trip: polls until the client sends, asserts the outgoing fetch
     * request, delivers a successful fetch response carrying the given records, then polls
     * once more.
     *
     * @param kafkaRaftClient client under test
     * @param i id of the node the response is delivered from
     * @param i2 epoch expected in the request and used in the response
     * @param offsetAndEpoch offset and epoch the fetch request is expected to carry; the offset
     *        is also the base offset of the returned records
     * @param simpleRecordArr records to return in the response
     * @param j value passed to {@code fetchResponse} after the records; callers assert it as
     *        the follower's high watermark afterwards
     */
    private void fetchFromLeader(KafkaRaftClient kafkaRaftClient, int i, int i2, OffsetAndEpoch offsetAndEpoch, SimpleRecord[] simpleRecordArr, long j) throws Exception {
        pollUntilSend(kafkaRaftClient);

        int requestId = assertSentFetchRequest(i2, offsetAndEpoch.offset, offsetAndEpoch.epoch);
        MemoryRecords payload = MemoryRecords.withRecords(offsetAndEpoch.offset, CompressionType.NONE, Integer.valueOf(i2), simpleRecordArr);
        deliverResponse(requestId, i, fetchResponse(i2, i, payload, j, Errors.NONE));

        kafkaRaftClient.poll();
    }

    /**
     * A follower of node 1 in epoch 2 acknowledges an EndQuorumEpoch request from that leader
     * and, on the next poll, has voted for itself in epoch 3.
     */
    @Test
    public void testHandleEndQuorumRequest() throws Exception {
        final int epoch = 2;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(endEpochRequest(epoch, OptionalInt.of(leaderId), leaderId, Collections.singletonList(0)));
        client.poll();
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(leaderId));

        client.poll();
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch + 1, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * When the EndQuorumEpoch request lists node 3 ahead of node 0, the local node does not
     * start an election right away: after 1 ms it is still sending a fetch request for
     * epoch 2. After a further 50 ms it sends two vote requests for epoch 3 and has voted
     * for itself.
     */
    @Test
    public void testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader() throws Exception {
        final int epoch = 2;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 3});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(endEpochRequest(epoch, OptionalInt.of(leaderId), leaderId, Arrays.asList(3, 0)));
        pollUntilSend(client);
        assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(leaderId));

        // Too early for an election: the client is still fetching.
        time.sleep(1L);
        pollUntilSend(client);
        assertSentFetchRequest(epoch, 0L, 0);

        time.sleep(50L);
        pollUntilSend(client);
        Assertions.assertEquals(2, collectVoteRequests(epoch + 1, 0, 0L).size());
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch + 1, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * A candidate in epoch 1 sends a second vote request after 5000 ms without a reply.
     * A granted response to the first request makes it leader, and the later granted response
     * to the second request leaves the elected-leader state unchanged.
     */
    @Test
    public void testVoteRequestTimeout() throws Exception {
        final int epoch = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());

        time.sleep(20000L);
        pollUntilSend(client);
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch, 0, voters), quorumStateStore.readElectionState());
        int firstRequestId = assertSentVoteRequest(epoch, 0, 0L);

        // No reply for 5000 ms: the vote request is sent again.
        time.sleep(5000L);
        client.poll();
        int secondRequestId = assertSentVoteRequest(epoch, 0, 0L);

        deliverResponse(firstRequestId, 1, voteResponse(true, Optional.empty(), epoch));
        client.poll();
        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, 0, voters), quorumStateStore.readElectionState());

        deliverResponse(secondRequestId, 1, voteResponse(true, Optional.empty(), epoch));
        client.poll();
        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * With no known leader in epoch 2, a vote request from node 1 for epoch 2 is granted and
     * the stored state becomes voted candidate (epoch 2, node 1).
     */
    @Test
    public void testHandleValidVoteRequestAsFollower() throws Exception {
        final int epoch = 2;
        final int candidateId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(voteRequest(epoch, candidateId, epoch - 1, 1L));
        client.poll();

        assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch, candidateId, voters), quorumStateStore.readElectionState());
    }

    /**
     * With node 3 already elected leader of epoch 2, a vote request from node 1 for the same
     * epoch is rejected. The response names node 3 as leader and the stored state is unchanged.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithElectedLeader() throws Exception {
        final int epoch = 2;
        final int leaderId = 3;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 3});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(voteRequest(epoch, 1, epoch - 1, 1L));
        client.poll();

        assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(leaderId), false);
        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
    }

    /**
     * Having already voted for node 3 in epoch 2, a vote request from node 1 for the same
     * epoch is rejected and the stored vote for node 3 is kept.
     */
    @Test
    public void testHandleVoteRequestAsFollowerWithVotedCandidate() throws Exception {
        final int epoch = 2;
        final int votedId = 3;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1, 3});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(voteRequest(epoch, 1, epoch - 1, 1L));
        client.poll();

        assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
        Assertions.assertEquals(ElectionState.withVotedCandidate(epoch, votedId, voters), quorumStateStore.readElectionState());
    }

    /**
     * A vote request for epoch 1 sent to a node at epoch 2 is answered with
     * {@code FENCED_LEADER_EPOCH}, and the stored unknown-leader state for epoch 2 is unchanged.
     */
    @Test
    public void testHandleInvalidVoteRequestWithOlderEpoch() throws Exception {
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(voteRequest(epoch - 1, 1, epoch - 2, 1L));
        client.poll();

        assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty(), false);
        Assertions.assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
    }

    /**
     * When node 0 is not part of the voter set {1, 2}, a vote request from node 1 for epoch 3
     * is answered with {@code INCONSISTENT_VOTER_SET}, and the stored unknown-leader state for
     * epoch 2 is unchanged.
     */
    @Test
    public void testHandleInvalidVoteRequestAsObserver() throws Exception {
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{1, 2});
        quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
        KafkaRaftClient client = buildClient(voters);

        deliverRequest(voteRequest(epoch + 1, 1, epoch, 1L));
        client.poll();

        assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.empty(), false);
        Assertions.assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
    }

    /**
     * A leader of epoch 2 rejects a vote request from node 1 for the same epoch, names itself
     * (node 0) as leader in the response, and remains the elected leader.
     */
    @Test
    public void testLeaderIgnoreVoteRequestOnSameEpoch() throws Exception {
        final int epoch = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, 1});
        KafkaRaftClient leader = initializeAsLeader(voters, epoch);

        deliverRequest(voteRequest(epoch, 1, epoch - 1, 1L));
        leader.poll();

        assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(0), false);
        Assertions.assertEquals(ElectionState.withElectedLeader(epoch, 0, voters), quorumStateStore.readElectionState());
    }

    /**
     * Checks how appends complete on a leader of epoch 5. An append with {@code AckMode.LEADER}
     * is done after a single poll, while an append with {@code AckMode.QUORUM} stays pending
     * until fetch requests from node 1 move the high watermark past the appended records.
     * The high watermark is asserted after each fetch from node 1.
     */
    @Test
    public void testStateMachineApplyCommittedRecords() throws Exception {
        final int epoch = 5;
        final int followerId = 1;
        KafkaRaftClient leader = initializeAsLeader(Utils.mkSet(new Integer[]{0, 1}), epoch);
        leader.poll();
        Assertions.assertEquals(OptionalLong.empty(), leader.highWatermark());

        deliverRequest(fetchRequest(epoch, followerId, 0L, epoch, 500));
        pollUntilSend(leader);
        Assertions.assertEquals(OptionalLong.of(0L), leader.highWatermark());

        // An append with AckMode.LEADER completes after one poll.
        SimpleRecord[] leaderAckRecords = new SimpleRecord[]{
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        CompletableFuture leaderAckAppend = leader.append(
            MemoryRecords.withRecords(0L, CompressionType.NONE, 1, leaderAckRecords), AckMode.LEADER, 2147483647L);
        leader.poll();
        Assertions.assertTrue(leaderAckAppend.isDone());
        Assertions.assertEquals(new OffsetAndEpoch(3L, epoch), leaderAckAppend.get());

        deliverRequest(fetchRequest(epoch, followerId, 1L, epoch, 500));
        pollUntilSend(leader);
        Assertions.assertEquals(OptionalLong.of(1L), leader.highWatermark());

        deliverRequest(fetchRequest(epoch, followerId, 4L, epoch, 500));
        leader.poll();
        Assertions.assertEquals(OptionalLong.of(4L), leader.highWatermark());

        // An append with AckMode.QUORUM stays pending until the high watermark reaches 7.
        SimpleRecord[] quorumAckRecords = new SimpleRecord[]{
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        CompletableFuture quorumAckAppend = leader.append(
            MemoryRecords.withRecords(0L, CompressionType.NONE, 1, quorumAckRecords), AckMode.QUORUM, 2147483647L);
        leader.poll();
        Assertions.assertFalse(quorumAckAppend.isDone());

        deliverRequest(fetchRequest(epoch, followerId, 4L, epoch, 500));
        pollUntilSend(leader);
        Assertions.assertFalse(quorumAckAppend.isDone());
        Assertions.assertEquals(OptionalLong.of(4L), leader.highWatermark());

        deliverRequest(fetchRequest(epoch, followerId, 7L, epoch, 500));
        leader.poll();
        Assertions.assertEquals(OptionalLong.of(7L), leader.highWatermark());
        Assertions.assertTrue(quorumAckAppend.isDone());
        Assertions.assertEquals(new OffsetAndEpoch(6L, epoch), quorumAckAppend.get());
    }

    /**
     * Appends with {@code AckMode.QUORUM} and a 5000 ms timeout on a leader whose follower never
     * fetches past offset 0. The future must still be pending after 4999 ms of {@code time.sleep}
     * and must be completed exceptionally one millisecond later.
     */
    @Test
    public void testStateMachineExpireAppendedRecords() throws Exception {
        final int epoch = 5;
        final int followerId = 1;
        final long appendTimeoutMs = 5000L;

        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{0, followerId}), epoch);
        client.poll();
        Assertions.assertEquals(OptionalLong.empty(), client.highWatermark());

        deliverRequest(fetchRequest(epoch, followerId, 0L, epoch, 500));
        pollUntilSend(client);
        Assertions.assertEquals(OptionalLong.of(0L), client.highWatermark());

        SimpleRecord[] batch = {
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        // Wildcard rather than a raw type: only the completion state is inspected.
        CompletableFuture<?> pendingAppend = client.append(
            MemoryRecords.withRecords(0L, CompressionType.NONE, 1, batch),
            AckMode.QUORUM,
            appendTimeoutMs);
        client.poll();
        Assertions.assertFalse(pendingAppend.isDone());

        // One millisecond short of the timeout: still pending.
        time.sleep(appendTimeoutMs - 1);
        Assertions.assertFalse(pendingAppend.isDone());

        time.sleep(1L);
        Assertions.assertTrue(pendingAppend.isCompletedExceptionally());
    }

    /**
     * Starts from a persisted "voted for node 0 in epoch 2" state, delivers a vote request from
     * node 1 for the same epoch and expects an {@code Errors.NONE} response with no leader id
     * and a trailing {@code false} flag. The persisted state must be unchanged afterwards.
     */
    @Test
    public void testCandidateIgnoreVoteRequestOnSameEpoch() throws Exception {
        final int epoch = 2;
        final int localId = 0;
        final int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, localId, voters));

        KafkaRaftClient client = buildClient(voters);
        pollUntilSend(client);

        deliverRequest(voteRequest(epoch, otherNodeId, epoch - 1, 1L));
        client.poll();

        assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch, localId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * With {@code random.nextInt(anyInt())} stubbed to 85, the node becomes a candidate in
     * epoch 1, receives a rejecting vote response, and keeps its epoch-1 vote for itself. After
     * a further 99 ms the epoch is still 1; one millisecond later a new vote request for epoch 2
     * is sent and the stored state shows a vote for node 0 in epoch 2.
     */
    @Test
    public void testRetryElection() throws Exception {
        final int firstEpoch = 1;
        final int localId = 0;
        final int otherNodeId = 1;

        ((Random) Mockito.doReturn(85).when(random)).nextInt(Mockito.anyInt());

        Set<Integer> voters = Utils.mkSet(new Integer[]{localId, otherNodeId});
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());

        time.sleep(20000L);
        pollUntilSend(client);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(firstEpoch, localId, voters),
            quorumStateStore.readElectionState());

        // The other voter rejects the vote.
        int correlationId = assertSentVoteRequest(firstEpoch, 0, 0L);
        deliverResponse(correlationId, otherNodeId, voteResponse(false, Optional.empty(), firstEpoch));
        client.poll();

        ElectionState afterRejection = quorumStateStore.readElectionState();
        Assertions.assertEquals(1, afterRejection.epoch);
        Assertions.assertTrue(afterRejection.hasVoted());
        Assertions.assertEquals(0, afterRejection.votedId());

        // Still in the first epoch 1 ms before the retry point.
        time.sleep(99L);
        client.poll();
        Assertions.assertEquals(1, quorumStateStore.readElectionState().epoch);

        time.sleep(1L);
        client.poll();
        pollUntilSend(client);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(firstEpoch + 1, localId, voters),
            quorumStateStore.readElectionState());
        assertSentVoteRequest(firstEpoch + 1, 0, 0L);
    }

    /**
     * Persists "node 1 leads epoch 5", builds the client without appending anything to the log,
     * and checks that the first request sent is a fetch for epoch 5 with arguments
     * {@code (0L, 0)}.
     */
    @Test
    public void testInitializeAsFollowerEmptyLog() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        assertSentFetchRequest(epoch, 0L, 0);
    }

    /**
     * Same as the empty-log case, except that one record is appended to the log with epoch
     * argument 3 before the client is built. The first fetch must then be for epoch 5 with
     * arguments {@code (1L, 3)}.
     */
    @Test
    public void testInitializeAsFollowerNonEmptyLog() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        final int recordEpoch = 3;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        log.appendAsLeader(Collections.singleton(new SimpleRecord("foo".getBytes())), recordEpoch);

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        assertSentFetchRequest(epoch, 1L, recordEpoch);
    }

    /**
     * A follower of node 1 (epoch 5) sends its fetch, then 50000 ms pass on {@code time} without
     * a response. The next request sent must be a vote request for epoch 6, and the stored state
     * must show a vote for node 0 in epoch 6.
     */
    @Test
    public void testVoterBecomeCandidateAfterFetchTimeout() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int leaderId = 1;
        final int recordEpoch = 3;
        Set<Integer> voters = Utils.mkSet(new Integer[]{localId, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        log.appendAsLeader(Collections.singleton(new SimpleRecord("foo".getBytes())), recordEpoch);

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        assertSentFetchRequest(epoch, 1L, recordEpoch);

        time.sleep(50000L);
        pollUntilSend(client);

        assertSentVoteRequest(epoch + 1, recordEpoch, 1L);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch + 1, localId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * Builds a client whose voter set is {1, 2} with no persisted state. Its first fetch must
     * target one of the voters with arguments {@code (0, 0L, 0)}. A
     * {@code FENCED_LEADER_EPOCH} response naming node 1 / epoch 5 must then be persisted as
     * the elected leader.
     */
    @Test
    public void testInitializeObserverNoPreviousState() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{leaderId, 2});
        KafkaRaftClient client = buildClient(voters);

        pollUntilSend(client);
        RaftRequest.Outbound discoveryFetch = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(discoveryFetch.destinationId()));
        assertFetchRequestData(discoveryFetch, 0, 0L, 0);

        deliverResponse(
            discoveryFetch.correlationId,
            discoveryFetch.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * The first discovery fetch is answered with {@code UNKNOWN_SERVER_ERROR}. After 50 ms on
     * {@code time} a second fetch with the same arguments is sent, and its
     * {@code FENCED_LEADER_EPOCH} response (node 1, epoch 5) must be persisted as the elected
     * leader.
     */
    @Test
    public void testObserverQuorumDiscoveryFailure() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{leaderId});
        KafkaRaftClient client = buildClient(voters);

        // First attempt fails.
        pollUntilSend(client);
        RaftRequest.Outbound firstFetch = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(firstFetch.destinationId()));
        assertFetchRequestData(firstFetch, 0, 0L, 0);
        deliverResponse(
            firstFetch.correlationId,
            firstFetch.destinationId(),
            fetchResponse(-1, -1, MemoryRecords.EMPTY, -1L, Errors.UNKNOWN_SERVER_ERROR));
        client.poll();

        // Second attempt, 50 ms later, learns the leader.
        time.sleep(50L);
        pollUntilSend(client);
        RaftRequest.Outbound retryFetch = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(retryFetch.destinationId()));
        assertFetchRequestData(retryFetch, 0, 0L, 0);
        deliverResponse(
            retryFetch.correlationId,
            retryFetch.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * After learning the leader (node 1, epoch 5) from a {@code FENCED_LEADER_EPOCH} fetch
     * response, 50000 ms pass on {@code time}. The next fetch must again go to some member of
     * the voter set, now carrying epoch 5.
     */
    @Test
    public void testObserverSendDiscoveryFetchAfterFetchTimeout() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{leaderId, 2});
        KafkaRaftClient client = buildClient(voters);

        pollUntilSend(client);
        RaftRequest.Outbound discoveryFetch = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(discoveryFetch.destinationId()));
        assertFetchRequestData(discoveryFetch, 0, 0L, 0);
        deliverResponse(
            discoveryFetch.correlationId,
            discoveryFetch.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        client.poll();
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        time.sleep(50000L);
        pollUntilSend(client);

        RaftRequest.Outbound fetchAfterTimeout = assertSentFetchRequest();
        Assertions.assertTrue(voters.contains(fetchAfterTimeout.destinationId()));
        assertFetchRequestData(fetchAfterTimeout, epoch, 0L, 0);
    }

    /**
     * Sends five malformed fetch requests to the leader of epoch 5 and checks the error in each
     * response. Four yield {@code INVALID_REQUEST}; the one whose first argument is 6 yields
     * {@code UNKNOWN_LEADER_EPOCH}. Every response reports epoch 5 and leader 0.
     */
    @Test
    public void testInvalidFetchRequest() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int otherNodeId = 1;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, otherNodeId}), epoch);

        // Negative third argument (presumably the fetch offset).
        deliverRequest(fetchRequest(epoch, otherNodeId, -5L, 0, 0));
        client.poll();
        assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Negative fourth argument (presumably the last fetched epoch).
        deliverRequest(fetchRequest(epoch, otherNodeId, 0L, -1, 0));
        client.poll();
        assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // Fourth argument greater than the current epoch.
        deliverRequest(fetchRequest(epoch, otherNodeId, 0L, epoch + 1, 0));
        client.poll();
        assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));

        // First argument greater than the current epoch.
        deliverRequest(fetchRequest(epoch + 1, otherNodeId, 0L, 0, 0));
        client.poll();
        assertSentFetchResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(localId));

        // Negative fifth argument (presumably the max wait time).
        deliverRequest(fetchRequest(epoch, otherNodeId, 0L, 0, -1));
        client.poll();
        assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
    }

    /**
     * Delivers vote, begin-epoch and end-epoch requests that reference node 2, which is not in
     * the voter set {0, 1}. Each must be answered with {@code INCONSISTENT_VOTER_SET}, epoch 5
     * and leader 0.
     */
    @Test
    public void testVoterOnlyRequestValidation() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int nonVoterId = 2;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, 1}), epoch);

        deliverRequest(voteRequest(epoch, nonVoterId, 0, 0L));
        client.poll();
        assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId), false);

        deliverRequest(beginEpochRequest(epoch, nonVoterId));
        client.poll();
        assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));

        deliverRequest(endEpochRequest(epoch, OptionalInt.of(localId), nonVoterId, Collections.singletonList(1)));
        client.poll();
        assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));
    }

    /**
     * Sends three malformed vote requests for epoch 6 to a follower of node 1 (epoch 5). Each
     * must be answered with {@code INVALID_REQUEST}, epoch 5 and leader 1, and must leave the
     * persisted election state untouched.
     */
    @Test
    public void testInvalidVoteRequest() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        // Negative fourth argument.
        deliverRequest(voteRequest(epoch + 1, leaderId, 0, -5L));
        client.poll();
        assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        // Negative third argument.
        deliverRequest(voteRequest(epoch + 1, leaderId, -1, 0L));
        client.poll();
        assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        // Third argument equal to the requested epoch.
        deliverRequest(voteRequest(epoch + 1, leaderId, epoch + 1, 0L));
        client.poll();
        assertSentVoteResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(leaderId), false);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * A fetch with fifth argument 500 produces no outbound message on the first poll. After
     * 500 ms on {@code time}, the next poll must send an {@code Errors.NONE} fetch response
     * whose records are 0 bytes in size.
     */
    @Test
    public void testPurgatoryFetchTimeout() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int otherNodeId = 1;
        final int maxWaitMs = 500;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, otherNodeId}), epoch);

        deliverRequest(fetchRequest(epoch, otherNodeId, 1L, epoch, maxWaitMs));
        client.poll();
        Assertions.assertEquals(0, channel.drainSendQueue().size());

        time.sleep(maxWaitMs);
        client.poll();
        Assertions.assertEquals(
            0,
            assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)).sizeInBytes());
    }

    /**
     * A fetch that initially produces no outbound message must be answered once the leader
     * appends three records: the fetch response must then contain exactly those records, in
     * order.
     */
    @Test
    public void testPurgatoryFetchSatisfiedByWrite() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int otherNodeId = 1;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, otherNodeId}), epoch);

        deliverRequest(fetchRequest(epoch, otherNodeId, 1L, epoch, 500));
        client.poll();
        Assertions.assertEquals(0, channel.drainSendQueue().size());

        SimpleRecord[] appended = {
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        // Wildcards instead of raw types; the element cast below is kept explicit.
        CompletableFuture<?> appendFuture = client.append(
            MemoryRecords.withRecords(0L, CompressionType.NONE, 1, appended),
            AckMode.LEADER,
            (long) Integer.MAX_VALUE);
        client.poll();
        Assertions.assertTrue(appendFuture.isDone());

        List<?> fetched = Utils.toList(
            assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)).records());
        Assertions.assertEquals(appended.length, fetched.size());
        for (int idx = 0; idx < appended.length; idx++) {
            Assertions.assertEquals(appended[idx], new SimpleRecord((Record) fetched.get(idx)));
        }
    }

    /**
     * While a fetch is outstanding (no {@code FetchResponseData} has been sent), the leader
     * receives a begin-epoch request from node 2 for epoch 6. It must persist node 2 as leader
     * of epoch 6, acknowledge the begin-epoch request, and send an empty
     * {@code NOT_LEADER_OR_FOLLOWER} fetch response.
     */
    @Test
    public void testPurgatoryFetchCompletedByFollowerTransition() throws Exception {
        final int epoch = 5;
        final int fetcherId = 1;
        final int newLeaderId = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, fetcherId, newLeaderId});
        KafkaRaftClient client = initializeAsLeader(voters, epoch);

        deliverRequest(fetchRequest(epoch, fetcherId, 1L, epoch, 500));
        client.poll();
        boolean noFetchResponseYet = channel.drainSendQueue().stream()
            .noneMatch(message -> message.data() instanceof FetchResponseData);
        Assertions.assertTrue(noFetchResponseYet);

        deliverRequest(beginEpochRequest(epoch + 1, newLeaderId));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch + 1, newLeaderId, voters),
            quorumStateStore.readElectionState());
        assertSentBeginQuorumEpochResponse(Errors.NONE, epoch + 1, OptionalInt.of(newLeaderId));
        Assertions.assertEquals(
            0,
            assertSentFetchResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch + 1, OptionalInt.of(newLeaderId)).sizeInBytes());
    }

    /**
     * Polls until the client sends its vote requests for epoch {@code i}, answers each with
     * {@code voteResponse(true, Optional.empty(), i)}, polls once more, and asserts that the
     * store now records node 0 as elected leader of epoch {@code i}.
     *
     * @param kafkaRaftClient client under test
     * @param set             voter set used to build the expected election state
     * @param i               epoch in which the client is expected to win the election
     */
    private void expectLeaderElection(KafkaRaftClient kafkaRaftClient, Set<Integer> set, int i) throws Exception {
        final int epoch = i;
        pollUntilSend(kafkaRaftClient);

        List<RaftRequest.Outbound> voteRequests =
            collectVoteRequests(epoch, log.lastFetchedEpoch(), log.endOffset().offset);
        for (RaftRequest.Outbound request : voteRequests) {
            deliverResponse(
                request.correlationId,
                request.destinationId(),
                voteResponse(true, Optional.empty(), epoch));
        }

        kafkaRaftClient.poll();
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, 0, set),
            quorumStateStore.readElectionState());
    }

    /**
     * Builds a client and drives it to leadership in epoch {@code i}: persists an
     * unknown-leader state for epoch {@code i - 1}, stubs {@code random.nextInt(10000)} to 0,
     * advances {@code time} by 10000 ms, wins the election via {@code expectLeaderElection},
     * and answers every begin-epoch request the new leader sends.
     *
     * @param set voter set for the client
     * @param i   target epoch; must be positive
     * @return the client after the final poll
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    private KafkaRaftClient initializeAsLeader(Set<Integer> set, int i) throws Exception {
        final int epoch = i;
        if (epoch <= 0) {
            throw new IllegalArgumentException("Cannot become leader in epoch " + epoch);
        }

        ((Random) Mockito.doReturn(0).when(random)).nextInt(10000);

        ElectionState priorState = ElectionState.withUnknownLeader(epoch - 1, set);
        quorumStateStore.writeElectionState(priorState);
        KafkaRaftClient client = buildClient(set);
        Assertions.assertEquals(priorState, quorumStateStore.readElectionState());

        time.sleep(10000L);
        expectLeaderElection(client, set, epoch);

        // Acknowledge the begin-epoch requests sent by the freshly elected leader.
        pollUntilSend(client);
        for (RaftRequest.Outbound request : collectBeginEpochRequests(epoch)) {
            deliverResponse(request.correlationId, request.destinationId(), beginEpochResponse(epoch, 0));
        }
        client.poll();

        return client;
    }

    /**
     * A follower sends a fetch and then, after 50000 ms on {@code time}, has voted for itself
     * in epoch 6. When the response to the earlier fetch arrives with two records, the log end
     * offset must stay 0 and the election state must stay the epoch-6 vote.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingCandidate() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{localId, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        int fetchCorrelationId = assertSentFetchRequest(epoch, 0L, 0);

        time.sleep(50000L);
        client.poll();
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch + 1, localId, voters),
            quorumStateStore.readElectionState());

        // The stale response arrives only after the state change.
        SimpleRecord[] staleBatch = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        deliverResponse(
            fetchCorrelationId,
            leaderId,
            fetchResponse(epoch, leaderId, MemoryRecords.withRecords(0L, CompressionType.NONE, 3, staleBatch), 0L, Errors.NONE));
        client.poll();

        Assertions.assertEquals(0L, log.endOffset().offset);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch + 1, localId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * A follower of node 1 (epoch 5) sends a fetch, then accepts a begin-epoch request from
     * node 2 for epoch 6. The late response from node 1 carries two records; the log end offset
     * must stay 0 and node 2 must remain the recorded leader.
     */
    @Test
    public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader() throws Exception {
        final int epoch = 5;
        final int oldLeaderId = 1;
        final int newLeaderId = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, oldLeaderId, newLeaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, oldLeaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, oldLeaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        int fetchCorrelationId = assertSentFetchRequest(epoch, 0L, 0);

        deliverRequest(beginEpochRequest(epoch + 1, newLeaderId));
        client.poll();
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch + 1, newLeaderId, voters),
            quorumStateStore.readElectionState());

        // The stale response from the previous leader arrives afterwards.
        SimpleRecord[] staleBatch = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        deliverResponse(
            fetchCorrelationId,
            oldLeaderId,
            fetchResponse(epoch, oldLeaderId, MemoryRecords.withRecords(0L, CompressionType.NONE, 3, staleBatch), 0L, Errors.NONE));
        client.poll();

        Assertions.assertEquals(0L, log.endOffset().offset);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch + 1, newLeaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * A candidate in epoch 5 sends two vote requests and then accepts a begin-epoch request
     * from node 2 for the same epoch. The two vote responses delivered afterwards must not
     * change the recorded leader (node 2, epoch 5).
     */
    @Test
    public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        final int voterOneId = 1;
        final int voterTwoId = 2;
        Set<Integer> voters = Utils.mkSet(new Integer[]{localId, voterOneId, voterTwoId});
        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, localId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch, localId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(epoch, 0, 0L);
        Assertions.assertEquals(2, voteRequests.size());

        deliverRequest(beginEpochRequest(epoch, voterTwoId));
        client.poll();
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, voterTwoId, voters),
            quorumStateStore.readElectionState());

        // Both late vote responses are delivered before the next poll.
        RaftRequest.Outbound firstVoteRequest = voteRequests.get(0);
        deliverResponse(firstVoteRequest.correlationId, voterOneId, voteResponse(false, Optional.empty(), epoch));
        RaftRequest.Outbound secondVoteRequest = voteRequests.get(1);
        deliverResponse(secondVoteRequest.correlationId, voterTwoId, voteResponse(false, Optional.of(voterTwoId), epoch));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, voterTwoId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * Drives the client's first fetch: asserts it targets a member of {@code set} with
     * arguments {@code (0, 0L, 0)}, answers it with an empty {@code Errors.NONE} response
     * built from {@code i2} and {@code i}, and asserts that the store then records
     * {@code withElectedLeader(i2, i, set)}.
     *
     * @param kafkaRaftClient client under test
     * @param set             voter set
     * @param i               leader id reported in the fetch response
     * @param i2              epoch reported in the fetch response
     */
    private void discoverLeaderAsObserver(KafkaRaftClient kafkaRaftClient, Set<Integer> set, int i, int i2) throws Exception {
        final int leaderId = i;
        final int epoch = i2;

        pollUntilSend(kafkaRaftClient);
        RaftRequest.Outbound discoveryFetch = assertSentFetchRequest();
        Assertions.assertTrue(set.contains(discoveryFetch.destinationId()));
        assertFetchRequestData(discoveryFetch, 0, 0L, 0);

        deliverResponse(
            discoveryFetch.correlationId,
            discoveryFetch.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NONE));
        kafkaRaftClient.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, set),
            quorumStateStore.readElectionState());
    }

    /**
     * After discovering node 1 as leader of epoch 5, the client's fetch to node 1 is answered
     * with {@code BROKER_NOT_AVAILABLE}. The next fetch must go to a different voter; that
     * voter's {@code NOT_LEADER_OR_FOLLOWER} response points back at node 1 / epoch 5, which
     * must remain the recorded leader.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{leaderId, 2});
        KafkaRaftClient client = buildClient(voters);
        discoverLeaderAsObserver(client, voters, leaderId, epoch);

        pollUntilSend(client);
        RaftRequest.Outbound fetchToLeader = assertSentFetchRequest();
        Assertions.assertEquals(leaderId, fetchToLeader.destinationId());
        assertFetchRequestData(fetchToLeader, epoch, 0L, 0);
        deliverResponse(
            fetchToLeader.correlationId,
            fetchToLeader.destinationId(),
            fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1L, Errors.BROKER_NOT_AVAILABLE));

        pollUntilSend(client);
        RaftRequest.Outbound fetchToOtherVoter = assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, fetchToOtherVoter.destinationId());
        Assertions.assertTrue(voters.contains(fetchToOtherVoter.destinationId()));
        assertFetchRequestData(fetchToOtherVoter, epoch, 0L, 0);

        // The destination was just asserted not to be the leader, so the responder can only
        // answer NOT_LEADER_OR_FOLLOWER; the former Errors.NONE alternative was unreachable.
        deliverResponse(
            fetchToOtherVoter.correlationId,
            fetchToOtherVoter.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NOT_LEADER_OR_FOLLOWER));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * After discovering node 1 as leader of epoch 5, the client's fetch to node 1 receives no
     * answer for 5000 ms on {@code time}. The next fetch must go to a different voter, whose
     * {@code FENCED_LEADER_EPOCH} response (node 1, epoch 5) keeps node 1 as the recorded leader.
     */
    @Test
    public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{leaderId, 2});
        KafkaRaftClient client = buildClient(voters);
        discoverLeaderAsObserver(client, voters, leaderId, epoch);

        pollUntilSend(client);
        RaftRequest.Outbound unansweredFetch = assertSentFetchRequest();
        Assertions.assertEquals(leaderId, unansweredFetch.destinationId());
        assertFetchRequestData(unansweredFetch, epoch, 0L, 0);

        // No response is delivered; only the clock moves.
        time.sleep(5000L);
        pollUntilSend(client);

        RaftRequest.Outbound retryFetch = assertSentFetchRequest();
        Assertions.assertNotEquals(leaderId, retryFetch.destinationId());
        Assertions.assertTrue(voters.contains(retryFetch.destinationId()));
        assertFetchRequestData(retryFetch, epoch, 0L, 0);
        deliverResponse(
            retryFetch.correlationId,
            retryFetch.destinationId(),
            fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
        client.poll();

        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
    }

    /**
     * Calls {@code shutdown(5000)} on a leader of epoch 1 and checks that it keeps running
     * while it sends an end-quorum-epoch request and answers a vote request for epoch 2 with
     * the trailing flag {@code true}. After a begin-epoch request for epoch 2 from node 1, the
     * client must stop running and the shutdown future must complete with {@code null}.
     */
    @Test
    public void testLeaderGracefulShutdown() throws Exception {
        final int epoch = 1;
        final int localId = 0;
        final int otherNodeId = 1;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, otherNodeId}), epoch);

        // Wildcard instead of a raw type: only completion state and get() are inspected.
        CompletableFuture<?> shutdownFuture = client.shutdown(5000);
        Assertions.assertTrue(client.isShuttingDown());
        Assertions.assertTrue(client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        client.poll();
        Assertions.assertTrue(client.isShuttingDown());
        Assertions.assertTrue(client.isRunning());
        assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(localId), otherNodeId);

        deliverRequest(voteRequest(epoch + 1, otherNodeId, epoch, 1L));
        client.poll();
        assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);

        deliverRequest(beginEpochRequest(epoch + 1, otherNodeId));
        TestUtils.waitForCondition(() -> {
            client.poll();
            return !client.isRunning();
        }, 5000L, "Client failed to shutdown before expiration of timeout");

        Assertions.assertFalse(client.isShuttingDown());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * Sets up two followers through {@code buildFollowerSet}, calls {@code shutdown(20000)} on
     * the leader and checks that one poll later it is still running and has sent
     * end-quorum-epoch requests to both followers.
     */
    @Test
    public void testEndQuorumEpochSentBasedOnFetchOffset() throws Exception {
        final int epoch = 1;
        final int localId = 0;
        final int closeFollower = 2;
        final int laggingFollower = 1;
        KafkaRaftClient client = initializeAsLeader(
            Utils.mkSet(new Integer[]{localId, closeFollower, laggingFollower}), epoch);
        buildFollowerSet(client, epoch, closeFollower, laggingFollower);

        client.shutdown(20000);
        Assertions.assertTrue(client.isRunning());
        client.poll();
        Assertions.assertTrue(client.isRunning());

        Set<Integer> expectedDestinations = Utils.mkSet(new Integer[]{closeFollower, laggingFollower});
        Assertions.assertEquals(
            2,
            collectEndQuorumRequests(epoch, OptionalInt.of(localId), expectedDestinations).size());
    }

    /**
     * After two voter followers and one additional replica (id 3) have fetched, a
     * describe-quorum request must report leader 0, epoch 1, the value {@code 1L}, the log end
     * offsets 3 / 0 / 1 for replicas 0 / 1 / 2, and replica 3 at offset 0 in a separate list.
     */
    @Test
    public void testDescribeQuorum() throws Exception {
        final int epoch = 1;
        final int localId = 0;
        final int closeFollower = 2;
        final int laggingFollower = 1;
        final int extraReplicaId = 3;
        KafkaRaftClient client = initializeAsLeader(
            Utils.mkSet(new Integer[]{localId, closeFollower, laggingFollower}), epoch);
        buildFollowerSet(client, epoch, closeFollower, laggingFollower);

        // A replica outside the voter set fetches as well.
        deliverRequest(fetchRequest(epoch, extraReplicaId, 0L, 0, 0));
        client.poll();
        assertSentFetchResponse(1L, epoch);

        deliverRequest(DescribeQuorumRequest.singletonRequest(METADATA_PARTITION));
        client.poll();

        List<DescribeQuorumResponseData.ReplicaState> expectedFirstList = Arrays.asList(
            new DescribeQuorumResponseData.ReplicaState().setReplicaId(localId).setLogEndOffset(3L),
            new DescribeQuorumResponseData.ReplicaState().setReplicaId(laggingFollower).setLogEndOffset(0L),
            new DescribeQuorumResponseData.ReplicaState().setReplicaId(closeFollower).setLogEndOffset(1L));
        List<DescribeQuorumResponseData.ReplicaState> expectedSecondList = Collections.singletonList(
            new DescribeQuorumResponseData.ReplicaState().setReplicaId(extraReplicaId).setLogEndOffset(0L));
        assertSentDescribeQuorumResponse(localId, epoch, 1L, expectedFirstList, expectedSecondList);
    }

    /**
     * Puts two followers at different fetch positions on a leader: follower {@code i3} fetches
     * at offset 0, the leader then appends two records, and follower {@code i2} fetches at
     * offset 1. A fetch response is asserted after each fetch.
     *
     * <p>The first fetch now uses the epoch parameter {@code i}. It used to hard-code epoch 1,
     * which matched only because every caller passes 1.
     *
     * @param kafkaRaftClient leader under test
     * @param i               current epoch
     * @param i2              id of the follower that fetches at offset 1
     * @param i3              id of the follower that fetches at offset 0
     */
    private void buildFollowerSet(KafkaRaftClient kafkaRaftClient, int i, int i2, int i3) throws Exception {
        final int epoch = i;
        final int closeFollower = i2;
        final int laggingFollower = i3;

        // The lagging follower fetches first, from the start of the log.
        deliverRequest(fetchRequest(epoch, laggingFollower, 0L, 0, 0));
        kafkaRaftClient.poll();
        assertSentFetchResponse(0L, epoch);

        // Append records so that the close follower has something to fetch past offset 0.
        SimpleRecord[] appended = {new SimpleRecord("foo".getBytes()), new SimpleRecord("bar".getBytes())};
        kafkaRaftClient.append(
            MemoryRecords.withRecords(CompressionType.NONE, appended),
            AckMode.LEADER,
            (long) Integer.MAX_VALUE);
        kafkaRaftClient.poll();

        deliverRequest(fetchRequest(epoch, closeFollower, 1L, epoch, 0));
        kafkaRaftClient.poll();
        assertSentFetchResponse(1L, epoch);
    }

    /**
     * Calls {@code shutdown(5000)} on a leader and lets 5000 ms pass on {@code time} without
     * any peer reacting to the end-quorum-epoch request. The client must then stop running and
     * the shutdown future must fail with {@code java.util.concurrent.TimeoutException}.
     */
    @Test
    public void testLeaderGracefulShutdownTimeout() throws Exception {
        final int epoch = 1;
        final int localId = 0;
        final int otherNodeId = 1;
        final int shutdownTimeoutMs = 5000;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{localId, otherNodeId}), epoch);

        // Wildcard instead of a raw type: only the completion state is inspected.
        CompletableFuture<?> shutdownFuture = client.shutdown(shutdownTimeoutMs);
        Assertions.assertTrue(client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        client.poll();
        Assertions.assertTrue(client.isRunning());
        assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(localId), otherNodeId);

        time.sleep(shutdownTimeoutMs);
        client.poll();

        Assertions.assertFalse(client.isRunning());
        Assertions.assertTrue(shutdownFuture.isCompletedExceptionally());
        TestUtils.assertFutureThrows(shutdownFuture, java.util.concurrent.TimeoutException.class);
    }

    /**
     * Calls {@code shutdown(5000)} on a follower of node 1 (epoch 5). It is still running
     * right after the call; after one poll it must have stopped and the shutdown future must
     * be complete with a {@code null} result.
     */
    @Test
    public void testFollowerGracefulShutdown() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());
        client.poll();

        // Wildcard instead of a raw type: only completion state and get() are inspected.
        CompletableFuture<?> shutdownFuture = client.shutdown(5000);
        Assertions.assertTrue(client.isRunning());
        Assertions.assertFalse(shutdownFuture.isDone());

        client.poll();
        Assertions.assertFalse(client.isRunning());
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
    }

    /**
     * With node 0 as the only voter, the client is recorded as leader of epoch 1 right after
     * construction and sends nothing on its first poll. After {@code shutdown(5000)} it is
     * still running until the next poll, and stopped afterwards.
     */
    @Test
    public void testGracefulShutdownSingleMemberQuorum() throws IOException {
        final int localId = 0;
        KafkaRaftClient client = buildClient(Collections.singleton(localId));
        Assertions.assertEquals(
            ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
            quorumStateStore.readElectionState());

        client.poll();
        Assertions.assertEquals(0, channel.drainSendQueue().size());

        client.shutdown(5000);
        Assertions.assertTrue(client.isRunning());
        client.poll();
        Assertions.assertFalse(client.isRunning());
    }

    /**
     * A follower of node 1 (epoch 5) receives a fetch response containing two records. After
     * the next poll the log end offset must be 2.
     */
    @Test
    public void testFollowerReplication() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        pollUntilSend(client);
        int fetchCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
        SimpleRecord[] replicated = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        deliverResponse(
            fetchCorrelationId,
            leaderId,
            fetchResponse(epoch, leaderId, MemoryRecords.withRecords(0L, CompressionType.NONE, 3, replicated), 0L, Errors.NONE));
        client.poll();

        Assertions.assertEquals(2L, log.endOffset().offset);
    }

    /**
     * Feeds a follower three fetch responses in turn: an empty one with fourth argument 0,
     * one carrying two records, and an empty one with fourth argument 2. After each, the log
     * end offset and {@code highWatermark()} are checked: (0, 0), then (2, 0), then (2, 2).
     */
    @Test
    public void testEmptyRecordSetInFetchResponse() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState());

        // Round 1: empty response.
        pollUntilSend(client);
        int firstCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
        deliverResponse(firstCorrelationId, leaderId, fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NONE));
        client.poll();
        Assertions.assertEquals(0L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), client.highWatermark());

        // Round 2: two records arrive; the fourth response argument is still 0.
        pollUntilSend(client);
        int secondCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
        SimpleRecord[] replicated = {new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())};
        deliverResponse(
            secondCorrelationId,
            leaderId,
            fetchResponse(epoch, leaderId, MemoryRecords.withRecords(0L, CompressionType.NONE, epoch, replicated), 0L, Errors.NONE));
        client.poll();
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(0L), client.highWatermark());

        // Round 3: empty response whose fourth argument is 2.
        pollUntilSend(client);
        int thirdCorrelationId = assertSentFetchRequest(epoch, 2L, epoch);
        deliverResponse(thirdCorrelationId, leaderId, fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 2L, Errors.NONE));
        client.poll();
        Assertions.assertEquals(2L, log.endOffset().offset);
        Assertions.assertEquals(OptionalLong.of(2L), client.highWatermark());
    }

    /**
     * Appending {@code MemoryRecords.EMPTY} on a single-voter client must throw
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testAppendEmptyRecordSetNotAllowed() throws Exception {
        final int epoch = 5;
        final int localId = 0;
        Set<Integer> voters = Collections.singleton(localId);
        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));

        KafkaRaftClient client = buildClient(voters);

        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> client.append(MemoryRecords.EMPTY, AckMode.LEADER, (long) Integer.MAX_VALUE));
    }

    /**
     * Builds a client whose persisted state names node 1 as elected leader of epoch 5, attempts
     * an append of three records on it, and checks that the returned future fails with
     * {@link NotLeaderOrFollowerException} after one poll.
     */
    @Test
    public void testAppendToNonLeaderFails() throws IOException {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        this.quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            this.quorumStateStore.readElectionState());

        SimpleRecord[] appendRecords = {
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        MemoryRecords records = MemoryRecords.withRecords(0L, CompressionType.NONE, 1, appendRecords);

        // Wildcard instead of a raw type: the completion value is never inspected here.
        CompletableFuture<?> future = client.append(records, AckMode.LEADER, 2147483647L);
        client.poll();
        TestUtils.assertFutureThrows(future, NotLeaderOrFollowerException.class);
    }

    /**
     * Starts from an unknown-leader state in epoch 4, runs a leader election for epoch 5, then
     * delivers a fetch request from node 1. After the fetch response is sent and a further
     * 5000 ms of mock time pass, the next poll must leave the send queue empty.
     */
    @Test
    public void testFetchShouldBeTreatedAsLeaderEndorsement() throws Exception {
        final int epoch = 5;
        final int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, otherNodeId});
        // Stub the mocked random source so nextInt(10000) yields 0.
        ((Random) Mockito.doReturn(0).when(this.random)).nextInt(10000);
        this.quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters));
        KafkaRaftClient client = buildClient(voters);

        this.time.sleep(10000L);
        expectLeaderElection(client, voters, epoch);
        pollUntilSend(client);
        assertSentBeginQuorumEpochRequest(epoch);

        FetchRequestData fetchFromOther = fetchRequest(epoch, otherNodeId, 0L, 0, 500);
        deliverRequest(fetchFromOther);
        client.poll();
        assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(0));

        this.time.sleep(5000L);
        client.poll();
        List<?> stillQueued = this.channel.drainSendQueue();
        Assertions.assertEquals(0, stillQueued.size());
    }

    /**
     * Builds a client for a single-voter set, checks that the persisted state names node 0 as
     * elected leader of epoch 1, appends three records, and then inspects the batches returned
     * for a fetch from offset 0: a control batch holding one leader-change record followed by a
     * data batch holding the three appended records.
     */
    @Test
    public void testLeaderAppendSingleMemberQuorum() throws IOException {
        long now = this.time.milliseconds();
        Set<Integer> voters = Collections.singleton(0);
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(ElectionState.withElectedLeader(1, 0, voters), this.quorumStateStore.readElectionState());
        Assertions.assertEquals(OptionalLong.of(1L), client.highWatermark());

        SimpleRecord[] appendRecords = {
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        MemoryRecords records = MemoryRecords.withRecords(1L, CompressionType.NONE, 1, appendRecords);

        // A poll without a pending append leaves the high watermark untouched.
        client.poll();
        Assertions.assertEquals(OptionalLong.of(1L), client.highWatermark());

        client.append(records, AckMode.LEADER, 2147483647L);
        client.poll();
        Assertions.assertEquals(OptionalLong.of(4L), client.highWatermark());

        deliverRequest(fetchRequest(1, 1, 0L, 0, 500));
        client.poll();
        MemoryRecords fetchedRecords = assertSentFetchResponse(Errors.NONE, 1, OptionalInt.of(0));
        List<MutableRecordBatch> batches = Utils.toList(fetchedRecords.batchIterator());
        Assertions.assertEquals(2, batches.size());

        // First batch: the control batch with a single leader-change record.
        MutableRecordBatch leaderChangeBatch = batches.get(0);
        Assertions.assertTrue(leaderChangeBatch.isControlBatch());
        List<Record> controlRecords = Utils.toList(leaderChangeBatch.iterator());
        Assertions.assertEquals(1, controlRecords.size());
        Record leaderChangeRecord = controlRecords.get(0);
        Assertions.assertEquals(now, leaderChangeRecord.timestamp());
        verifyLeaderChangeMessage(0, Collections.emptyList(), leaderChangeRecord.key(), leaderChangeRecord.value());

        // Second batch: the appended data, in order.
        MutableRecordBatch dataBatch = batches.get(1);
        Assertions.assertEquals(1, dataBatch.partitionLeaderEpoch());
        List<Record> dataRecords = Utils.toList(dataBatch.iterator());
        Assertions.assertEquals(3, dataRecords.size());
        for (int index = 0; index < appendRecords.length; index++) {
            Assertions.assertEquals(appendRecords[index].value(), dataRecords.get(index).value());
        }
    }

    /**
     * Seeds the log with three records appended in epoch 3, builds a client whose persisted
     * state names node 1 as elected leader of epoch 5, and answers its first fetch (offset 3,
     * last fetched epoch 3) with a diverging-epoch response ending at offset 2. The log end
     * offset must drop to 2 and the next fetch must start from offset 2.
     */
    @Test
    public void testFollowerLogReconciliation() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        final int lastEpoch = 3;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        this.quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));

        List<SimpleRecord> firstBatch =
            Arrays.asList(new SimpleRecord("foo".getBytes()), new SimpleRecord("bar".getBytes()));
        this.log.appendAsLeader(firstBatch, lastEpoch);
        List<SimpleRecord> secondBatch = Arrays.asList(new SimpleRecord("baz".getBytes()));
        this.log.appendAsLeader(secondBatch, lastEpoch);

        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            this.quorumStateStore.readElectionState());
        Assertions.assertEquals(3L, this.log.endOffset().offset);

        pollUntilSend(client);
        int correlationId = assertSentFetchRequest(epoch, 3L, lastEpoch);
        FetchResponseData divergingResponse = outOfRangeFetchRecordsResponse(epoch, leaderId, 2L, lastEpoch, 1L);
        deliverResponse(correlationId, leaderId, divergingResponse);

        client.poll();
        Assertions.assertEquals(2L, this.log.endOffset().offset);

        client.poll();
        assertSentFetchRequest(epoch, 2L, lastEpoch);
    }

    /**
     * Builds a single-voter client against a fresh {@link Metrics} registry and checks that the
     * expected "raft-metrics" metrics are registered, that their initial values match, that the
     * offset metrics move to 4 after a three-record append, and that exactly one metric remains
     * registered once the client has shut down.
     */
    @Test
    public void testMetrics() throws Exception {
        Metrics metrics = new Metrics(this.time);
        KafkaRaftClient client = buildClient(Collections.singleton(0), metrics);

        String[] expectedMetricNames = {
            "current-state",
            "current-leader",
            "current-vote",
            "current-epoch",
            "high-watermark",
            "log-end-offset",
            "log-end-epoch",
            "number-unknown-voter-connections",
            "poll-idle-ratio-avg",
            "commit-latency-avg",
            "commit-latency-max",
            "election-latency-avg",
            "election-latency-max",
            "fetch-records-rate",
            "append-records-rate"
        };
        for (String metricName : expectedMetricNames) {
            // Name the metric in the failure message so a missing registration is easy to spot.
            Assertions.assertNotNull(getMetric(metrics, metricName), "Missing metric " + metricName);
        }

        Assertions.assertEquals("leader", getMetric(metrics, "current-state").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "current-leader").metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), getMetric(metrics, "current-vote").metricValue());
        Assertions.assertEquals(Double.valueOf(1), getMetric(metrics, "current-epoch").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric(metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(1), getMetric(metrics, "log-end-epoch").metricValue());

        SimpleRecord[] appendRecords = {
            new SimpleRecord("a".getBytes()),
            new SimpleRecord("b".getBytes()),
            new SimpleRecord("c".getBytes())
        };
        client.append(MemoryRecords.withRecords(0L, CompressionType.NONE, 1, appendRecords), AckMode.LEADER, 2147483647L);
        client.poll();
        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(metrics, "high-watermark").metricValue());
        Assertions.assertEquals(Double.valueOf(4.0d), getMetric(metrics, "log-end-offset").metricValue());
        Assertions.assertEquals(Double.valueOf(1), getMetric(metrics, "log-end-epoch").metricValue());

        // Wildcard instead of a raw type: only completion and a null result are checked.
        CompletableFuture<?> shutdownFuture = client.shutdown(100);
        client.poll();
        Assertions.assertTrue(shutdownFuture.isDone());
        Assertions.assertNull(shutdownFuture.get());
        Assertions.assertEquals(1, metrics.metrics().size());
    }

    /**
     * Answers the client's first fetch request with a top-level
     * {@code CLUSTER_AUTHORIZATION_FAILED} error and checks that the next poll throws
     * {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInFetch() throws Exception {
        final int epoch = 5;
        final int leaderId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, leaderId});
        this.quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            this.quorumStateStore.readElectionState());

        pollUntilSend(client);
        int correlationId = assertSentFetchRequest(epoch, 0L, 0);
        FetchResponseData response = new FetchResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        deliverResponse(correlationId, leaderId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, client::poll);
    }

    /**
     * Runs a leader election for epoch 5, answers the resulting BeginQuorumEpoch request with a
     * top-level {@code CLUSTER_AUTHORIZATION_FAILED} error, and checks that the next poll throws
     * {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInBeginQuorumEpoch() throws Exception {
        final int epoch = 5;
        final int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{0, otherNodeId});
        // Stub the mocked random source so nextInt(10000) yields 0.
        ((Random) Mockito.doReturn(0).when(this.random)).nextInt(10000);
        this.quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters));
        KafkaRaftClient client = buildClient(voters);

        this.time.sleep(10000L);
        expectLeaderElection(client, voters, epoch);
        pollUntilSend(client);

        int correlationId = assertSentBeginQuorumEpochRequest(epoch);
        BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        deliverResponse(correlationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, client::poll);
    }

    /**
     * Builds a client from a persisted voted-candidate state (epoch 5, candidate 0), answers its
     * vote request with a top-level {@code CLUSTER_AUTHORIZATION_FAILED} error, and checks that
     * the next poll throws {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInVote() throws Exception {
        final int epoch = 5;
        final int candidateId = 0;
        final int otherNodeId = 1;
        Set<Integer> voters = Utils.mkSet(new Integer[]{candidateId, otherNodeId});
        this.quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, candidateId, voters));
        KafkaRaftClient client = buildClient(voters);
        Assertions.assertEquals(
            ElectionState.withVotedCandidate(epoch, candidateId, voters),
            this.quorumStateStore.readElectionState());

        pollUntilSend(client);
        int correlationId = assertSentVoteRequest(epoch, 0, 0L);
        VoteResponseData response = new VoteResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        deliverResponse(correlationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, client::poll);
    }

    /**
     * Initializes a leader for epoch 2, starts a shutdown, answers the resulting EndQuorumEpoch
     * request to node 1 with a top-level {@code CLUSTER_AUTHORIZATION_FAILED} error, and checks
     * that the next poll throws {@link ClusterAuthorizationException}.
     */
    @Test
    public void testClusterAuthorizationFailedInEndQuorumEpoch() throws Exception {
        final int epoch = 2;
        final int otherNodeId = 1;
        KafkaRaftClient client = initializeAsLeader(Utils.mkSet(new Integer[]{0, otherNodeId}), epoch);

        client.shutdown(5000);
        pollUntilSend(client);

        int correlationId = assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(0), otherNodeId);
        EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
        deliverResponse(correlationId, otherNodeId, response);

        Assertions.assertThrows(ClusterAuthorizationException.class, client::poll);
    }

    /**
     * Looks up a metric of the "raft-metrics" group in the given registry.
     *
     * @param metrics registry to search
     * @param str     metric name within the "raft-metrics" group
     * @return the registered metric, or {@code null} when the registry map has no entry for it
     */
    private KafkaMetric getMetric(Metrics metrics, String str) {
        Map<?, ?> registered = metrics.metrics();
        Object metric = registered.get(metrics.metricName(str, "raft-metrics"));
        return (KafkaMetric) metric;
    }

    /**
     * Asserts that a control record is a leader-change message with the expected contents.
     *
     * @param i           expected leader id
     * @param list        expected voter ids, in order
     * @param byteBuffer  record key, parsed as the control record type
     * @param byteBuffer2 record value, deserialized as the leader-change message
     */
    private void verifyLeaderChangeMessage(int i, List<Integer> list, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        ControlRecordType recordType = ControlRecordType.parse(byteBuffer);
        Assertions.assertEquals(ControlRecordType.LEADER_CHANGE, recordType);

        LeaderChangeMessage message = ControlRecordUtils.deserializeLeaderChangeMessage(byteBuffer2);
        Assertions.assertEquals(i, message.leaderId());

        List<LeaderChangeMessage.Voter> expectedVoters = new ArrayList<>();
        for (Integer voterId : list) {
            expectedVoters.add(new LeaderChangeMessage.Voter().setVoterId(voterId));
        }
        Assertions.assertEquals(expectedVoters, message.voters());
    }

    /**
     * Drains the sent VOTE responses, asserts there is exactly one, and checks its first
     * partition entry.
     *
     * @param errors      expected partition-level error
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     * @param z           expected value of the vote-granted flag
     */
    private void assertSentVoteResponse(Errors errors, int i, OptionalInt optionalInt, boolean z) {
        List<RaftResponse.Outbound> sentResponses = this.channel.drainSentResponses(ApiKeys.VOTE);
        Assertions.assertEquals(1, sentResponses.size());
        RaftMessage raftMessage = sentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof VoteResponseData);
        // Explicit downcast: the payload type was just verified by the instanceof assertion.
        VoteResponseData response = (VoteResponseData) raftMessage.data();
        Assertions.assertTrue(RaftUtil.hasValidTopicPartition(response, METADATA_PARTITION));

        VoteResponseData.PartitionData partitionResponse = response.topics().get(0).partitions().get(0);
        Assertions.assertEquals(z, partitionResponse.voteGranted());
        Assertions.assertEquals(errors, Errors.forCode(partitionResponse.errorCode()));
        Assertions.assertEquals(i, partitionResponse.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionResponse.leaderId());
    }

    /**
     * Drains the sent END_QUORUM_EPOCH responses, asserts there is exactly one with no top-level
     * error, and checks its first partition entry.
     *
     * @param errors      expected partition-level error
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     */
    private void assertSentEndQuorumEpochResponse(Errors errors, int i, OptionalInt optionalInt) {
        List<RaftResponse.Outbound> sentResponses = this.channel.drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
        Assertions.assertEquals(1, sentResponses.size());
        RaftMessage raftMessage = sentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof EndQuorumEpochResponseData);
        // Explicit downcast: the payload type was just verified by the instanceof assertion.
        EndQuorumEpochResponseData response = (EndQuorumEpochResponseData) raftMessage.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));

        EndQuorumEpochResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);
        Assertions.assertEquals(i, partitionResponse.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionResponse.leaderId());
        Assertions.assertEquals(errors, Errors.forCode(partitionResponse.errorCode()));
    }

    /**
     * Drains the sent FETCH responses, asserts there is exactly one with no top-level error, and
     * asserts it carries a single topic (the metadata partition's topic) with a single partition.
     *
     * @return the only partition response of the only sent fetch response
     */
    private FetchResponseData.FetchablePartitionResponse assertSentPartitionResponse() {
        List<RaftResponse.Outbound> sentResponses = this.channel.drainSentResponses(ApiKeys.FETCH);
        Assertions.assertEquals(1, sentResponses.size(), "Found unexpected sent messages " + sentResponses);
        RaftResponse.Outbound sentResponse = sentResponses.get(0);
        Assertions.assertEquals(ApiKeys.FETCH.id, sentResponse.data.apiKey());
        // Explicit downcast: the API key checked above identifies the payload as a fetch response.
        FetchResponseData response = (FetchResponseData) sentResponse.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));

        Assertions.assertEquals(1, response.responses().size());
        FetchResponseData.FetchableTopicResponse topicResponse = response.responses().get(0);
        Assertions.assertEquals(METADATA_PARTITION.topic(), topicResponse.topic());
        Assertions.assertEquals(1, topicResponse.partitionResponses().size());
        return topicResponse.partitionResponses().get(0);
    }

    /**
     * Asserts that exactly one fetch response was sent and checks its partition-level error and
     * current-leader fields.
     *
     * @param errors      expected partition-level error
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     * @return the record set carried by the partition response
     */
    private MemoryRecords assertSentFetchResponse(Errors errors, int i, OptionalInt optionalInt) {
        FetchResponseData.FetchablePartitionResponse partitionResponse = assertSentPartitionResponse();
        Assertions.assertEquals(errors, Errors.forCode(partitionResponse.errorCode()));
        Assertions.assertEquals(i, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionResponse.currentLeader().leaderId());
        // NOTE(review): recordSet() presumably has a broader declared type than MemoryRecords
        // (setRecordSet is given a Records elsewhere in this class), hence the explicit downcast.
        return (MemoryRecords) partitionResponse.recordSet();
    }

    /**
     * Asserts that exactly one fetch response was sent without a partition-level error and checks
     * its leader epoch and high watermark.
     *
     * @param j expected high watermark
     * @param i expected leader epoch
     * @return the record set carried by the partition response
     */
    private MemoryRecords assertSentFetchResponse(long j, int i) {
        FetchResponseData.FetchablePartitionResponse partitionResponse = assertSentPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
        Assertions.assertEquals(i, partitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(j, partitionResponse.highWatermark());
        // NOTE(review): recordSet() presumably has a broader declared type than MemoryRecords
        // (setRecordSet is given a Records elsewhere in this class), hence the explicit downcast.
        return (MemoryRecords) partitionResponse.recordSet();
    }

    /**
     * Drains the sent BEGIN_QUORUM_EPOCH responses, asserts there is exactly one with no
     * top-level error, and checks its first partition entry.
     *
     * @param errors      expected partition-level error
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     */
    private void assertSentBeginQuorumEpochResponse(Errors errors, int i, OptionalInt optionalInt) {
        List<RaftResponse.Outbound> sentResponses = this.channel.drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
        Assertions.assertEquals(1, sentResponses.size());
        RaftMessage raftMessage = sentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof BeginQuorumEpochResponseData);
        // Explicit downcast: the payload type was just verified by the instanceof assertion.
        BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData) raftMessage.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));

        BeginQuorumEpochResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);
        Assertions.assertEquals(i, partitionResponse.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionResponse.leaderId());
        Assertions.assertEquals(errors, Errors.forCode(partitionResponse.errorCode()));
    }

    /**
     * Asserts that exactly one EndQuorumEpoch request was sent, addressed to a single destination.
     *
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     * @param i2          the only expected destination id
     * @return correlation id of the request
     */
    private int assertSentEndQuorumEpochRequest(int i, OptionalInt optionalInt, int i2) {
        Set<Integer> expectedDestinations = Collections.singleton(i2);
        List<RaftRequest.Outbound> requests = collectEndQuorumRequests(i, optionalInt, expectedDestinations);
        Assertions.assertEquals(1, requests.size());
        RaftRequest.Outbound onlyRequest = requests.get(0);
        return onlyRequest.correlationId();
    }

    /**
     * Drains the send queue and collects every message whose payload is an EndQuorumEpoch
     * request, asserting on the first partition entry of each one (leader epoch, leader id, and
     * replica id 0). Messages with other payloads are dropped from the queue and ignored.
     *
     * @param i           expected leader epoch
     * @param optionalInt expected leader id; {@code -1} is expected when empty
     * @param set         the exact set of destination ids the collected requests must cover
     * @return the collected requests, in queue order
     */
    private List<RaftRequest.Outbound> collectEndQuorumRequests(int i, OptionalInt optionalInt, Set<Integer> set) {
        List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
        Set<Integer> collectedDestinationIds = new HashSet<>();
        for (RaftMessage raftMessage : this.channel.drainSendQueue()) {
            if (!(raftMessage.data() instanceof EndQuorumEpochRequestData)) {
                continue;
            }
            EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) raftMessage.data();
            EndQuorumEpochRequestData.PartitionData partitionRequest =
                request.topics().get(0).partitions().get(0);
            Assertions.assertEquals(i, partitionRequest.leaderEpoch());
            Assertions.assertEquals(optionalInt.orElse(-1), partitionRequest.leaderId());
            Assertions.assertEquals(0, partitionRequest.replicaId());

            // The destination id is only available on the outbound request type.
            RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage;
            collectedDestinationIds.add(outboundRequest.destinationId());
            endQuorumRequests.add(outboundRequest);
        }
        Assertions.assertEquals(set, collectedDestinationIds);
        return endQuorumRequests;
    }

    /**
     * Asserts that exactly one Vote request was sent.
     *
     * @param i  expected candidate epoch
     * @param i2 expected last offset epoch
     * @param j  expected last offset
     * @return correlation id of the request
     */
    private int assertSentVoteRequest(int i, int i2, long j) {
        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(i, i2, j);
        Assertions.assertEquals(1, voteRequests.size());
        RaftRequest.Outbound onlyRequest = voteRequests.get(0);
        return onlyRequest.correlationId();
    }

    /**
     * Drains the send queue and collects every message whose payload is a Vote request,
     * asserting on the first partition entry of each one (candidate epoch, candidate id 0, last
     * offset epoch, and last offset). Messages with other payloads are dropped from the queue
     * and ignored.
     *
     * @param i  expected candidate epoch
     * @param i2 expected last offset epoch
     * @param j  expected last offset
     * @return the collected requests, in queue order
     */
    private List<RaftRequest.Outbound> collectVoteRequests(int i, int i2, long j) {
        List<RaftRequest.Outbound> voteRequests = new ArrayList<>();
        for (RaftMessage raftMessage : this.channel.drainSendQueue()) {
            if (!(raftMessage.data() instanceof VoteRequestData)) {
                continue;
            }
            VoteRequestData request = (VoteRequestData) raftMessage.data();
            Assertions.assertTrue(RaftUtil.hasValidTopicPartition(request, METADATA_PARTITION));

            VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0);
            Assertions.assertEquals(i, partitionRequest.candidateEpoch());
            Assertions.assertEquals(0, partitionRequest.candidateId());
            Assertions.assertEquals(i2, partitionRequest.lastOffsetEpoch());
            Assertions.assertEquals(j, partitionRequest.lastOffset());
            voteRequests.add((RaftRequest.Outbound) raftMessage);
        }
        return voteRequests;
    }

    /**
     * Asserts that exactly one BeginQuorumEpoch request was sent.
     *
     * @param i expected leader epoch
     * @return correlation id of the request
     */
    private int assertSentBeginQuorumEpochRequest(int i) {
        List<RaftRequest.Outbound> beginEpochRequests = collectBeginEpochRequests(i);
        Assertions.assertEquals(1, beginEpochRequests.size());
        RaftRequest.Outbound onlyRequest = beginEpochRequests.get(0);
        return onlyRequest.correlationId;
    }

    /**
     * Drains the sent BEGIN_QUORUM_EPOCH requests and asserts on the first partition entry of
     * each one: the given leader epoch and leader id 0.
     *
     * @param i expected leader epoch
     * @return the drained requests, in the order they were returned by the channel
     */
    private List<RaftRequest.Outbound> collectBeginEpochRequests(int i) {
        List<RaftRequest.Outbound> beginEpochRequests = new ArrayList<>();
        for (RaftRequest.Outbound raftRequest : this.channel.drainSentRequests(ApiKeys.BEGIN_QUORUM_EPOCH)) {
            Assertions.assertTrue(raftRequest.data() instanceof BeginQuorumEpochRequestData);
            // Explicit downcast: the payload type was just verified by the instanceof assertion.
            BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) raftRequest.data();
            BeginQuorumEpochRequestData.PartitionData partitionRequest =
                request.topics().get(0).partitions().get(0);
            Assertions.assertEquals(i, partitionRequest.leaderEpoch());
            Assertions.assertEquals(0, partitionRequest.leaderId());
            beginEpochRequests.add(raftRequest);
        }
        return beginEpochRequests;
    }

    /**
     * Drains the sent FETCH requests and asserts there is exactly one.
     *
     * @return the single sent fetch request
     */
    private RaftRequest.Outbound assertSentFetchRequest() {
        List<RaftRequest.Outbound> fetchRequests = this.channel.drainSentRequests(ApiKeys.FETCH);
        Assertions.assertEquals(1, fetchRequests.size());
        RaftRequest.Outbound onlyRequest = fetchRequests.get(0);
        return onlyRequest;
    }

    /**
     * Asserts that a message carries a fetch request for exactly one topic (the metadata
     * partition's topic) and one partition, sent with replica id 0, and checks that partition's
     * fields.
     *
     * @param raftMessage message whose payload is inspected
     * @param i           expected current leader epoch
     * @param j           expected fetch offset
     * @param i2          expected last fetched epoch
     */
    private void assertFetchRequestData(RaftMessage raftMessage, int i, long j, int i2) {
        Assertions.assertTrue(raftMessage.data() instanceof FetchRequestData, "Unexpected request type " + raftMessage.data());
        // Explicit downcast: the payload type was just verified by the instanceof assertion.
        FetchRequestData request = (FetchRequestData) raftMessage.data();

        Assertions.assertEquals(1, request.topics().size());
        FetchRequestData.FetchTopic fetchTopic = request.topics().get(0);
        Assertions.assertEquals(METADATA_PARTITION.topic(), fetchTopic.topic());
        Assertions.assertEquals(1, fetchTopic.partitions().size());

        FetchRequestData.FetchPartition fetchPartition = fetchTopic.partitions().get(0);
        Assertions.assertEquals(i, fetchPartition.currentLeaderEpoch());
        Assertions.assertEquals(j, fetchPartition.fetchOffset());
        Assertions.assertEquals(i2, fetchPartition.lastFetchedEpoch());
        Assertions.assertEquals(0, request.replicaId());
    }

    /**
     * Drains the send queue, asserts it held exactly one message, and validates that message as
     * a fetch request via {@code assertFetchRequestData}.
     *
     * @param i  expected current leader epoch
     * @param j  expected fetch offset
     * @param i2 expected last fetched epoch
     * @return correlation id of the message
     */
    private int assertSentFetchRequest(int i, long j, int i2) {
        List<RaftMessage> queued = this.channel.drainSendQueue();
        Assertions.assertEquals(1, queued.size());
        RaftMessage onlyMessage = queued.get(0);
        assertFetchRequestData(onlyMessage, i, j, i2);
        int correlationId = onlyMessage.correlationId();
        return correlationId;
    }

    /**
     * Builds a fetch response for the metadata partition with no top-level error.
     *
     * @param i       leader epoch to report as current
     * @param i2      leader id to report as current
     * @param records record set to attach to the partition response
     * @param j       high watermark to report
     * @param errors  partition-level error to report
     * @return the response data
     */
    private FetchResponseData fetchResponse(int i, int i2, Records records, long j, Errors errors) {
        return RaftUtil.singletonFetchResponse(
            METADATA_PARTITION,
            Errors.NONE,
            partitionResponse -> {
                partitionResponse
                    .setRecordSet(records)
                    .setErrorCode(errors.code())
                    .setHighWatermark(j);
                partitionResponse.currentLeader()
                    .setLeaderEpoch(i)
                    .setLeaderId(i2);
            });
    }

    /**
     * Builds a fetch response for the metadata partition that reports a diverging epoch, with no
     * top-level or partition-level error.
     *
     * @param i  leader epoch to report as current
     * @param i2 leader id to report as current
     * @param j  end offset of the diverging epoch
     * @param i3 the diverging epoch
     * @param j2 high watermark to report
     * @return the response data
     */
    private FetchResponseData outOfRangeFetchRecordsResponse(int i, int i2, long j, int i3, long j2) {
        return RaftUtil.singletonFetchResponse(
            METADATA_PARTITION,
            Errors.NONE,
            partitionResponse -> {
                partitionResponse
                    .setErrorCode(Errors.NONE.code())
                    .setHighWatermark(j2);
                partitionResponse.currentLeader()
                    .setLeaderEpoch(i)
                    .setLeaderId(i2);
                partitionResponse.divergingEpoch()
                    .setEpoch(i3)
                    .setEndOffset(j);
            });
    }

    /**
     * Builds a vote response for the metadata partition with no top-level or partition-level
     * error.
     *
     * @param z        vote-granted flag, passed as the last argument of
     *                 {@code VoteResponse.singletonResponse}
     * @param optional leader id; {@code -1} is used when empty
     * @param i        epoch, passed as the fourth argument of
     *                 {@code VoteResponse.singletonResponse}
     * @return the response data
     */
    private VoteResponseData voteResponse(boolean z, Optional<Integer> optional, int i) {
        int leaderIdOrUnknown = optional.orElse(-1);
        return VoteResponse.singletonResponse(
            Errors.NONE,
            METADATA_PARTITION,
            Errors.NONE,
            i,
            leaderIdOrUnknown,
            z);
    }

    /**
     * Builds a vote request for the metadata partition by forwarding the arguments, in order, to
     * {@code VoteRequest.singletonRequest}.
     *
     * @param i  second argument of the factory call
     * @param i2 third argument of the factory call
     * @param i3 fourth argument of the factory call
     * @param j  fifth argument of the factory call
     * @return the request data
     */
    private VoteRequestData voteRequest(int i, int i2, int i3, long j) {
        VoteRequestData request = VoteRequest.singletonRequest(METADATA_PARTITION, i, i2, i3, j);
        return request;
    }

    /**
     * Builds a BeginQuorumEpoch request for the metadata partition by forwarding the arguments,
     * in order, to {@code BeginQuorumEpochRequest.singletonRequest}.
     *
     * @param i  second argument of the factory call
     * @param i2 third argument of the factory call
     * @return the request data
     */
    private BeginQuorumEpochRequestData beginEpochRequest(int i, int i2) {
        BeginQuorumEpochRequestData request =
            BeginQuorumEpochRequest.singletonRequest(METADATA_PARTITION, i, i2);
        return request;
    }

    /**
     * Builds an EndQuorumEpoch request for the metadata partition via
     * {@code EndQuorumEpochRequest.singletonRequest}. Note the argument order of the factory
     * call: {@code i2} is passed before {@code i}.
     *
     * @param i           third argument of the factory call
     * @param optionalInt leader id; {@code -1} is used when empty
     * @param i2          second argument of the factory call
     * @param list        last argument of the factory call
     * @return the request data
     */
    private EndQuorumEpochRequestData endEpochRequest(int i, OptionalInt optionalInt, int i2, List<Integer> list) {
        int leaderIdOrUnknown = optionalInt.orElse(-1);
        return EndQuorumEpochRequest.singletonRequest(
            METADATA_PARTITION,
            i2,
            i,
            leaderIdOrUnknown,
            list);
    }

    /**
     * Builds a BeginQuorumEpoch response for the metadata partition with no top-level or
     * partition-level error.
     *
     * @param i  fourth argument of {@code BeginQuorumEpochResponse.singletonResponse}
     * @param i2 fifth argument of {@code BeginQuorumEpochResponse.singletonResponse}
     * @return the response data
     */
    private BeginQuorumEpochResponseData beginEpochResponse(int i, int i2) {
        BeginQuorumEpochResponseData response = BeginQuorumEpochResponse.singletonResponse(
            Errors.NONE,
            METADATA_PARTITION,
            Errors.NONE,
            i,
            i2);
        return response;
    }

    /**
     * Drains the send queue, asserts it held exactly one message carrying a DescribeQuorum
     * response, and compares that payload with the response built by
     * {@code DescribeQuorumResponse.singletonResponse} from the given arguments.
     *
     * @param i     second argument of the expected-response factory call
     * @param i2    third argument of the expected-response factory call
     * @param j     fourth argument of the expected-response factory call
     * @param list  fifth argument of the expected-response factory call
     * @param list2 sixth argument of the expected-response factory call
     * @return correlation id of the message
     */
    private int assertSentDescribeQuorumResponse(int i, int i2, long j, List<DescribeQuorumResponseData.ReplicaState> list, List<DescribeQuorumResponseData.ReplicaState> list2) {
        List<RaftMessage> queued = this.channel.drainSendQueue();
        Assertions.assertEquals(1, queued.size());
        RaftMessage onlyMessage = queued.get(0);
        Assertions.assertTrue(
            onlyMessage.data() instanceof DescribeQuorumResponseData,
            "Unexpected request type " + onlyMessage.data());
        Assertions.assertEquals(
            DescribeQuorumResponse.singletonResponse(METADATA_PARTITION, i, i2, j, list, list2),
            onlyMessage.data());
        return onlyMessage.correlationId();
    }

    /**
     * Builds a fetch request for the metadata partition.
     *
     * @param i  current leader epoch to set on the partition
     * @param i2 replica id to set on the request
     * @param j  fetch offset to set on the partition
     * @param i3 last fetched epoch to set on the partition
     * @param i4 max wait, in milliseconds, to set on the request
     * @return the request data
     */
    private FetchRequestData fetchRequest(int i, int i2, long j, int i3, int i4) {
        FetchRequestData request = RaftUtil.singletonFetchRequest(
            METADATA_PARTITION,
            partition -> partition
                .setCurrentLeaderEpoch(i)
                .setLastFetchedEpoch(i3)
                .setFetchOffset(j));
        return request.setMaxWaitMs(i4).setReplicaId(i2);
    }

    /**
     * Polls the client repeatedly, via {@code TestUtils.waitForCondition} with a 5000 ms limit,
     * until the channel reports sent messages.
     *
     * @param kafkaRaftClient client to poll
     * @throws InterruptedException declared to propagate from {@code TestUtils.waitForCondition}
     */
    private void pollUntilSend(KafkaRaftClient kafkaRaftClient) throws InterruptedException {
        final long maxWaitMs = 5000L;
        final String failureMessage = "Condition failed to be satisfied before timeout";
        TestUtils.waitForCondition(
            () -> {
                kafkaRaftClient.poll();
                return this.channel.hasSentMessages();
            },
            maxWaitMs,
            failureMessage);
    }

    /**
     * Wraps the payload in an inbound request, using a fresh correlation id from the channel and
     * the current time from {@code this.time}, and hands it to the channel as a received message.
     *
     * @param apiMessage request payload
     */
    private void deliverRequest(ApiMessage apiMessage) {
        RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
            this.channel.newCorrelationId(),
            apiMessage,
            this.time.milliseconds());
        this.channel.mockReceive(inboundRequest);
    }

    /**
     * Wraps the payload in an inbound response and hands it to the channel as a received message.
     *
     * @param i          correlation id, passed as the first constructor argument
     * @param i2         passed as the third constructor argument (callers in this class pass the
     *                   id of the responding node)
     * @param apiMessage response payload
     */
    private void deliverResponse(int i, int i2, ApiMessage apiMessage) {
        RaftResponse.Inbound inboundResponse = new RaftResponse.Inbound(i, apiMessage, i2);
        this.channel.mockReceive(inboundResponse);
    }
}
