package org.apache.kafka.raft;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:org/apache/kafka/raft/RaftClientTestContext.class */
public final class RaftClientTestContext {
    public final RecordSerde<String> serde;
    final TopicPartition metadataPartition;
    final Uuid metadataTopicId;
    final int electionBackoffMaxMs = 100;
    final int fetchMaxWaitMs = 0;
    final int fetchTimeoutMs = 50000;
    final int retryBackoffMs = 50;
    private int electionTimeoutMs;
    private int requestTimeoutMs;
    private int appendLingerMs;
    private final QuorumStateStore quorumStateStore;
    final Uuid clusterId;
    private final OptionalInt localId;
    public final KafkaRaftClient<String> client;
    final Metrics metrics;
    public final MockLog log;
    final MockNetworkChannel channel;
    final MockMessageQueue messageQueue;
    final MockTime time;
    final MockListener listener;
    final Set<Integer> voters;
    private final List<RaftResponse.Outbound> sentResponses;

    /* loaded from: input_file:org/apache/kafka/raft/RaftClientTestContext$Builder.class */
    public static final class Builder {
        static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
        private static final RecordSerde<String> SERDE = new StringSerde();
        private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
        private static final int ELECTION_BACKOFF_MAX_MS = 100;
        private static final int FETCH_MAX_WAIT_MS = 0;
        private static final int FETCH_TIMEOUT_MS = 50000;
        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
        private static final int RETRY_BACKOFF_MS = 50;
        private static final int DEFAULT_APPEND_LINGER_MS = 0;
        private final MockMessageQueue messageQueue;
        private final MockTime time;
        private final QuorumStateStore quorumStateStore;
        private final MockableRandom random;
        private final LogContext logContext;
        private final MockLog log;
        private final Set<Integer> voters;
        private final OptionalInt localId;
        private Uuid clusterId;
        private int requestTimeoutMs;
        private int electionTimeoutMs;
        private int appendLingerMs;
        private MemoryPool memoryPool;

        public Builder(int i, Set<Integer> set) {
            this(OptionalInt.of(i), set);
        }

