package org.apache.kafka.raft;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.jqwik.api.AfterFailureMode;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Tag;
import net.jqwik.api.constraints.IntRange;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.MockLog;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.junit.jupiter.api.Assertions;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest.class */
public class RaftEventSimulationTest {
    // Topic partition passed to the MockLog that PersistentState creates for each node.
    private static final TopicPartition METADATA_PARTITION = new TopicPartition("__cluster_metadata", 0);
    // Timing settings for the simulated nodes; all values are presumably milliseconds
    // (going by the _MS suffix). ELECTION_TIMEOUT_MS and RETRY_BACKOFF_MS are passed to
    // RaftConfig in Cluster.start.
    private static final int ELECTION_TIMEOUT_MS = 1000;
    private static final int ELECTION_JITTER_MS = 100;
    private static final int FETCH_TIMEOUT_MS = 3000;
    private static final int RETRY_BACKOFF_MS = 50;
    private static final int REQUEST_TIMEOUT_MS = 3000;
    private static final int FETCH_MAX_WAIT_MS = 100;
    private static final int LINGER_MS = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$Cluster.class */
    /**
     * Model of the simulated cluster: the persistent state of every known node plus the
     * subset of nodes that is currently running.
     */
    public static class Cluster {
        final Random random;
        final AtomicInteger correlationIdCounter;
        final MockTime time;
        final Uuid clusterId;
        final Set<Integer> voters;
        final Map<Integer, PersistentState> nodes;
        final Map<Integer, RaftNode> running;

        /**
         * Registers node ids {@code 0..numVoters-1} as voters, followed by {@code numNonVoters}
         * additional ids that get persistent state but are not added to the voter set.
         * No node is started here.
         */
        private Cluster(int numVoters, int numNonVoters, Random random) {
            this.random = random;
            this.correlationIdCounter = new AtomicInteger();
            this.time = new MockTime();
            this.clusterId = Uuid.randomUuid();
            this.voters = new HashSet<>();
            this.nodes = new HashMap<>();
            this.running = new HashMap<>();

            int nodeId = 0;
            for (; nodeId < numVoters; nodeId++) {
                voters.add(nodeId);
                nodes.put(nodeId, new PersistentState(nodeId));
            }
            // The counter carries over, so the remaining ids continue after the voters.
            for (; nodeId < numVoters + numNonVoters; nodeId++) {
                nodes.put(nodeId, new PersistentState(nodeId));
            }
        }

        /** Ids of every known node, running or not. */
        Set<Integer> nodes() {
            return nodes.keySet();
        }

        /** Smallest number of voters that forms a strict majority. */
        int majoritySize() {
            return voters.size() / 2 + 1;
        }

        /** Largest log end offset among the running nodes, or 0 when none is running. */
        long maxLogEndOffset() {
            return running.values().stream()
                .mapToLong(RaftNode::logEndOffset)
                .max()
                .orElse(0L);
        }

        /**
         * High watermark reported by the running leader with the highest epoch, or empty when
         * no running node considers itself leader.
         */
        OptionalLong leaderHighWatermark() {
            // Natural epoch ordering: max() must yield the newest leader. The previous
            // comparator had its arguments swapped and therefore selected the oldest one.
            Optional<RaftNode> leaderWithMaxEpoch = running.values().stream()
                .filter(node -> node.client.quorum().isLeader())
                .max((left, right) -> Integer.compare(
                    left.client.quorum().epoch(), right.client.quorum().epoch()));
            return leaderWithMaxEpoch.isPresent()
                ? leaderWithMaxEpoch.get().client.highWatermark()
                : OptionalLong.empty();
        }

        /** True when some running node has a high watermark strictly greater than {@code offset}. */
        boolean anyReachedHighWatermark(long offset) {
            return running.values().stream()
                .anyMatch(node -> node.highWatermark() > offset);
        }

        /** Largest high watermark among the running nodes, or 0 when none is running. */
        long maxHighWatermarkReached() {
            return running.values().stream()
                .mapToLong(RaftNode::highWatermark)
                .max()
                .orElse(0L);
        }

        /** Same as {@link #maxHighWatermarkReached()}, restricted to running nodes in {@code nodeIds}. */
        long maxHighWatermarkReached(Set<Integer> nodeIds) {
            return running.values().stream()
                .filter(node -> nodeIds.contains(node.nodeId))
                .mapToLong(RaftNode::highWatermark)
                .max()
                .orElse(0L);
        }

        /**
         * True when every node in {@code nodeIds} has a high watermark of at least {@code offset}.
         * Each id is looked up in the running map without a null check, so all of them must be running.
         */
        boolean allReachedHighWatermark(long offset, Set<Integer> nodeIds) {
            return nodeIds.stream()
                .allMatch(nodeId -> running.get(nodeId).highWatermark() >= offset);
        }

        /** True when every running node has a high watermark of at least {@code offset}. */
        boolean allReachedHighWatermark(long offset) {
            return running.values().stream()
                .allMatch(node -> node.highWatermark() >= offset);
        }

        /** True when {@link #latestLeader()} is present and equals {@code nodeId}. */
        boolean hasLeader(int nodeId) {
            OptionalInt leader = latestLeader();
            return leader.isPresent() && leader.getAsInt() == nodeId;
        }

        /**
         * Leader id seen at the highest epoch among the running nodes. A node at the same epoch
         * only overrides the current answer when it actually knows a leader.
         */
        OptionalInt latestLeader() {
            OptionalInt leader = OptionalInt.empty();
            int latestEpoch = 0;
            for (RaftNode node : running.values()) {
                int nodeEpoch = node.client.quorum().epoch();
                if (nodeEpoch > latestEpoch) {
                    leader = node.client.quorum().leaderId();
                    latestEpoch = node.client.quorum().epoch();
                } else if (nodeEpoch == latestEpoch && node.client.quorum().leaderId().isPresent()) {
                    leader = node.client.quorum().leaderId();
                }
            }
            return leader;
        }

