package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.class */
public class TaskAssignorConvergenceTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest$Harness.class */
    public static final class Harness {
        private final Set<TaskId> statelessTasks;
        private final Map<TaskId, Long> statefulTaskEndOffsetSums;
        private final Map<UUID, ClientState> clientStates;
        private final StringBuilder history = new StringBuilder();
        private final Map<UUID, ClientState> droppedClientStates = new TreeMap();

        /* JADX INFO: Access modifiers changed from: private */
        public static Harness initializeCluster(int i, int i2, int i3, Supplier<Integer> supplier) {
            int i4 = 0;
            TreeSet treeSet = new TreeSet();
            int i5 = i;
            while (i5 > 0) {
                int min = Math.min(i5, supplier.get().intValue());
                for (int i6 = 0; i6 < min; i6++) {
                    treeSet.add(new TaskId(i4, i6));
                    i5--;
                }
                i4++;
            }
            TreeMap treeMap = new TreeMap();
            int i7 = i2;
            while (i7 > 0) {
                int min2 = Math.min(i7, supplier.get().intValue());
                for (int i8 = 0; i8 < min2; i8++) {
                    treeMap.put(new TaskId(i4, i8), 150000L);
                    i7--;
                }
                i4++;
            }
            TreeMap treeMap2 = new TreeMap();
            for (int i9 = 0; i9 < i3; i9++) {
                UUID uuidForInt = AssignmentTestUtils.uuidForInt(i9);
                treeMap2.put(uuidForInt, emptyInstance(uuidForInt, treeMap));
            }
            return new Harness(treeSet, treeMap, treeMap2);
        }

        private Harness(Set<TaskId> set, Map<TaskId, Long> map, Map<UUID, ClientState> map2) {
            this.statelessTasks = set;
            this.statefulTaskEndOffsetSums = map;
            this.clientStates = map2;
            this.history.append('\n');
            this.history.append("Cluster and application initial state: \n");
            this.history.append("Stateless tasks: ").append(set).append('\n');
            this.history.append("Stateful tasks:  ").append(map.keySet()).append('\n');
            formatClientStates(true);
            this.history.append("History of the cluster: \n");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addNode() {
            UUID uuidForInt = AssignmentTestUtils.uuidForInt(this.clientStates.size() + this.droppedClientStates.size());
            this.history.append("Adding new node ").append(uuidForInt).append('\n');
            this.clientStates.put(uuidForInt, emptyInstance(uuidForInt, this.statefulTaskEndOffsetSums));
        }

        private static ClientState emptyInstance(UUID uuid, Map<TaskId, Long> map) {
            ClientState clientState = new ClientState(1);
            clientState.computeTaskLags(uuid, map);
            return clientState;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addOrResurrectNodesRandomly(Random random, int i) {
            int nextInt = random.nextInt(i);
            for (int i2 = 0; i2 < nextInt; i2++) {
                if (random.nextBoolean() || this.droppedClientStates.isEmpty()) {
                    addNode();
                } else {
                    UUID selectRandomElement = selectRandomElement(random, this.droppedClientStates);
                    this.history.append("Resurrecting node ").append(selectRandomElement).append('\n');
                    this.clientStates.put(selectRandomElement, this.droppedClientStates.get(selectRandomElement));
                    this.droppedClientStates.remove(selectRandomElement);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dropNode() {
            if (this.clientStates.isEmpty()) {
                throw new NoSuchElementException("There are no nodes to drop");
            }
            dropNode(this.clientStates.keySet().iterator().next());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dropRandomNodes(int i, Random random) {
            for (int i2 = 0; !this.clientStates.isEmpty() && i2 < i; i2++) {
                dropNode(selectRandomElement(random, this.clientStates));
            }
            this.history.append("Stateless tasks: ").append(this.statelessTasks).append('\n');
            this.history.append("Stateful tasks:  ").append(this.statefulTaskEndOffsetSums.keySet()).append('\n');
            formatClientStates(true);
        }

        private void dropNode(UUID uuid) {
            ClientState remove = this.clientStates.remove(uuid);
            this.history.append("Dropping node ").append(uuid).append(": ").append(remove).append('\n');
            this.droppedClientStates.put(uuid, remove);
        }

        private static UUID selectRandomElement(Random random, Map<UUID, ClientState> map) {
            int nextInt = random.nextInt(map.size());
            UUID uuid = null;
            Iterator<UUID> it = map.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                UUID next = it.next();
                if (nextInt == 0) {
                    uuid = next;
                    break;
                }
                nextInt--;
            }
            return uuid;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void prepareForNextRebalance() {
            TreeMap treeMap = new TreeMap();
            for (Map.Entry<UUID, ClientState> entry : this.clientStates.entrySet()) {
                UUID key = entry.getKey();
                ClientState clientState = new ClientState(1);
                ClientState value = entry.getValue();
                TreeMap treeMap2 = new TreeMap();
                for (TaskId taskId : value.activeTasks()) {
                    if (this.statefulTaskEndOffsetSums.containsKey(taskId)) {
                        treeMap2.put(taskId, this.statefulTaskEndOffsetSums.get(taskId));
                    }
                }
                for (TaskId taskId2 : value.standbyTasks()) {
                    if (this.statefulTaskEndOffsetSums.containsKey(taskId2)) {
                        treeMap2.put(taskId2, this.statefulTaskEndOffsetSums.get(taskId2));
                    }
                }
                clientState.addPreviousActiveTasks(value.activeTasks());
                clientState.addPreviousStandbyTasks(value.standbyTasks());
                clientState.addPreviousTasksAndOffsetSums("consumer", treeMap2);
                clientState.computeTaskLags(key, this.statefulTaskEndOffsetSums);
                treeMap.put(key, clientState);
            }
            this.clientStates.clear();
            this.clientStates.putAll(treeMap);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordConfig(AssignorConfiguration.AssignmentConfigs assignmentConfigs) {
            this.history.append("Creating assignor with configuration: ").append(assignmentConfigs).append('\n');
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordBefore(int i) {
            this.history.append("Starting Iteration: ").append(i).append('\n');
            formatClientStates(false);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordAfter(int i, boolean z) {
            this.history.append("After assignment:  ").append(i).append('\n');
            this.history.append("Rebalance pending: ").append(z).append('\n');
            formatClientStates(true);
            this.history.append('\n');
        }

        private void formatClientStates(boolean z) {
            AssignmentTestUtils.appendClientStates(this.history, this.clientStates);
            if (z) {
                TreeSet treeSet = new TreeSet();
                treeSet.addAll(this.statefulTaskEndOffsetSums.keySet());
                treeSet.addAll(this.statelessTasks);
                Iterator<Map.Entry<UUID, ClientState>> it = this.clientStates.entrySet().iterator();
                while (it.hasNext()) {
                    treeSet.removeAll(it.next().getValue().assignedTasks());
                }
                this.history.append("Unassigned Tasks: ").append(treeSet).append('\n');
            }
        }
    }

    @Test
    public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
        AssignorConfiguration.AssignmentConfigs assignmentConfigs = new AssignorConfiguration.AssignmentConfigs(100L, 2, 0, Long.valueOf(IntegrationTestUtils.DEFAULT_TIMEOUT), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness initializeCluster = Harness.initializeCluster(1, 1, 1, () -> {
            return 1;
        });
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        verifyValidAssignment(0, initializeCluster);
        verifyBalancedAssignment(initializeCluster);
    }

    @Test
    public void assignmentShouldConvergeAfterAddingNode() {
        AssignorConfiguration.AssignmentConfigs assignmentConfigs = new AssignorConfiguration.AssignmentConfigs(100L, 2, 0, Long.valueOf(IntegrationTestUtils.DEFAULT_TIMEOUT), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness initializeCluster = Harness.initializeCluster(7, 11, 1, () -> {
            return 5;
        });
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        initializeCluster.addNode();
        testForConvergence(initializeCluster, assignmentConfigs, 6);
        verifyValidAssignment(0, initializeCluster);
        verifyBalancedAssignment(initializeCluster);
    }

    @Test
    public void droppingNodesShouldConverge() {
        AssignorConfiguration.AssignmentConfigs assignmentConfigs = new AssignorConfiguration.AssignmentConfigs(100L, 2, 0, Long.valueOf(IntegrationTestUtils.DEFAULT_TIMEOUT), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness initializeCluster = Harness.initializeCluster(11, 13, 7, () -> {
            return 5;
        });
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        initializeCluster.dropNode();
        testForConvergence(initializeCluster, assignmentConfigs, 8);
        verifyValidAssignment(0, initializeCluster);
        verifyBalancedAssignment(initializeCluster);
    }

    @Test
    public void randomClusterPerturbationsShouldConverge() {
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        do {
            runRandomizedScenario(new Random().nextLong());
        } while (System.currentTimeMillis() < currentTimeMillis);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x009d. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:11:0x00f8 A[Catch: AssertionError -> 0x011b, Throwable -> 0x013d, TryCatch #2 {AssertionError -> 0x011b, Throwable -> 0x013d, blocks: (B:3:0x0002, B:6:0x0094, B:7:0x009d, B:8:0x00b8, B:9:0x00ec, B:11:0x00f8, B:13:0x0112, B:15:0x00c6, B:17:0x00d0, B:18:0x00eb), top: B:2:0x0002 }] */
    /* JADX WARN: Removed duplicated region for block: B:14:0x0112 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static void runRandomizedScenario(long r8) {
        /*
            Method dump skipped, instructions count: 370
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(long):void");
    }

    private static void verifyBalancedAssignment(Harness harness) {
        Set keySet = harness.statefulTaskEndOffsetSums.keySet();
        Map map = harness.clientStates;
        StringBuilder sb = harness.history;
        AssignmentTestUtils.assertBalancedActiveAssignment(map, sb);
        AssignmentTestUtils.assertBalancedStatefulAssignment(keySet, map, sb);
        AssignmentTestUtils.TaskSkewReport analyzeTaskAssignmentBalance = AssignmentTestUtils.analyzeTaskAssignmentBalance(harness.clientStates);
        if (analyzeTaskAssignmentBalance.totalSkewedTasks() > 0) {
            Assert.fail("Expected a balanced task assignment, but was: " + analyzeTaskAssignmentBalance + '\n' + ((CharSequence) sb));
        }
    }

    private static void verifyValidAssignment(int i, Harness harness) {
        AssignmentTestUtils.assertValidAssignment(i, harness.statefulTaskEndOffsetSums.keySet(), harness.statelessTasks, harness.clientStates, harness.history);
    }

    private static void testForConvergence(Harness harness, AssignorConfiguration.AssignmentConfigs assignmentConfigs, int i) {
        TreeSet treeSet = new TreeSet();
        treeSet.addAll(harness.statelessTasks);
        treeSet.addAll(harness.statefulTaskEndOffsetSums.keySet());
        harness.recordConfig(assignmentConfigs);
        boolean z = true;
        int i2 = 0;
        while (z && i2 < i) {
            i2++;
            harness.prepareForNextRebalance();
            harness.recordBefore(i2);
            z = new HighAvailabilityTaskAssignor().assign(harness.clientStates, treeSet, harness.statefulTaskEndOffsetSums.keySet(), assignmentConfigs);
            harness.recordAfter(i2, z);
        }
        if (z) {
            Assert.fail("Rebalances have not converged after iteration cutoff: " + i + ((CharSequence) harness.history));
        }
    }
}