        public Builder(OptionalInt optionalInt, Set<Integer> set) {
            this.messageQueue = new MockMessageQueue();
            this.time = new MockTime();
            this.quorumStateStore = new MockQuorumStateStore();
            this.random = new MockableRandom(1L);
            this.logContext = new LogContext();
            this.log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, this.logContext);
            this.clusterId = Uuid.randomUuid();
            this.requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
            this.electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
            this.appendLingerMs = 0;
            this.memoryPool = MemoryPool.NONE;
            this.voters = set;
            this.localId = optionalInt;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withElectedLeader(int i, int i2) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withElectedLeader(i, i2, this.voters));
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withUnknownLeader(int i) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(i, this.voters));
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withVotedCandidate(int i, int i2) throws IOException {
            this.quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(i, i2, this.voters));
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder updateRandom(Consumer<MockableRandom> consumer) {
            consumer.accept(this.random);
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withMemoryPool(MemoryPool memoryPool) {
            this.memoryPool = memoryPool;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withAppendLingerMs(int i) {
            this.appendLingerMs = i;
            return this;
        }

        public Builder appendToLog(int i, List<String> list) {
            this.log.appendAsLeader(RaftClientTestContext.buildBatch(this.time.milliseconds(), this.log.endOffset().offset, i, list), i);
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withEmptySnapshot(OffsetAndEpoch offsetAndEpoch) throws IOException {
            RawSnapshotWriter rawSnapshotWriter = this.log.storeSnapshot(offsetAndEpoch).get();
            Throwable th = null;
            try {
                try {
                    rawSnapshotWriter.freeze();
                    if (rawSnapshotWriter != null) {
                        if (0 != 0) {
                            try {
                                rawSnapshotWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            rawSnapshotWriter.close();
                        }
                    }
                    return this;
                } finally {
                }
            } catch (Throwable th3) {
                if (rawSnapshotWriter != null) {
                    if (th != null) {
                        try {
                            rawSnapshotWriter.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        rawSnapshotWriter.close();
                    }
                }
                throw th3;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder deleteBeforeSnapshot(OffsetAndEpoch offsetAndEpoch) throws IOException {
            if (offsetAndEpoch.offset > this.log.highWatermark().offset) {
                this.log.updateHighWatermark(new LogOffsetMetadata(offsetAndEpoch.offset));
            }
            this.log.deleteBeforeSnapshot(offsetAndEpoch);
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withElectionTimeoutMs(int i) {
            this.electionTimeoutMs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder withRequestTimeoutMs(int i) {
            this.requestTimeoutMs = i;
            return this;
        }

        Builder withClusterId(Uuid uuid) {
            this.clusterId = uuid;
            return this;
        }

        public RaftClientTestContext build() throws IOException {
            Metrics metrics = new Metrics(this.time);
            MockNetworkChannel mockNetworkChannel = new MockNetworkChannel(this.voters);
            MockListener mockListener = new MockListener(this.localId);
            KafkaRaftClient kafkaRaftClient = new KafkaRaftClient(SERDE, mockNetworkChannel, this.messageQueue, this.log, this.quorumStateStore, this.memoryPool, this.time, metrics, new MockExpirationService(this.time), 0, this.clusterId.toString(), this.localId, this.logContext, this.random, new RaftConfig((Map) this.voters.stream().collect(Collectors.toMap(num -> {
                return num;
            }, i -> {
                return RaftClientTestContext.mockAddress(i);
            })), this.requestTimeoutMs, RETRY_BACKOFF_MS, this.electionTimeoutMs, ELECTION_BACKOFF_MAX_MS, FETCH_TIMEOUT_MS, this.appendLingerMs));
            kafkaRaftClient.register(mockListener);
            kafkaRaftClient.initialize();
            RaftClientTestContext raftClientTestContext = new RaftClientTestContext(this.clusterId, this.localId, kafkaRaftClient, this.log, mockNetworkChannel, this.messageQueue, this.time, this.quorumStateStore, this.voters, metrics, mockListener);
            raftClientTestContext.electionTimeoutMs = this.electionTimeoutMs;
            raftClientTestContext.requestTimeoutMs = this.requestTimeoutMs;
            raftClientTestContext.appendLingerMs = this.appendLingerMs;
            return raftClientTestContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftClientTestContext$MockListener.class */
    public static class MockListener implements RaftClient.Listener<String> {
        private final OptionalInt localId;
        private final List<Batch<String>> commits = new ArrayList();
        private final List<BatchReader<String>> savedBatches = new ArrayList();
        private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap();
        private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
        private Optional<SnapshotReader<String>> snapshot = Optional.empty();
        private boolean readCommit = true;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MockListener(OptionalInt optionalInt) {
            this.localId = optionalInt;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int numCommittedBatches() {
            return this.commits.size();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Long claimedEpochStartOffset(int i) {
            return this.claimedEpochStartOffsets.get(Integer.valueOf(i));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public LeaderAndEpoch currentLeaderAndEpoch() {
            return this.currentLeaderAndEpoch;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Batch<String> lastCommit() {
            if (this.commits.isEmpty()) {
                return null;
            }
            return this.commits.get(this.commits.size() - 1);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public OptionalLong lastCommitOffset() {
            return this.commits.isEmpty() ? OptionalLong.empty() : OptionalLong.of(this.commits.get(this.commits.size() - 1).lastOffset());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public OptionalInt currentClaimedEpoch() {
            return (this.localId.isPresent() && this.currentLeaderAndEpoch.isLeader(this.localId.getAsInt())) ? OptionalInt.of(this.currentLeaderAndEpoch.epoch()) : OptionalInt.empty();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public List<String> commitWithBaseOffset(long j) {
            return (List) this.commits.stream().filter(batch -> {
                return batch.baseOffset() == j;
            }).findFirst().map(batch2 -> {
                return batch2.records();
            }).orElse(null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public List<String> commitWithLastOffset(long j) {
            return (List) this.commits.stream().filter(batch -> {
                return batch.lastOffset() == j;
            }).findFirst().map(batch2 -> {
                return batch2.records();
            }).orElse(null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Optional<SnapshotReader<String>> drainHandledSnapshot() {
            Optional<SnapshotReader<String>> optional = this.snapshot;
            this.snapshot = Optional.empty();
            return optional;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void updateReadCommit(boolean z) {
            this.readCommit = z;
            if (z) {
                Iterator<BatchReader<String>> it = this.savedBatches.iterator();
                while (it.hasNext()) {
                    readBatch(it.next());
                }
                this.savedBatches.clear();
            }
        }

        void readBatch(BatchReader<String> batchReader) {
            while (batchReader.hasNext()) {
                try {
                    long asLong = lastCommitOffset().isPresent() ? lastCommitOffset().getAsLong() + 1 : 0L;
                    Batch<String> batch = (Batch) batchReader.next();
                    Assertions.assertTrue(batch.baseOffset() >= asLong, "Received non-monotonic commit " + batch + ". We expected an offset at least as large as " + asLong);
                    this.commits.add(batch);
                } finally {
                    batchReader.close();
                }
            }
        }

        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            this.currentLeaderAndEpoch = leaderAndEpoch;
            currentClaimedEpoch().ifPresent(i -> {
                this.claimedEpochStartOffsets.put(Integer.valueOf(leaderAndEpoch.epoch()), Long.valueOf(lastCommitOffset().isPresent() ? lastCommitOffset().getAsLong() + 1 : 0L));
            });
        }

        public void handleCommit(BatchReader<String> batchReader) {
            if (this.readCommit) {
                readBatch(batchReader);
            } else {
                this.savedBatches.add(batchReader);
            }
        }

        public void handleSnapshot(SnapshotReader<String> snapshotReader) {
            this.snapshot.ifPresent(snapshotReader2 -> {
                snapshotReader2.getClass();
                Assertions.assertDoesNotThrow(snapshotReader2::close);
            });
            this.commits.clear();
            this.savedBatches.clear();
            this.snapshot = Optional.of(snapshotReader);
        }
    }

    private RaftClientTestContext(Uuid uuid, OptionalInt optionalInt, KafkaRaftClient<String> kafkaRaftClient, MockLog mockLog, MockNetworkChannel mockNetworkChannel, MockMessageQueue mockMessageQueue, MockTime mockTime, QuorumStateStore quorumStateStore, Set<Integer> set, Metrics metrics, MockListener mockListener) {
        this.serde = Builder.SERDE;
        this.metadataPartition = Builder.METADATA_PARTITION;
        this.metadataTopicId = Uuid.METADATA_TOPIC_ID;
        this.electionBackoffMaxMs = 100;
        this.fetchMaxWaitMs = 0;
        this.fetchTimeoutMs = 50000;
        this.retryBackoffMs = 50;
        this.sentResponses = new ArrayList();
        this.clusterId = uuid;
        this.localId = optionalInt;
        this.client = kafkaRaftClient;
        this.log = mockLog;
        this.channel = mockNetworkChannel;
        this.messageQueue = mockMessageQueue;
        this.time = mockTime;
        this.quorumStateStore = quorumStateStore;
        this.voters = set;
        this.metrics = metrics;
        this.listener = mockListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int electionTimeoutMs() {
        return this.electionTimeoutMs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int requestTimeoutMs() {
        return this.requestTimeoutMs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int appendLingerMs() {
        return this.appendLingerMs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MemoryRecords buildBatch(long j, int i, List<String> list) {
        return buildBatch(this.time.milliseconds(), j, i, list);
    }

    static MemoryRecords buildBatch(long j, long j2, int i, List<String> list) {
        BatchBuilder batchBuilder = new BatchBuilder(ByteBuffer.allocate(512), Builder.SERDE, CompressionType.NONE, j2, j, false, i, 512);
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            batchBuilder.appendRecord(it.next(), (ObjectSerializationCache) null);
        }
        return batchBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static RaftClientTestContext initializeAsLeader(int i, Set<Integer> set, int i2) throws Exception {
        if (i2 <= 0) {
            throw new IllegalArgumentException("Cannot become leader in epoch " + i2);
        }
        RaftClientTestContext build = new Builder(i, set).withUnknownLeader(i2 - 1).build();
        build.assertUnknownLeader(i2 - 1);
        build.becomeLeader();
        return build;
    }

    public void becomeLeader() throws Exception {
        int currentEpoch = currentEpoch();
        this.time.sleep(this.electionTimeoutMs * 2);
        expectAndGrantVotes(currentEpoch + 1);
        expectBeginEpoch(currentEpoch + 1);
    }

    public OptionalInt currentLeader() {
        return currentLeaderAndEpoch().leaderId();
    }

    public int currentEpoch() {
        return currentLeaderAndEpoch().epoch();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LeaderAndEpoch currentLeaderAndEpoch() {
        ElectionState readElectionState = this.quorumStateStore.readElectionState();
        return new LeaderAndEpoch(readElectionState.leaderIdOpt, readElectionState.epoch);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void expectAndGrantVotes(int i) throws Exception {
        pollUntilRequest();
        for (RaftRequest.Outbound outbound : collectVoteRequests(i, this.log.lastFetchedEpoch(), this.log.endOffset().offset)) {
            deliverResponse(outbound.correlationId, outbound.destinationId(), voteResponse(true, Optional.empty(), i));
        }
        this.client.poll();
        assertElectedLeader(i, localIdOrThrow());
    }

    private int localIdOrThrow() {
        return this.localId.orElseThrow(() -> {
            return new AssertionError("Required local id is not defined");
        });
    }

    private void expectBeginEpoch(int i) throws Exception {
        pollUntilRequest();
        for (RaftRequest.Outbound outbound : collectBeginEpochRequests(i)) {
            deliverResponse(outbound.correlationId, outbound.destinationId(), beginEpochResponse(i, localIdOrThrow()));
        }
        this.client.poll();
    }

    public void pollUntil(TestCondition testCondition) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            this.client.poll();
            return testCondition.conditionMet();
        }, 5000L, "Condition failed to be satisfied before timeout");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pollUntilResponse() throws InterruptedException {
        pollUntil(() -> {
            return !this.sentResponses.isEmpty();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pollUntilRequest() throws InterruptedException {
        MockNetworkChannel mockNetworkChannel = this.channel;
        mockNetworkChannel.getClass();
        pollUntil(mockNetworkChannel::hasSentRequests);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertVotedCandidate(int i, int i2) throws IOException {
        Assertions.assertEquals(ElectionState.withVotedCandidate(i, i2, this.voters), this.quorumStateStore.readElectionState());
    }

    public void assertElectedLeader(int i, int i2) throws IOException {
        Assertions.assertEquals(ElectionState.withElectedLeader(i, i2, this.voters), this.quorumStateStore.readElectionState());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertUnknownLeader(int i) throws IOException {
        Assertions.assertEquals(ElectionState.withUnknownLeader(i, this.voters), this.quorumStateStore.readElectionState());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertResignedLeader(int i, int i2) throws IOException {
        Assertions.assertTrue(this.client.quorum().isResigned());
        Assertions.assertEquals(ElectionState.withElectedLeader(i, i2, this.voters), this.quorumStateStore.readElectionState());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int assertSentDescribeQuorumResponse(int i, int i2, long j, List<DescribeQuorumResponseData.ReplicaState> list, List<DescribeQuorumResponseData.ReplicaState> list2) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.DESCRIBE_QUORUM);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftResponse.Outbound outbound = drainSentResponses.get(0);
        Assertions.assertTrue(outbound.data() instanceof DescribeQuorumResponseData, "Unexpected request type " + outbound.data());
        Assertions.assertEquals(DescribeQuorumResponse.singletonResponse(this.metadataPartition, i, i2, j, list, list2), outbound.data());
        return outbound.correlationId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int assertSentVoteRequest(int i, int i2, long j, int i3) {
        List<RaftRequest.Outbound> collectVoteRequests = collectVoteRequests(i, i2, j);
        Assertions.assertEquals(i3, collectVoteRequests.size());
        return collectVoteRequests.iterator().next().correlationId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentVoteResponse(Errors errors) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.VOTE);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof VoteResponseData);
        Assertions.assertEquals(errors, Errors.forCode(raftMessage.data().errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentVoteResponse(Errors errors, int i, OptionalInt optionalInt, boolean z) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.VOTE);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof VoteResponseData);
        VoteResponseData data = raftMessage.data();
        Assertions.assertTrue(RaftUtil.hasValidTopicPartition(data, this.metadataPartition));
        VoteResponseData.PartitionData partitionData = (VoteResponseData.PartitionData) ((VoteResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Assertions.assertEquals(Boolean.valueOf(z), Boolean.valueOf(partitionData.voteGranted()));
        Assertions.assertEquals(errors, Errors.forCode(partitionData.errorCode()));
        Assertions.assertEquals(i, partitionData.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionData.leaderId());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<RaftRequest.Outbound> collectVoteRequests(int i, int i2, long j) {
        ArrayList arrayList = new ArrayList();
        for (RaftRequest.Outbound outbound : this.channel.drainSendQueue()) {
            if (outbound.data() instanceof VoteRequestData) {
                VoteRequestData.PartitionData unwrap = unwrap((VoteRequestData) outbound.data());
                Assertions.assertEquals(i, unwrap.candidateEpoch());
                Assertions.assertEquals(localIdOrThrow(), unwrap.candidateId());
                Assertions.assertEquals(i2, unwrap.lastOffsetEpoch());
                Assertions.assertEquals(j, unwrap.lastOffset());
                arrayList.add(outbound);
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deliverRequest(ApiMessage apiMessage) {
        RaftRequest.Inbound inbound = new RaftRequest.Inbound(this.channel.newCorrelationId(), apiMessage, this.time.milliseconds());
        inbound.completion.whenComplete((outbound, th) -> {
            if (th != null) {
                throw new RuntimeException(th);
            }
            this.sentResponses.add(outbound);
        });
        this.client.handle(inbound);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deliverResponse(int i, int i2, ApiMessage apiMessage) {
        this.channel.mockReceive(new RaftResponse.Inbound(i, apiMessage, i2));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int assertSentBeginQuorumEpochRequest(int i, int i2) {
        List<RaftRequest.Outbound> collectBeginEpochRequests = collectBeginEpochRequests(i);
        Assertions.assertEquals(i2, collectBeginEpochRequests.size());
        return collectBeginEpochRequests.get(0).correlationId;
    }

    private List<RaftResponse.Outbound> drainSentResponses(ApiKeys apiKeys) {
        ArrayList arrayList = new ArrayList();
        Iterator<RaftResponse.Outbound> it = this.sentResponses.iterator();
        while (it.hasNext()) {
            RaftResponse.Outbound next = it.next();
            if (next.data.apiKey() == apiKeys.id) {
                arrayList.add(next);
                it.remove();
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentBeginQuorumEpochResponse(Errors errors) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof BeginQuorumEpochResponseData);
        Assertions.assertEquals(errors, Errors.forCode(raftMessage.data().errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentBeginQuorumEpochResponse(Errors errors, int i, OptionalInt optionalInt) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof BeginQuorumEpochResponseData);
        BeginQuorumEpochResponseData data = raftMessage.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(data.errorCode()));
        BeginQuorumEpochResponseData.PartitionData partitionData = (BeginQuorumEpochResponseData.PartitionData) ((BeginQuorumEpochResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Assertions.assertEquals(i, partitionData.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionData.leaderId());
        Assertions.assertEquals(errors, Errors.forCode(partitionData.errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int assertSentEndQuorumEpochRequest(int i, int i2) {
        List<RaftRequest.Outbound> collectEndQuorumRequests = collectEndQuorumRequests(i, Collections.singleton(Integer.valueOf(i2)), Optional.empty());
        Assertions.assertEquals(1, collectEndQuorumRequests.size());
        return collectEndQuorumRequests.get(0).correlationId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentEndQuorumEpochResponse(Errors errors) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof EndQuorumEpochResponseData);
        Assertions.assertEquals(errors, Errors.forCode(raftMessage.data().errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentEndQuorumEpochResponse(Errors errors, int i, OptionalInt optionalInt) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof EndQuorumEpochResponseData);
        EndQuorumEpochResponseData data = raftMessage.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(data.errorCode()));
        EndQuorumEpochResponseData.PartitionData partitionData = (EndQuorumEpochResponseData.PartitionData) ((EndQuorumEpochResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Assertions.assertEquals(i, partitionData.leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), partitionData.leaderId());
        Assertions.assertEquals(errors, Errors.forCode(partitionData.errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RaftRequest.Outbound assertSentFetchRequest() {
        List<RaftRequest.Outbound> drainSentRequests = this.channel.drainSentRequests(Optional.of(ApiKeys.FETCH));
        Assertions.assertEquals(1, drainSentRequests.size());
        return drainSentRequests.get(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int assertSentFetchRequest(int i, long j, int i2) {
        List<RaftRequest.Outbound> drainSendQueue = this.channel.drainSendQueue();
        Assertions.assertEquals(1, drainSendQueue.size());
        RaftMessage raftMessage = (RaftMessage) drainSendQueue.get(0);
        assertFetchRequestData(raftMessage, i, j, i2);
        return raftMessage.correlationId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.FETCH);
        Assertions.assertEquals(1, drainSentResponses.size(), "Found unexpected sent messages " + drainSentResponses);
        RaftResponse.Outbound outbound = drainSentResponses.get(0);
        Assertions.assertEquals(ApiKeys.FETCH.id, outbound.data.apiKey());
        FetchResponseData data = outbound.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(data.errorCode()));
        Assertions.assertEquals(1, data.responses().size());
        Assertions.assertEquals(this.metadataPartition.topic(), ((FetchResponseData.FetchableTopicResponse) data.responses().get(0)).topic());
        Assertions.assertEquals(1, ((FetchResponseData.FetchableTopicResponse) data.responses().get(0)).partitions().size());
        return (FetchResponseData.PartitionData) ((FetchResponseData.FetchableTopicResponse) data.responses().get(0)).partitions().get(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentFetchPartitionResponse(Errors errors) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.FETCH);
        Assertions.assertEquals(1, drainSentResponses.size(), "Found unexpected sent messages " + drainSentResponses);
        RaftResponse.Outbound outbound = drainSentResponses.get(0);
        Assertions.assertEquals(ApiKeys.FETCH.id, outbound.data.apiKey());
        Assertions.assertEquals(errors, Errors.forCode(outbound.data().errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MemoryRecords assertSentFetchPartitionResponse(Errors errors, int i, OptionalInt optionalInt) {
        FetchResponseData.PartitionData assertSentFetchPartitionResponse = assertSentFetchPartitionResponse();
        Assertions.assertEquals(errors, Errors.forCode(assertSentFetchPartitionResponse.errorCode()));
        Assertions.assertEquals(i, assertSentFetchPartitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(optionalInt.orElse(-1), assertSentFetchPartitionResponse.currentLeader().leaderId());
        Assertions.assertEquals(-1L, assertSentFetchPartitionResponse.divergingEpoch().endOffset());
        Assertions.assertEquals(-1, assertSentFetchPartitionResponse.divergingEpoch().epoch());
        Assertions.assertEquals(-1L, assertSentFetchPartitionResponse.snapshotId().endOffset());
        Assertions.assertEquals(-1, assertSentFetchPartitionResponse.snapshotId().epoch());
        return assertSentFetchPartitionResponse.records();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MemoryRecords assertSentFetchPartitionResponse(long j, int i) {
        FetchResponseData.PartitionData assertSentFetchPartitionResponse = assertSentFetchPartitionResponse();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(assertSentFetchPartitionResponse.errorCode()));
        Assertions.assertEquals(i, assertSentFetchPartitionResponse.currentLeader().leaderEpoch());
        Assertions.assertEquals(j, assertSentFetchPartitionResponse.highWatermark());
        Assertions.assertEquals(-1L, assertSentFetchPartitionResponse.divergingEpoch().endOffset());
        Assertions.assertEquals(-1, assertSentFetchPartitionResponse.divergingEpoch().epoch());
        Assertions.assertEquals(-1L, assertSentFetchPartitionResponse.snapshotId().endOffset());
        Assertions.assertEquals(-1, assertSentFetchPartitionResponse.snapshotId().epoch());
        return assertSentFetchPartitionResponse.records();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RaftRequest.Outbound assertSentFetchSnapshotRequest() {
        List<RaftRequest.Outbound> drainSentRequests = this.channel.drainSentRequests(Optional.of(ApiKeys.FETCH_SNAPSHOT));
        Assertions.assertEquals(1, drainSentRequests.size());
        return drainSentRequests.get(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertSentFetchSnapshotResponse(Errors errors) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof FetchSnapshotResponseData);
        Assertions.assertEquals(errors, Errors.forCode(raftMessage.data().errorCode()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<FetchSnapshotResponseData.PartitionSnapshot> assertSentFetchSnapshotResponse(TopicPartition topicPartition) {
        List<RaftResponse.Outbound> drainSentResponses = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
        Assertions.assertEquals(1, drainSentResponses.size());
        RaftMessage raftMessage = drainSentResponses.get(0);
        Assertions.assertTrue(raftMessage.data() instanceof FetchSnapshotResponseData);
        FetchSnapshotResponseData data = raftMessage.data();
        Assertions.assertEquals(Errors.NONE, Errors.forCode(data.errorCode()));
        return FetchSnapshotResponse.forTopicPartition(data, topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<RaftRequest.Outbound> collectEndQuorumRequests(int i, Set<Integer> set, Optional<List<Integer>> optional) {
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (RaftRequest.Outbound outbound : this.channel.drainSendQueue()) {
            if (outbound.data() instanceof EndQuorumEpochRequestData) {
                EndQuorumEpochRequestData.PartitionData partitionData = (EndQuorumEpochRequestData.PartitionData) ((EndQuorumEpochRequestData.TopicData) outbound.data().topics().get(0)).partitions().get(0);
                Assertions.assertEquals(i, partitionData.leaderEpoch());
                Assertions.assertEquals(localIdOrThrow(), partitionData.leaderId());
                optional.ifPresent(list -> {
                    Assertions.assertEquals(list, partitionData.preferredSuccessors());
                });
                RaftRequest.Outbound outbound2 = outbound;
                hashSet.add(Integer.valueOf(outbound2.destinationId()));
                arrayList.add(outbound2);
            }
        }
        Assertions.assertEquals(set, hashSet);
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void discoverLeaderAsObserver(int i, int i2) throws Exception {
        pollUntilRequest();
        RaftRequest.Outbound assertSentFetchRequest = assertSentFetchRequest();
        Assertions.assertTrue(this.voters.contains(Integer.valueOf(assertSentFetchRequest.destinationId())));
        assertFetchRequestData(assertSentFetchRequest, 0, 0L, 0);
        deliverResponse(assertSentFetchRequest.correlationId, assertSentFetchRequest.destinationId(), fetchResponse(i2, i, MemoryRecords.EMPTY, 0L, Errors.NONE));
        this.client.poll();
        assertElectedLeader(i2, i);
    }

    private List<RaftRequest.Outbound> collectBeginEpochRequests(int i) {
        ArrayList arrayList = new ArrayList();
        for (RaftRequest.Outbound outbound : this.channel.drainSentRequests(Optional.of(ApiKeys.BEGIN_QUORUM_EPOCH))) {
            Assertions.assertTrue(outbound.data() instanceof BeginQuorumEpochRequestData);
            BeginQuorumEpochRequestData.PartitionData partitionData = (BeginQuorumEpochRequestData.PartitionData) ((BeginQuorumEpochRequestData.TopicData) outbound.data().topics().get(0)).partitions().get(0);
            Assertions.assertEquals(i, partitionData.leaderEpoch());
            Assertions.assertEquals(localIdOrThrow(), partitionData.leaderId());
            arrayList.add(outbound);
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RaftConfig.AddressSpec mockAddress(int i) {
        return new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EndQuorumEpochResponseData endEpochResponse(int i, OptionalInt optionalInt) {
        return EndQuorumEpochResponse.singletonResponse(Errors.NONE, this.metadataPartition, Errors.NONE, i, optionalInt.orElse(-1));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EndQuorumEpochRequestData endEpochRequest(int i, int i2, List<Integer> list) {
        return EndQuorumEpochRequest.singletonRequest(this.metadataPartition, i, i2, list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EndQuorumEpochRequestData endEpochRequest(String str, int i, int i2, List<Integer> list) {
        return EndQuorumEpochRequest.singletonRequest(this.metadataPartition, str, i, i2, list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BeginQuorumEpochRequestData beginEpochRequest(String str, int i, int i2) {
        return BeginQuorumEpochRequest.singletonRequest(this.metadataPartition, str, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BeginQuorumEpochRequestData beginEpochRequest(int i, int i2) {
        return BeginQuorumEpochRequest.singletonRequest(this.metadataPartition, i, i2);
    }

    private BeginQuorumEpochResponseData beginEpochResponse(int i, int i2) {
        return BeginQuorumEpochResponse.singletonResponse(Errors.NONE, this.metadataPartition, Errors.NONE, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VoteRequestData voteRequest(int i, int i2, int i3, long j) {
        return VoteRequest.singletonRequest(this.metadataPartition, this.clusterId.toString(), i, i2, i3, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VoteRequestData voteRequest(String str, int i, int i2, int i3, long j) {
        return VoteRequest.singletonRequest(this.metadataPartition, str, i, i2, i3, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VoteResponseData voteResponse(boolean z, Optional<Integer> optional, int i) {
        return VoteResponse.singletonResponse(Errors.NONE, this.metadataPartition, Errors.NONE, i, optional.orElse(-1).intValue(), z);
    }

    private VoteRequestData.PartitionData unwrap(VoteRequestData voteRequestData) {
        Assertions.assertTrue(RaftUtil.hasValidTopicPartition(voteRequestData, this.metadataPartition));
        return (VoteRequestData.PartitionData) ((VoteRequestData.TopicData) voteRequestData.topics().get(0)).partitions().get(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void assertMatchingRecords(String[] strArr, Records records) {
        List list = Utils.toList(records.records());
        Assertions.assertEquals(strArr.length, list.size());
        for (int i = 0; i < strArr.length; i++) {
            Record record = (Record) list.get(i);
            Assertions.assertEquals(strArr[i], Utils.utf8(record.value()), "Record at offset " + record.offset() + " does not match expected");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void verifyLeaderChangeMessage(int i, List<Integer> list, List<Integer> list2, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        Assertions.assertEquals(ControlRecordType.LEADER_CHANGE, ControlRecordType.parse(byteBuffer));
        LeaderChangeMessage deserializeLeaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(byteBuffer2);
        Assertions.assertEquals(i, deserializeLeaderChangeMessage.leaderId());
        Assertions.assertEquals(list.stream().map(num -> {
            return new LeaderChangeMessage.Voter().setVoterId(num.intValue());
        }).collect(Collectors.toList()), deserializeLeaderChangeMessage.voters());
        Assertions.assertEquals(list2.stream().map(num2 -> {
            return new LeaderChangeMessage.Voter().setVoterId(num2.intValue());
        }).collect(Collectors.toSet()), new HashSet(deserializeLeaderChangeMessage.grantingVoters()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertFetchRequestData(RaftMessage raftMessage, int i, long j, int i2) {
        Assertions.assertTrue(raftMessage.data() instanceof FetchRequestData, "Unexpected request type " + raftMessage.data());
        FetchRequestData data = raftMessage.data();
        Assertions.assertEquals(8388608, data.maxBytes());
        Assertions.assertEquals(0, data.maxWaitMs());
        Assertions.assertEquals(1, data.topics().size());
        Assertions.assertEquals(this.metadataPartition.topic(), ((FetchRequestData.FetchTopic) data.topics().get(0)).topic());
        Assertions.assertEquals(1, ((FetchRequestData.FetchTopic) data.topics().get(0)).partitions().size());
        FetchRequestData.FetchPartition fetchPartition = (FetchRequestData.FetchPartition) ((FetchRequestData.FetchTopic) data.topics().get(0)).partitions().get(0);
        Assertions.assertEquals(i, fetchPartition.currentLeaderEpoch());
        Assertions.assertEquals(j, fetchPartition.fetchOffset());
        Assertions.assertEquals(i2, fetchPartition.lastFetchedEpoch());
        Assertions.assertEquals(this.localId.orElse(-1), data.replicaId());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchRequestData fetchRequest(int i, int i2, long j, int i3, int i4) {
        return fetchRequest(i, this.clusterId.toString(), i2, j, i3, i4);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchRequestData fetchRequest(int i, String str, int i2, long j, int i3, int i4) {
        return RaftUtil.singletonFetchRequest(this.metadataPartition, this.metadataTopicId, fetchPartition -> {
            fetchPartition.setCurrentLeaderEpoch(i).setLastFetchedEpoch(i3).setFetchOffset(j);
        }).setMaxWaitMs(i4).setClusterId(str).setReplicaId(i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchResponseData fetchResponse(int i, int i2, Records records, long j, Errors errors) {
        return RaftUtil.singletonFetchResponse(this.metadataPartition, this.metadataTopicId, Errors.NONE, partitionData -> {
            partitionData.setRecords(records).setErrorCode(errors.code()).setHighWatermark(j);
            partitionData.currentLeader().setLeaderEpoch(i).setLeaderId(i2);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchResponseData divergingFetchResponse(int i, int i2, long j, int i3, long j2) {
        return RaftUtil.singletonFetchResponse(this.metadataPartition, this.metadataTopicId, Errors.NONE, partitionData -> {
            partitionData.setHighWatermark(j2);
            partitionData.currentLeader().setLeaderEpoch(i).setLeaderId(i2);
            partitionData.divergingEpoch().setEpoch(i3).setEndOffset(j);
        });
    }

    public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
        Assertions.assertEquals(this.localId, currentLeader());
        long j = this.log.endOffset().offset;
        Iterator it = ((Set) this.voters.stream().filter(num -> {
            return num.intValue() != this.localId.getAsInt();
        }).collect(Collectors.toSet())).iterator();
        while (it.hasNext()) {
            deliverRequest(fetchRequest(currentEpoch(), ((Integer) it.next()).intValue(), j, currentEpoch(), 0));
            pollUntilResponse();
            assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), this.localId);
        }
        pollUntil(() -> {
            return OptionalLong.of(j).equals(this.client.highWatermark());
        });
    }
}