        /**
         * True when at least one node is running, the first node's stored election state has a
         * leader, and every other running node stores an equal election state.
         */
        boolean hasConsistentLeader() {
            ElectionState reference = null;
            for (RaftNode node : running.values()) {
                ElectionState state = node.store.readElectionState();
                if (reference == null) {
                    if (!state.hasLeader()) {
                        return false;
                    }
                    reference = state;
                } else if (!reference.equals(state)) {
                    return false;
                }
            }
            return reference != null;
        }

        /** Drops every node from the running set. */
        void killAll() {
            running.clear();
        }

        /** Drops one node from the running set; its persistent state is kept. */
        void kill(int nodeId) {
            running.remove(nodeId);
        }

        /**
         * Asks the node's client to shut down and removes the node from the running set once
         * the returned future completes.
         *
         * @throws IllegalStateException if the node is not running
         */
        void shutdown(int nodeId) {
            RaftNode node = running.get(nodeId);
            if (node == null) {
                throw new IllegalStateException("Attempt to shutdown a node which is not currently running");
            }
            node.client.shutdown(500).whenComplete((result, error) -> kill(nodeId));
        }

        /** Polls the node if it is running; otherwise does nothing. */
        public void pollIfRunning(int nodeId) {
            ifRunning(nodeId, RaftNode::poll);
        }

        /** The running node with the given id, if any. */
        Optional<RaftNode> nodeIfRunning(int nodeId) {
            return Optional.ofNullable(running.get(nodeId));
        }

        /** Live view of the running nodes. */
        Collection<RaftNode> running() {
            return running.values();
        }

        /** Applies {@code action} to the node when it is running. */
        void ifRunning(int nodeId, Consumer<RaftNode> action) {
            nodeIfRunning(nodeId).ifPresent(action);
        }

        /** Picks one running node using the cluster's random source; empty when none is running. */
        Optional<RaftNode> randomRunning() {
            List<RaftNode> candidates = new ArrayList<>(running.values());
            if (candidates.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(candidates.get(random.nextInt(candidates.size())));
        }

        /** Applies {@code action} to every running node that currently considers itself leader. */
        void withCurrentLeader(Consumer<RaftNode> action) {
            for (RaftNode node : running.values()) {
                if (node.client.quorum().isLeader()) {
                    action.accept(node);
                }
            }
        }

        /** Applies {@code action} to every running node. */
        void forAllRunning(Consumer<RaftNode> action) {
            running.values().forEach(action);
        }

        /**
         * Starts every known node.
         *
         * @throws IllegalStateException if any node is already running
         */
        void startAll() {
            if (!running.isEmpty()) {
                throw new IllegalStateException("Some nodes are already started");
            }
            for (Integer nodeId : nodes.keySet()) {
                start(nodeId);
            }
        }

        /** Stops the node and replaces its persistent state with a fresh one. */
        void killAndDeletePersistentState(int nodeId) {
            kill(nodeId);
            nodes.put(nodeId, new PersistentState(nodeId));
        }

        /** Address spec for a node: localhost on port {@code 9990 + nodeId}. */
        private static RaftConfig.AddressSpec nodeAddress(int nodeId) {
            return new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + nodeId));
        }

        /**
         * Builds a client for the node on top of its existing persistent state, initializes it
         * and adds it to the running set.
         */
        void start(int nodeId) {
            LogContext logContext = new LogContext("[Node " + nodeId + "] ");
            PersistentState persistentState = nodes.get(nodeId);
            MockNetworkChannel channel = new MockNetworkChannel(correlationIdCounter, voters);
            MockMessageQueue messageQueue = new MockMessageQueue();

            Map<Integer, RaftConfig.AddressSpec> voterAddresses = voters.stream()
                .collect(Collectors.toMap(voterId -> voterId, Cluster::nodeAddress));
            RaftConfig raftConfig = new RaftConfig(
                voterAddresses,
                RaftEventSimulationTest.REQUEST_TIMEOUT_MS,
                RaftEventSimulationTest.RETRY_BACKOFF_MS,
                RaftEventSimulationTest.ELECTION_TIMEOUT_MS,
                RaftEventSimulationTest.ELECTION_JITTER_MS,
                RaftEventSimulationTest.FETCH_TIMEOUT_MS,
                RaftEventSimulationTest.LINGER_MS
            );
            Metrics metrics = new Metrics(time);

            persistentState.log.reopen();

            IntSerde serde = new IntSerde();
            BatchMemoryPool memoryPool = new BatchMemoryPool(2, 8388608);
            KafkaRaftClient<Integer> client = new KafkaRaftClient<>(
                serde,
                channel,
                messageQueue,
                persistentState.log,
                persistentState.store,
                memoryPool,
                time,
                metrics,
                new MockExpirationService(time),
                RaftEventSimulationTest.FETCH_MAX_WAIT_MS,
                clusterId.toString(),
                OptionalInt.of(nodeId),
                logContext,
                random,
                raftConfig
            );
            RaftNode node = new RaftNode(
                nodeId,
                client,
                persistentState.log,
                channel,
                messageQueue,
                persistentState.store,
                logContext,
                time,
                random,
                serde
            );
            node.initialize();
            running.put(nodeId, node);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$ConsistentCommittedData.class */
    /**
     * Validation that records the sequence number first seen at each committed offset and
     * fails when any running node later exposes a different value at the same offset.
     */
    public static class ConsistentCommittedData implements Validation {
        final Cluster cluster;
        final Map<Long, Integer> committedSequenceNumbers;

        private ConsistentCommittedData(Cluster cluster) {
            this.committedSequenceNumbers = new HashMap<>();
            this.cluster = cluster;
        }

        /** Reads one INT32 from the buffer's current position. */
        private int parseSequenceNumber(ByteBuffer value) {
            return ((Integer) Type.INT32.read(value)).intValue();
        }

        /**
         * Checks one node's earliest snapshot (if any) and its log batches up to the high
         * watermark against the sequence numbers recorded so far. Does nothing when the node
         * has no high watermark yet.
         */
        private void assertCommittedData(RaftNode node) {
            final int nodeId = node.nodeId;
            final MockLog log = node.log;
            final OptionalLong highWatermark = node.client.highWatermark();
            if (!highWatermark.isPresent()) {
                return;
            }

            // Offset from which the log is scanned; moved forward to the snapshot offset when a snapshot exists.
            AtomicLong startOffset = new AtomicLong(0L);
            log.earliestSnapshotId().ifPresent(snapshotId -> {
                Assertions.assertTrue(snapshotId.offset <= highWatermark.getAsLong());
                startOffset.set(snapshotId.offset);

                // try-with-resources closes the reader on both the normal and the failing path.
                try (RecordsSnapshotReader<Integer> snapshot = RecordsSnapshotReader.of(
                        log.readSnapshot(snapshotId).get(),
                        node.intSerde,
                        BufferSupplier.create(),
                        Integer.MAX_VALUE,
                        true)) {
                    // The snapshot is expected to hold exactly one batch with exactly one record.
                    Assertions.assertTrue(snapshot.hasNext());
                    Batch<Integer> batch = snapshot.next();
                    Assertions.assertFalse(snapshot.hasNext());
                    Assertions.assertEquals(1, batch.records().size());

                    // The snapshot's record is compared against the offset just before the snapshot offset.
                    long offset = snapshotId.offset - 1;
                    int sequence = batch.records().get(0);
                    committedSequenceNumbers.putIfAbsent(offset, sequence);
                    Assertions.assertEquals(
                        committedSequenceNumbers.get(offset),
                        sequence,
                        String.format("Committed sequence at offset %s changed on node %s", offset, nodeId));
                }
            });

            for (MockLog.LogBatch batch : log.readBatches(startOffset.get(), highWatermark)) {
                if (batch.isControlBatch) {
                    continue;
                }
                for (MockLog.LogEntry entry : batch.entries) {
                    long offset = entry.offset;
                    Assertions.assertTrue(offset < highWatermark.getAsLong());

                    // duplicate() keeps the record's own buffer position untouched.
                    int sequence = parseSequenceNumber(entry.record.value().duplicate());
                    committedSequenceNumbers.putIfAbsent(offset, sequence);
                    Assertions.assertEquals(
                        committedSequenceNumbers.get(offset).intValue(),
                        sequence,
                        "Committed sequence at offset " + offset + " changed on node " + nodeId);
                }
            }
        }

        /** Runs the committed-data check against every running node. */
        @Override
        public void validate() {
            cluster.forAllRunning(this::assertCommittedData);
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$DropAllTraffic.class */
    /** Network filter that rejects every message in both directions. */
    private static class DropAllTraffic implements NetworkFilter {
        /** Always rejects. */
        @Override
        public boolean acceptOutbound(RaftMessage message) {
            return false;
        }

        /** Always rejects. */
        @Override
        public boolean acceptInbound(RaftMessage message) {
            return false;
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$DropOutboundRequestsFrom.class */
    /**
     * Network filter that drops outbound requests whose destination is in a given set of node
     * ids. Everything else, including all inbound traffic, is let through.
     */
    private static class DropOutboundRequestsFrom implements NetworkFilter {
        private final Set<Integer> unreachable;

        private DropOutboundRequestsFrom(Set<Integer> unreachable) {
            this.unreachable = unreachable;
        }

        /** Inbound traffic is never filtered. */
        @Override
        public boolean acceptInbound(RaftMessage message) {
            return true;
        }

        /** Rejects only outbound requests addressed to an unreachable node. */
        @Override
        public boolean acceptOutbound(RaftMessage message) {
            if (!(message instanceof RaftRequest.Outbound)) {
                return true;
            }
            int destinationId = ((RaftRequest.Outbound) message).destinationId();
            return !unreachable.contains(destinationId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$Event.class */
    /**
     * A schedulable action with a deadline. Events order by deadline first and by event id
     * second.
     */
    public static abstract class Event implements Comparable<Event> {
        final int eventId;
        final long deadlineMs;
        final Runnable action;

        protected Event(Runnable action, int eventId, long deadlineMs) {
            this.eventId = eventId;
            this.deadlineMs = deadlineMs;
            this.action = action;
        }

        /** Runs the action; the scheduler argument is not used here. */
        void execute(EventScheduler scheduler) {
            action.run();
        }

        @Override
        public int compareTo(Event other) {
            if (deadlineMs != other.deadlineMs) {
                return Long.compare(deadlineMs, other.deadlineMs);
            }
            // Same deadline: fall back to the event id.
            return Integer.compare(eventId, other.eventId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$EventScheduler.class */
    /**
     * Runs queued events in deadline order against the supplied clock, checking all invariants
     * after each event and all validations once the run finishes.
     */
    public static class EventScheduler {
        private static final int MAX_ITERATIONS = 500000;
        final AtomicInteger eventIdGenerator = new AtomicInteger(0);
        final PriorityQueue<Event> queue = new PriorityQueue<>();
        final Random random;
        final Time time;
        final List<Invariant> invariants = new ArrayList<>();
        final List<Validation> validations = new ArrayList<>();

        private EventScheduler(Random random, Time time) {
            this.random = random;
            this.time = time;
        }

        /** Registers an invariant to verify after every executed event. */
        public void addInvariant(Invariant invariant) {
            invariants.add(invariant);
        }

        /** Registers a validation to run at the end of {@link #runUntil(Supplier)}. */
        public void addValidation(Validation validation) {
            validations.add(validation);
        }

        /**
         * Queues a periodic event whose first deadline is {@code delayMs} after the current
         * time; {@code periodMs} and {@code jitterMs} are handed to the event for rescheduling.
         */
        void schedule(Runnable action, int delayMs, int periodMs, int jitterMs) {
            long deadlineMs = time.milliseconds() + delayMs;
            int eventId = eventIdGenerator.incrementAndGet();
            queue.offer(new PeriodicEvent(action, eventId, random, deadlineMs, periodMs, jitterMs));
        }

        /**
         * Executes events until {@code exitCondition} yields true or the iteration cap is hit,
         * then asserts that the condition holds and runs the validations.
         *
         * @throws IllegalStateException if the queue runs dry before the condition is met
         */
        void runUntil(Supplier<Boolean> exitCondition) {
            int iterations = 0;
            while (iterations < MAX_ITERATIONS && !exitCondition.get()) {
                Event next = queue.poll();
                if (next == null) {
                    throw new IllegalStateException("Event queue exhausted before condition was satisfied");
                }
                // Advance the clock to the deadline; never sleep a negative amount.
                long waitMs = Math.max(next.deadlineMs - time.milliseconds(), 0L);
                time.sleep(waitMs);
                next.execute(this);
                for (Invariant invariant : invariants) {
                    invariant.verify();
                }
                iterations++;
            }

            Assertions.assertTrue(exitCondition.get(),
                "Simulation condition was not satisfied after " + MAX_ITERATIONS + " iterations");
            for (Validation validation : validations) {
                validation.validate();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$InflightRequest.class */
    /** Immutable record of a request in flight: its correlation id, sender and destination. */
    public static class InflightRequest {
        final int correlationId;
        final int sourceId;
        final int destinationId;

        private InflightRequest(int correlationId, int sourceId, int destinationId) {
            this.destinationId = destinationId;
            this.sourceId = sourceId;
            this.correlationId = correlationId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$IntSerde.class */
    /** {@link RecordSerde} for {@code Integer} records, each encoded as a single INT32. */
    public static class IntSerde implements RecordSerde<Integer> {
        private IntSerde() {
        }

        /** Size of the record as reported by {@code Type.INT32}. */
        public int recordSize(Integer data, ObjectSerializationCache serializationCache) {
            return Type.INT32.sizeOf(data);
        }

        /** Writes the record as one int; the serialization cache is not used. */
        public void write(Integer data, ObjectSerializationCache serializationCache, Writable out) {
            out.writeInt(data.intValue());
        }

        /**
         * Reads one int; the {@code size} argument is not used.
         *
         * <p>The method carries its real name {@code read} again. The decompiler had renamed it
         * to {@code m3read} while merging it with its bridge method, which left the
         * {@code RecordSerde} contract unimplemented.
         */
        public Integer read(Readable input, int size) {
            return Integer.valueOf(input.readInt());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$Invariant.class */
    /**
     * A check that EventScheduler.runUntil invokes after every executed event.
     */
    public interface Invariant {
        /** Performs the check; implementations in this file signal a violation through a failed assertion. */
        void verify();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$LeaderNeverLoadSnapshot.class */
    /**
     * Invariant: a running node whose counter is writable must report zero
     * {@code handleSnapshotCalls()}.
     */
    public static class LeaderNeverLoadSnapshot implements Invariant {
        final Cluster cluster;

        private LeaderNeverLoadSnapshot(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public void verify() {
            cluster.running().stream()
                .filter(node -> node.counter.isWritable())
                .forEach(node -> Assertions.assertEquals(0, node.counter.handleSnapshotCalls()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$MajorityReachedHighWatermark.class */
    /**
     * Invariant: whenever the cluster reports a leader high watermark, at least a majority of
     * voters must have a log end offset at or beyond it. Logs are taken from the persistent
     * state, so nodes that are not running are counted as well.
     */
    public static class MajorityReachedHighWatermark implements Invariant {
        final Cluster cluster;

        private MajorityReachedHighWatermark(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public void verify() {
            OptionalLong leaderHighWatermark = cluster.leaderHighWatermark();
            if (!leaderHighWatermark.isPresent()) {
                return;
            }
            long highWatermark = leaderHighWatermark.getAsLong();

            long votersAtHighWatermark = 0L;
            for (Map.Entry<Integer, PersistentState> entry : cluster.nodes.entrySet()) {
                boolean isVoter = cluster.voters.contains(entry.getKey());
                if (isVoter && entry.getValue().log.endOffset().offset >= highWatermark) {
                    votersAtHighWatermark++;
                }
            }

            Assertions.assertTrue(
                votersAtHighWatermark >= (long) cluster.majoritySize(),
                "Insufficient nodes have reached current high watermark");
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$MessageRouter.class */
    /**
     * Moves messages between simulated nodes, applying a per-node {@link NetworkFilter} and
     * tracking in-flight requests so that responses find their way back to the sender.
     */
    private static class MessageRouter {
        final Map<Integer, InflightRequest> inflight;
        final Map<Integer, NetworkFilter> filters;
        final Cluster cluster;

        /** Starts with a permit-all filter for every node known to the cluster. */
        private MessageRouter(Cluster cluster) {
            this.inflight = new HashMap<>();
            this.filters = new HashMap<>();
            this.cluster = cluster;
            for (Integer nodeId : cluster.nodes.keySet()) {
                filters.put(nodeId, new PermitAllTraffic());
            }
        }

        /**
         * Delivers a request from {@code senderId} to its destination, provided the sender's
         * outbound filter and the destination's inbound filter both accept it and the
         * destination is running. The eventual response is routed back through
         * {@link #deliver(int, RaftResponse.Outbound)}.
         */
        void deliver(int senderId, RaftRequest.Outbound outbound) {
            if (!filters.get(senderId).acceptOutbound(outbound)) {
                return;
            }

            int correlationId = outbound.correlationId();
            int destinationId = outbound.destinationId();
            // Declared with the concrete type: the completion future below and client.handle(...)
            // are used on the inbound request itself, not on the RaftMessage supertype.
            RaftRequest.Inbound inbound =
                new RaftRequest.Inbound(correlationId, outbound.data(), cluster.time.milliseconds());

            if (!filters.get(destinationId).acceptInbound(inbound)) {
                return;
            }

            cluster.nodeIfRunning(destinationId).ifPresent(node -> {
                inflight.put(correlationId, new InflightRequest(correlationId, senderId, destinationId));
                inbound.completion.whenComplete((response, exception) -> {
                    if (response != null && filters.get(destinationId).acceptOutbound(response)) {
                        deliver(destinationId, response);
                    }
                });
                node.client.handle(inbound);
            });
        }

        /**
         * Delivers a response from {@code senderId} to the node that sent the matching
         * request, subject to that node's inbound filter and to it still running. The
         * in-flight entry is removed without a null check, so an entry for the correlation id
         * must exist.
         */
        void deliver(int senderId, RaftResponse.Outbound outbound) {
            int correlationId = outbound.correlationId();
            RaftResponse.Inbound inbound = new RaftResponse.Inbound(correlationId, outbound.data(), senderId);
            InflightRequest inflightRequest = inflight.remove(correlationId);
            if (!filters.get(inflightRequest.sourceId).acceptInbound(inbound)) {
                return;
            }
            cluster.nodeIfRunning(inflightRequest.sourceId)
                .ifPresent(node -> node.channel.mockReceive(inbound));
        }

        /** Replaces the filter applied to {@code nodeId}. */
        void filter(int nodeId, NetworkFilter filter) {
            filters.put(nodeId, filter);
        }

        /** Drains the node's send queue and delivers every queued request with the node as sender. */
        void deliverTo(RaftNode node) {
            node.channel.drainSendQueue().forEach(request -> deliver(node.nodeId, request));
        }

        /** Runs {@link #deliverTo(RaftNode)} for every running node. */
        void deliverAll() {
            for (RaftNode node : cluster.running()) {
                deliverTo(node);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$MonotonicEpoch.class */
    /**
     * Invariant: the epoch stored by each node never decreases, and a running node's quorum
     * epoch matches its stored epoch.
     */
    public static class MonotonicEpoch implements Invariant {
        final Cluster cluster;
        final Map<Integer, Integer> nodeEpochs;

        /** Seeds the last-seen epoch of every known node with 0. */
        private MonotonicEpoch(Cluster cluster) {
            this.nodeEpochs = new HashMap<>();
            this.cluster = cluster;
            for (Integer nodeId : cluster.nodes.keySet()) {
                nodeEpochs.put(nodeId, 0);
            }
        }

        @Override
        public void verify() {
            for (Map.Entry<Integer, PersistentState> nodeEntry : cluster.nodes.entrySet()) {
                Integer nodeId = nodeEntry.getKey();
                ElectionState electionState = nodeEntry.getValue().store.readElectionState();
                if (electionState == null) {
                    // Nothing stored yet for this node, so there is nothing to compare.
                    continue;
                }

                Integer oldEpoch = nodeEpochs.get(nodeId);
                Integer newEpoch = electionState.epoch;
                if (oldEpoch.intValue() > newEpoch.intValue()) {
                    Assertions.fail("Non-monotonic update of epoch detected on node " + nodeId + ": " + oldEpoch + " -> " + newEpoch);
                }
                cluster.ifRunning(nodeId.intValue(), node ->
                    Assertions.assertEquals(newEpoch.intValue(), node.client.quorum().epoch()));
                nodeEpochs.put(nodeId, newEpoch);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$MonotonicHighWatermark.class */
    /** Invariant: the high watermark reported for the cluster's leader never moves backwards. */
    public static class MonotonicHighWatermark implements Invariant {
        final Cluster cluster;
        // Last leader high watermark observed by verify(); starts at 0.
        long highWatermark;

        private MonotonicHighWatermark(Cluster cluster) {
            this.highWatermark = 0L;
            this.cluster = cluster;
        }

        /**
         * Records the current leader high watermark (when one is present) and fails if it is
         * lower than the previously recorded value.
         */
        @Override
        public void verify() {
            cluster.leaderHighWatermark().ifPresent(newHighWatermark -> {
                // Old and new values need distinct names; the decompiled code called both `j`,
                // which does not compile and reduces the comparison to `j < j`.
                long oldHighWatermark = this.highWatermark;
                this.highWatermark = newHighWatermark;
                if (newHighWatermark < oldHighWatermark) {
                    Assertions.fail("Non-monotonic update of high watermark detected: " + oldHighWatermark + " -> " + newHighWatermark);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$NetworkFilter.class */
    /**
     * Per-node message filter consulted by MessageRouter before it hands a message on.
     */
    public interface NetworkFilter {
        /** Returns true when the given message arriving at the node should be accepted. */
        boolean acceptInbound(RaftMessage raftMessage);

        /** Returns true when the given message leaving the node should be accepted. */
        boolean acceptOutbound(RaftMessage raftMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$PeriodicEvent.class */
    /**
     * Event that re-schedules its own action after every execution, using a fixed period plus
     * an optional random jitter.
     */
    public static class PeriodicEvent extends Event {
        final Random random;
        final int periodMs;
        final int jitterMs;

        protected PeriodicEvent(Runnable action, int eventId, Random random, long deadlineMs, int periodMs, int jitterMs) {
            super(action, eventId, deadlineMs);
            this.random = random;
            this.periodMs = periodMs;
            this.jitterMs = jitterMs;
        }

        /** Runs the action, then queues the next occurrence with the scheduler. */
        @Override
        void execute(EventScheduler scheduler) {
            super.execute(scheduler);
            // The random source is only consulted when a jitter is configured.
            int jitter = 0;
            if (jitterMs != 0) {
                jitter = random.nextInt(jitterMs);
            }
            scheduler.schedule(action, periodMs + jitter, periodMs, jitterMs);
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$PermitAllTraffic.class */
    /** Network filter that accepts every message in both directions. */
    private static class PermitAllTraffic implements NetworkFilter {
        /** Always accepts. */
        @Override
        public boolean acceptOutbound(RaftMessage message) {
            return true;
        }

        /** Always accepts. */
        @Override
        public boolean acceptInbound(RaftMessage message) {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$PersistentState.class */
    /** Holds one node's quorum state store and log, independently of any RaftNode instance. */
    public static class PersistentState {
        final MockQuorumStateStore store;
        final MockLog log;

        PersistentState(int nodeId) {
            this.store = new MockQuorumStateStore();
            LogContext logContext = new LogContext(String.format("[Node %s] ", nodeId));
            this.log = new MockLog(RaftEventSimulationTest.METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$RaftNode.class */
    /** A running node: the raft client together with the mocks and state it was built from. */
    public static class RaftNode {
        final int nodeId;
        final KafkaRaftClient<Integer> client;
        final MockLog log;
        final MockNetworkChannel channel;
        final MockMessageQueue messageQueue;
        final MockQuorumStateStore store;
        final LogContext logContext;
        final ReplicatedCounter counter;
        final Time time;
        final Random random;
        final RecordSerde<Integer> intSerde;

        private RaftNode(
            int nodeId,
            KafkaRaftClient<Integer> client,
            MockLog log,
            MockNetworkChannel channel,
            MockMessageQueue messageQueue,
            MockQuorumStateStore store,
            LogContext logContext,
            Time time,
            Random random,
            RecordSerde<Integer> intSerde
        ) {
            this.nodeId = nodeId;
            this.client = client;
            this.log = log;
            this.channel = channel;
            this.messageQueue = messageQueue;
            this.store = store;
            this.logContext = logContext;
            this.time = time;
            this.random = random;
            this.counter = new ReplicatedCounter(nodeId, client, logContext);
            this.intSerde = intSerde;
        }

        /** Registers the counter with the client, then initializes the client. */
        void initialize() {
            client.register(counter);
            client.initialize();
        }

        /**
         * Polls the client at least once and keeps polling while it is still running and the
         * message queue is not empty. Any exception from the client is rethrown wrapped in a
         * RuntimeException that names this node.
         */
        void poll() {
            boolean stillRunning;
            do {
                try {
                    client.poll();
                    stillRunning = client.isRunning();
                } catch (Exception e) {
                    throw new RuntimeException("Uncaught exception during poll of node " + nodeId, e);
                }
            } while (stillRunning && !messageQueue.isEmpty());
        }

        /** Offset of the quorum's high watermark, or 0 when none is known yet. */
        long highWatermark() {
            return client.quorum().highWatermark()
                .map(metadata -> metadata.offset)
                .orElse(0L);
        }

        /** End offset of this node's log. */
        long logEndOffset() {
            return log.endOffset().offset;
        }

        @Override
        public String toString() {
            return String.format(
                "Node(id=%s, hw=%s, logEndOffset=%s)",
                nodeId,
                highWatermark(),
                logEndOffset()
            );
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$SequentialAppendAction.class */
    /**
     * Schedulable action that hands a callback to {@code Cluster.withCurrentLeader}:
     * the callback increments the given node's counter, unless that node's client is
     * shutting down or its counter is not writable.
     */
    private static class SequentialAppendAction implements Runnable {
        final Cluster cluster;

        private SequentialAppendAction(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public void run() {
            cluster.withCurrentLeader(leader -> {
                // Writability is only checked when the client is not shutting down.
                boolean canAppend = !leader.client.isShuttingDown() && leader.counter.isWritable();
                if (canAppend) {
                    leader.counter.increment();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$SingleLeader.class */
    /**
     * Invariant over the election state stored for every node: within one epoch, all
     * stored states that name a leader must name the same leader.
     *
     * <p>The invariant remembers the highest epoch seen so far and the leader recorded
     * for it; both are carried over between calls to {@link #verify()}.
     */
    public static class SingleLeader implements Invariant {
        final Cluster cluster;
        int epoch = 0;
        OptionalInt leaderId = OptionalInt.empty();

        private SingleLeader(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public void verify() {
            for (Map.Entry<Integer, PersistentState> entry : cluster.nodes.entrySet()) {
                ElectionState state = entry.getValue().store.readElectionState();

                // Skip nodes without stored state, with an older epoch, or with no leader.
                if (state == null || state.epoch < epoch || !state.hasLeader()) {
                    continue;
                }

                boolean leaderKnownForEpoch = epoch == state.epoch && leaderId.isPresent();
                if (leaderKnownForEpoch) {
                    Assertions.assertEquals(leaderId.getAsInt(), state.leaderId());
                } else {
                    // First leader seen for this (possibly newer) epoch: remember it.
                    epoch = state.epoch;
                    leaderId = OptionalInt.of(state.leaderId());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$SnapshotAtLogStart.class */
    /**
     * Invariant relating each node's log start offset to its earliest snapshot.
     *
     * <p>For every node whose log reports an earliest snapshot id, it checks that:
     * <ul>
     *   <li>the log start offset is not greater than the snapshot offset;</li>
     *   <li>the log validates the snapshot's offset and epoch as valid;</li>
     *   <li>when the log start offset is greater than zero, it equals the snapshot offset.</li>
     * </ul>
     *
     * <p>NOTE(review): nodes without any snapshot are skipped entirely, so a positive
     * log start offset with no snapshot is not reported here. Confirm that is intended.
     */
    public static class SnapshotAtLogStart implements Invariant {
        final Cluster cluster;

        private SnapshotAtLogStart(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public void verify() {
            for (Map.Entry<Integer, PersistentState> entry : cluster.nodes.entrySet()) {
                int nodeId = entry.getKey();
                MockLog nodeLog = entry.getValue().log;

                nodeLog.earliestSnapshotId().ifPresent(snapshotId -> {
                    long logStartOffset = nodeLog.startOffset();
                    // Computed before the first assertion, as in the original ordering.
                    ValidOffsetAndEpoch validation = nodeLog.validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch);

                    Assertions.assertTrue(
                        logStartOffset <= snapshotId.offset,
                        () -> String.format(
                            "invalid log start offset (%s) and snapshotId offset (%s): nodeId = %s",
                            logStartOffset,
                            snapshotId.offset,
                            nodeId
                        )
                    );
                    Assertions.assertEquals(
                        ValidOffsetAndEpoch.valid(snapshotId),
                        validation,
                        () -> String.format("invalid leader epoch cache: nodeId = %s", nodeId)
                    );
                    if (logStartOffset > 0) {
                        Assertions.assertEquals(
                            logStartOffset,
                            snapshotId.offset,
                            () -> String.format("missing snapshot at log start offset: nodeId = %s", nodeId)
                        );
                    }
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/RaftEventSimulationTest$Validation.class */
    /**
     * A single-method check. Implementations are handed to
     * {@code EventScheduler.addValidation} by the tests in this class.
     */
    @FunctionalInterface
    public interface Validation {
        /** Runs the check. */
        void validate();
    }

    /**
     * Starts every node, schedules polling, message delivery and counter appends, then
     * runs the simulation until the cluster has a consistent leader and all nodes have
     * reached high watermark 10.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 1..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..5 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canElectInitialLeader(
        @ForAll int i,
        @IntRange(min = 1, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 5) @ForAll int i3
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 1);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10L));
    }

    /**
     * Runs until some node reaches high watermark 10, stops the latest leader, and
     * expects all nodes to reach high watermark 20. The stopped node is then started
     * again and all nodes must advance 10 past the maximum high watermark reached.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 3..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..5 (presumably the observer count; confirm against {@code Cluster})
     * @param z  {@code true} stops the leader with {@code Cluster.shutdown}, {@code false} with {@code Cluster.kill}
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canElectNewLeaderAfterOldLeaderFailure(
        @ForAll int i,
        @IntRange(min = 3, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 5) @ForAll int i3,
        @ForAll boolean z
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 1);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        int leaderId = cluster.latestLeader()
            .orElseThrow(() -> new AssertionError("Failed to find current leader"));
        if (!z) {
            cluster.kill(leaderId);
        } else {
            cluster.shutdown(leaderId);
        }
        scheduler.runUntil(() -> cluster.allReachedHighWatermark(20L));

        // The target is fixed before the old leader is brought back.
        long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
        cluster.start(leaderId);
        scheduler.runUntil(() -> cluster.allReachedHighWatermark(targetHighWatermark));
    }

    /**
     * Runs until some node reaches high watermark 10, kills every node, and restarts
     * only the first {@code cluster.majoritySize()} nodes in the iteration order of
     * {@code cluster.nodes()}. The cluster must then advance 10 past the maximum high
     * watermark reached before the kill.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 1..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..5 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canRecoverAfterAllNodesKilled(
        @ForAll int i,
        @IntRange(min = 1, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 5) @ForAll int i3
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 1);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
        cluster.killAll();

        Iterator<Integer> nodeIds = cluster.nodes().iterator();
        int restarted = 0;
        while (restarted < cluster.majoritySize()) {
            int nodeId = nodeIds.next();
            cluster.start(nodeId);
            restarted++;
        }

        scheduler.runUntil(() -> cluster.allReachedHighWatermark(targetHighWatermark));
    }

    /**
     * Runs until some node reaches high watermark 10, installs a {@code DropAllTraffic}
     * filter on the latest leader, and expects every other node to reach high
     * watermark 20.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 3..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..5 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canElectNewLeaderAfterOldLeaderPartitionedAway(
        @ForAll int i,
        @IntRange(min = 3, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 5) @ForAll int i3
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 2);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        int leaderId = cluster.latestLeader()
            .orElseThrow(() -> new AssertionError("Failed to find current leader"));
        router.filter(leaderId, new DropAllTraffic());

        // Every node except the filtered leader; typed instead of a raw HashSet.
        Set<Integer> remainingNodes = new HashSet<>(cluster.nodes());
        remainingNodes.remove(leaderId);

        scheduler.runUntil(() -> cluster.allReachedHighWatermark(20L, remainingNodes));
    }

    /**
     * Uses a cluster built with a first size argument of 5 and node ids 0..4. After
     * some node reaches high watermark 10, {@code DropOutboundRequestsFrom} filters are
     * installed: nodes 0 and 1 are configured with the set {2, 3, 4}, and nodes 2, 3
     * and 4 with the set {0, 1} (presumably splitting the cluster into a two-node
     * minority and a three-node majority; confirm against
     * {@code DropOutboundRequestsFrom}).
     *
     * <p>The test then requires that the high watermark of {2, 3, 4} gets strictly
     * ahead of that of {0, 1}, and that after all filters are replaced with
     * {@link PermitAllTraffic} every node reaches twice the maximum log end offset.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 second {@code Cluster} size argument, 0..3 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canMakeProgressIfMajorityIsReachable(
        @ForAll int i,
        @IntRange(min = 0, max = 3) @ForAll int i2
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(5, i2, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 2);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        // Each filter gets its own freshly built set, installed in node order 0..4.
        for (int minorityNode = 0; minorityNode <= 1; minorityNode++) {
            router.filter(minorityNode, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4)));
        }
        for (int majorityNode = 2; majorityNode <= 4; majorityNode++) {
            router.filter(majorityNode, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
        }

        long partitionedTarget = 2 * cluster.maxLogEndOffset();
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(partitionedTarget));

        long minorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(0, 1));
        long majorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(2, 3, 4));
        String failureMessage = String.format(
            "majorityHighWatermark = %s, minorityHighWatermark = %s",
            majorityHighWatermark,
            minorityHighWatermark
        );
        Assertions.assertTrue(majorityHighWatermark > minorityHighWatermark, failureMessage);

        for (int nodeId = 0; nodeId <= 4; nodeId++) {
            router.filter(nodeId, new PermitAllTraffic());
        }

        long healedTarget = 2 * cluster.maxLogEndOffset();
        scheduler.runUntil(() -> cluster.allReachedHighWatermark(healedTarget));
    }

    /**
     * Runs until some node reaches high watermark 10, then puts a {@code DropAllTraffic}
     * filter on the latest leader and waits for a different leader. The first leader's
     * filter is then replaced with {@link PermitAllTraffic} and the second leader gets
     * the {@code DropAllTraffic} filter. Some node must still advance 10 past the
     * maximum high watermark reached so far.
     *
     * <p>A missing leader at either lookup fails the test with an
     * {@link AssertionError}, as the sibling tests do, rather than with the bare
     * {@code NoSuchElementException} of an unchecked {@code getAsInt()}.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 3..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..5 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canMakeProgressAfterBackToBackLeaderFailures(
        @ForAll int i,
        @IntRange(min = 3, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 5) @ForAll int i3
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);
        MessageRouter router = new MessageRouter(cluster);
        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 5);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);

        scheduler.runUntil(cluster::hasConsistentLeader);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        int firstLeaderId = cluster.latestLeader()
            .orElseThrow(() -> new AssertionError("Failed to find current leader"));
        router.filter(firstLeaderId, new DropAllTraffic());
        scheduler.runUntil(() ->
            cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != firstLeaderId);

        int secondLeaderId = cluster.latestLeader()
            .orElseThrow(() -> new AssertionError("Failed to find new leader"));
        router.filter(firstLeaderId, new PermitAllTraffic());
        router.filter(secondLeaderId, new DropAllTraffic());

        long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark));
    }

    /**
     * Runs until some node reaches high watermark 10, picks a random running node,
     * and kills it while deleting its persistent state. Once the cluster has a
     * consistent leader that is not the wiped node, that node is restarted and all
     * nodes must advance 10 past the maximum high watermark reached.
     *
     * <p>This test builds its own scheduler with only the
     * {@code MonotonicHighWatermark} and {@link SingleLeader} invariants plus the
     * {@code ConsistentCommittedData} validation, rather than the default set.
     *
     * @param i  seed of the {@link Random} handed to the cluster
     * @param i2 first {@code Cluster} size argument, 3..5 (presumably the voter count; confirm against {@code Cluster})
     * @param i3 second {@code Cluster} size argument, 0..2 (presumably the observer count; confirm against {@code Cluster})
     */
    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
    void canRecoverFromSingleNodeCommittedDataLoss(
        @ForAll int i,
        @IntRange(min = 3, max = 5) @ForAll int i2,
        @IntRange(min = 0, max = 2) @ForAll int i3
    ) {
        Random random = new Random(i);
        Cluster cluster = new Cluster(i2, i3, random);

        EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
        scheduler.addInvariant(new MonotonicHighWatermark(cluster));
        scheduler.addInvariant(new SingleLeader(cluster));
        scheduler.addValidation(new ConsistentCommittedData(cluster));

        MessageRouter router = new MessageRouter(cluster);

        cluster.startAll();
        schedulePolling(scheduler, cluster, 3, 5);
        scheduler.schedule(router::deliverAll, 0, 2, 5);
        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10L));

        RaftNode victim = cluster.randomRunning()
            .orElseThrow(() -> new AssertionError("Failed to find running node"));
        int victimId = victim.nodeId;
        cluster.killAndDeletePersistentState(victimId);

        scheduler.runUntil(() -> !cluster.hasLeader(victimId) && cluster.hasConsistentLeader());

        long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
        cluster.start(victimId);
        scheduler.runUntil(() -> cluster.allReachedHighWatermark(targetHighWatermark));
    }

    /**
     * Creates an {@code EventScheduler} on the cluster's random source and clock and
     * registers the default checks on it, in this order: monotonic high watermark,
     * monotonic epoch, majority reached high watermark, single leader, snapshot at log
     * start, leader never loads snapshot, followed by the consistent-committed-data
     * validation.
     *
     * @param cluster the cluster every invariant and validation is bound to
     * @return the configured scheduler
     */
    private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
        EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);

        // Invariants: registration order is kept exactly as before.
        scheduler.addInvariant(new MonotonicHighWatermark(cluster));
        scheduler.addInvariant(new MonotonicEpoch(cluster));
        scheduler.addInvariant(new MajorityReachedHighWatermark(cluster));
        scheduler.addInvariant(new SingleLeader(cluster));
        scheduler.addInvariant(new SnapshotAtLogStart(cluster));
        scheduler.addInvariant(new LeaderNeverLoadSnapshot(cluster));

        // Validations.
        scheduler.addValidation(new ConsistentCommittedData(cluster));

        return scheduler;
    }

    /**
     * Schedules one {@code cluster.pollIfRunning(nodeId)} event per node returned by
     * {@code cluster.nodes()}.
     *
     * <p>The first numeric argument passed to {@code EventScheduler.schedule} starts at
     * 0 and grows by one per node; {@code i} and {@code i2} are forwarded unchanged as
     * the remaining two arguments (presumably period and jitter; confirm against
     * {@code EventScheduler.schedule}).
     *
     * @param eventScheduler scheduler that receives the poll events
     * @param cluster        cluster whose nodes are polled
     * @param i              forwarded as the third argument of {@code schedule}
     * @param i2             forwarded as the fourth argument of {@code schedule}
     */
    private void schedulePolling(EventScheduler eventScheduler, Cluster cluster, int i, int i2) {
        int nodeIndex = 0;
        for (int nodeId : cluster.nodes()) {
            eventScheduler.schedule(() -> cluster.pollIfRunning(nodeId), nodeIndex, i, i2);
            nodeIndex++;
        }
    }
}
