package org.apache.kafka.connect.runtime.distributed;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConnectorTest;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.class */
public class IncrementalCooperativeAssignorTest {
    // Not referenced in this excerpt — presumably consumed by helpers further down the file (confirm).
    private static final long CONFIG_OFFSET = 618;
    // Connector registry keyed by name; presumably filled by addNewConnector(name, n) — confirm against the helper.
    private Map<String, Integer> connectors;
    // Clock handed to the assignor by initAssignor(); setup() uses Time.SYSTEM, most tests swap in a MockTime.
    private Time time;
    // Delay value passed to the assignor constructor; setup() seeds it from SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT.
    private int rebalanceDelay;
    // Assignor under test; replaced with a fresh instance on every initAssignor() call.
    private IncrementalCooperativeAssignor assignor;
    // Copied into assignor.previousGenerationId by initAssignor().
    private int generationId;
    // Not touched in this excerpt — presumably the outcome of the latest rebalance helper call (confirm).
    private IncrementalCooperativeAssignor.ClusterAssignment returnedAssignments;
    // Keyed by worker name; presumably maintained by the add/remove worker helpers (confirm).
    private Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments;

    /**
     * Builds the default fixture before every test: generation id 1000, the system clock, the
     * default scheduled-rebalance delay, two connectors registered with the value 4 each, a single
     * empty worker ("worker1"), and a freshly constructed assignor.
     */
    @Before
    public void setup() {
        this.generationId = 1000;
        this.time = Time.SYSTEM;
        this.rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
        // Diamond instead of the raw HashMap so the assignment to the typed field is checked.
        this.connectors = new HashMap<>();
        addNewConnector("connector1", 4);
        addNewConnector("connector2", 4);
        this.memberAssignments = new HashMap<>();
        addNewEmptyWorkers("worker1");
        // Must come last: the assignor captures the clock and delay configured above.
        initAssignor();
    }

    /**
     * Replaces the assignor under test with a new instance built from the current {@code time} and
     * {@code rebalanceDelay}, and seeds its previous generation id from {@code generationId}.
     * Tests call this again after changing the clock or the delay.
     */
    public void initAssignor() {
        LogContext logContext = new LogContext();
        IncrementalCooperativeAssignor freshAssignor =
                new IncrementalCooperativeAssignor(logContext, this.time, this.rebalanceDelay);
        freshAssignor.previousGenerationId = this.generationId;
        this.assignor = freshAssignor;
    }

    /**
     * Scenario: a second worker joins a group that starts with worker1 only.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenWorkerJoins() {
        // Switch to a controllable clock and rebuild the assignor so it uses it.
        this.time = new MockTime();
        initAssignor();
        // Round 1: worker1 alone; expects 2 connectors / 8 tasks and a balanced, complete allocation.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(2);
        assertTaskAllocations(8);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 joins; no delay is expected.
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 1);
        assertTaskAllocations(0, 4);
        // Round 3: follow-up rebalance without revocations; expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertNoRevocations();
        assertDelay(0);
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 4: advance the clock by the assignor's current delay; nothing further should be assigned.
        this.time.sleep(this.assignor.delay);
        performStandardRebalance();
        assertDelay(0);
        assertEmptyAssignment();
    }

    /**
     * Scenario: workers 2 through 6 join one at a time while three connectors (value 4 each) are
     * configured, with the clock advanced by the assignor's delay between some of the rounds.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testAssignmentsWhenWorkersJoinAfterRevocations() {
        // Controllable clock, fresh assignor, and a third connector on top of the two from setup().
        this.time = new MockTime();
        initAssignor();
        addNewConnector("connector3", 4);
        // Round 1: worker1 alone; expects 3 connectors / 12 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(3);
        assertTaskAllocations(12);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 joins.
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 2);
        assertTaskAllocations(0, 6);
        // Round 3: worker3 joins; the assignor is expected to report a positive delay.
        addNewEmptyWorkers("worker3");
        performStandardRebalance();
        Assert.assertTrue(this.assignor.delay > 0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 2);
        assertTaskAllocations(3, 3, 6);
        // Round 4: let the reported delay elapse, then worker4 joins.
        this.time.sleep(this.assignor.delay);
        addNewEmptyWorkers("worker4");
        performStandardRebalance();
        assertWorkers("worker1", "worker2", "worker3", "worker4");
        assertConnectorAllocations(0, 0, 1, 1);
        assertTaskAllocations(0, 3, 3, 3);
        // Round 5: worker5 joins; the reported delay is expected to exceed 40.
        addNewEmptyWorkers("worker5");
        performStandardRebalance();
        Assert.assertTrue(this.assignor.delay > 40);
        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
        assertConnectorAllocations(0, 0, 1, 1, 1);
        assertTaskAllocations(1, 2, 3, 3, 3);
        // Round 6: let the delay elapse, then worker6 joins; no delay expected afterwards.
        this.time.sleep(this.assignor.delay);
        addNewEmptyWorkers("worker6");
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
        assertTaskAllocations(0, 1, 2, 2, 2, 2);
        // Round 7: follow-up rebalance; tasks are expected to settle at 2 per worker.
        performStandardRebalance();
        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
        assertTaskAllocations(2, 2, 2, 2, 2, 2);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: same join sequence as the delayed case, but the assignor is built with a delay of
     * 0, so every round is expected to report a delay of 0.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testImmediateRevocationsWhenMaxDelayIs0() {
        // The delay must be changed before initAssignor(), which passes it to the constructor.
        this.rebalanceDelay = 0;
        this.time = new MockTime();
        initAssignor();
        addNewConnector("connector3", 4);
        // Round 1: worker1 alone; expects 3 connectors / 12 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(3);
        assertTaskAllocations(12);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 joins.
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 2);
        assertTaskAllocations(0, 6);
        // Round 3: worker3 joins; still no delay expected.
        addNewEmptyWorkers("worker3");
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(3, 3, 4);
        // Round 4: follow-up rebalance without revocations; expects 1 connector and 4 tasks per worker.
        performStandardRebalance();
        assertWorkers("worker1", "worker2", "worker3");
        assertNoRevocations();
        assertConnectorAllocations(1, 1, 1);
        assertTaskAllocations(4, 4, 4);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: the assignor is built with a delay of 1 and two workers join in succession; after
     * the second join the reported delay is expected to be exactly 1.
     * <p>
     * This test keeps the clock installed by setup() (Time.SYSTEM) rather than a MockTime.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
        // The delay must be changed before initAssignor(), which passes it to the constructor.
        this.rebalanceDelay = 1;
        initAssignor();
        addNewConnector("connector3", 4);
        // Round 1: worker1 alone; expects 3 connectors / 12 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(3);
        assertTaskAllocations(12);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 joins.
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 2);
        assertTaskAllocations(0, 6);
        // Round 3: worker3 joins; the delay is expected to equal the configured value of 1.
        addNewEmptyWorkers("worker3");
        performStandardRebalance();
        assertDelay(1);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 2);
        assertTaskAllocations(3, 3, 6);
    }

    /**
     * Scenario: a worker joins while the assignor is still reporting a positive delay (only half
     * of it has elapsed), followed by a join after the delay has fully elapsed.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testWorkerJoiningDuringDelayedRebalance() {
        // Controllable clock, fresh assignor, and a third connector on top of the two from setup().
        this.time = new MockTime();
        initAssignor();
        addNewConnector("connector3", 4);
        // Round 1: worker1 alone; expects 3 connectors / 12 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(3);
        assertTaskAllocations(12);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 joins.
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 2);
        assertTaskAllocations(0, 6);
        // Round 3: worker3 joins; a positive delay and no revocations are expected.
        addNewEmptyWorkers("worker3");
        performStandardRebalance();
        Assert.assertTrue(this.assignor.delay > 0);
        assertWorkers("worker1", "worker2", "worker3");
        assertNoRevocations();
        assertConnectorAllocations(0, 1, 2);
        assertTaskAllocations(3, 3, 6);
        // Round 4: only half of the delay elapses before worker4 joins; the delay should still be positive.
        this.time.sleep(this.assignor.delay / 2);
        addNewEmptyWorkers("worker4");
        performStandardRebalance();
        Assert.assertTrue(this.assignor.delay > 0);
        assertWorkers("worker1", "worker2", "worker3", "worker4");
        assertNoRevocations();
        assertConnectorAllocations(0, 0, 1, 2);
        assertTaskAllocations(0, 3, 3, 6);
        // Round 5: the currently reported delay elapses in full, then worker5 joins; delay drops to 0.
        this.time.sleep(this.assignor.delay);
        addNewEmptyWorkers("worker5");
        performStandardRebalance();
        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
        assertDelay(0);
        assertConnectorAllocations(0, 0, 0, 1, 1);
        assertTaskAllocations(0, 0, 2, 3, 3);
        // Round 6: follow-up rebalance without revocations; expects a balanced, complete allocation.
        performStandardRebalance();
        assertDelay(0);
        assertNoRevocations();
        assertConnectorAllocations(0, 0, 1, 1, 1);
        assertTaskAllocations(2, 2, 2, 3, 3);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: worker2 leaves and never returns. The test expects an empty assignment while the
     * reported delay counts down, and everything on worker1 once the delay has fully elapsed.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenWorkerLeavesPermanently() {
        // Controllable clock and a second worker on top of worker1 from setup().
        this.time = new MockTime();
        initAssignor();
        addNewEmptyWorkers("worker2");
        // Round 1: two workers; expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 leaves; the full configured delay is reported and nothing is assigned.
        removeWorkers("worker2");
        performStandardRebalance();
        assertDelay(this.rebalanceDelay);
        assertWorkers("worker1");
        assertEmptyAssignment();
        // Round 3: half the delay elapses; the remaining half is reported and still nothing is assigned.
        this.time.sleep(this.rebalanceDelay / 2);
        performStandardRebalance();
        assertDelay(this.rebalanceDelay / 2);
        assertEmptyAssignment();
        // Round 4: the rest of the delay (plus 1 ms) elapses; worker1 is expected to receive everything.
        this.time.sleep((this.rebalanceDelay / 2) + 1);
        performStandardRebalance();
        assertDelay(0);
        assertConnectorAllocations(2);
        assertTaskAllocations(8);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: worker2 leaves and rejoins before the reported delay has elapsed. The test expects
     * empty assignments during the delay and an even split once it has run out.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenWorkerBounces() {
        // Controllable clock and a second worker on top of worker1 from setup().
        this.time = new MockTime();
        initAssignor();
        addNewEmptyWorkers("worker2");
        // Round 1: two workers; expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker2 leaves; the full configured delay is reported and nothing is assigned.
        removeWorkers("worker2");
        performStandardRebalance();
        assertDelay(this.rebalanceDelay);
        assertWorkers("worker1");
        assertEmptyAssignment();
        // Round 3: half the delay elapses; the remaining half is reported.
        this.time.sleep(this.rebalanceDelay / 2);
        performStandardRebalance();
        assertDelay(this.rebalanceDelay / 2);
        assertEmptyAssignment();
        // Round 4: another quarter elapses and worker2 rejoins; a quarter of the delay remains.
        this.time.sleep(this.rebalanceDelay / 4);
        addNewEmptyWorkers("worker2");
        performStandardRebalance();
        assertDelay(this.rebalanceDelay / 4);
        assertWorkers("worker1", "worker2");
        assertEmptyAssignment();
        // Round 5: the last quarter elapses; expects the even split back, without revocations.
        this.time.sleep(this.rebalanceDelay / 4);
        performStandardRebalance();
        assertDelay(0);
        assertNoRevocations();
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: worker1 is removed and the assignor is rebuilt from scratch, after which the
     * remaining two workers are expected to share the load with no delay.
     * <p>
     * NOTE(review): rebuilding the assignor presumably models a new leader with no remembered
     * state — confirm. The perform and assert helpers are defined outside this excerpt; the
     * numeric arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenLeaderLeavesPermanently() {
        // Uses the clock and assignor from setup(); adds two more workers.
        addNewEmptyWorkers("worker2", "worker3");
        // Round 1: three workers; expects connectors 0/1/1 and tasks 2/3/3.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(2, 3, 3);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker1 goes away and a fresh assignor takes over; no delay is expected.
        removeWorkers("worker1");
        initAssignor();
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker2", "worker3");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 3: a further rebalance should hand out nothing new.
        performStandardRebalance();
        assertDelay(0);
        assertEmptyAssignment();
    }

    /**
     * Scenario: worker1 is removed, the assignor is rebuilt from scratch, and worker1 later
     * rejoins as an empty worker.
     * <p>
     * NOTE(review): rebuilding the assignor presumably models a new leader with no remembered
     * state — confirm. The perform and assert helpers are defined outside this excerpt; the
     * numeric arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenLeaderBounces() {
        // Uses the clock and assignor from setup(); adds two more workers.
        addNewEmptyWorkers("worker2", "worker3");
        // Round 1: three workers; expects connectors 0/1/1 and tasks 2/3/3.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(2, 3, 3);
        assertBalancedAndCompleteAllocation();
        // Round 2: worker1 goes away and a fresh assignor takes over; no delay is expected.
        removeWorkers("worker1");
        initAssignor();
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker2", "worker3");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 3: worker1 rejoins empty.
        addNewEmptyWorkers("worker1");
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 3, 3);
        // Round 4: follow-up rebalance without revocations; expects the round-1 distribution again.
        performStandardRebalance();
        assertDelay(0);
        assertNoRevocations();
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(2, 3, 3);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: the very first rebalance goes through performFailedRebalance(), after which a
     * standard rebalance is expected to produce the full, balanced allocation.
     * <p>
     * NOTE(review): how performFailedRebalance() injects the failure is not visible in this
     * excerpt; the numeric arguments to the allocation asserts look like sorted per-worker
     * counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenFirstAssignmentAttemptFails() {
        addNewEmptyWorkers("worker2");
        // Round 1 (failed attempt): both workers are expected to end up with nothing.
        performFailedRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 0);
        assertTaskAllocations(0, 0);
        // Round 2 (standard): expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertDelay(0);
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: after a successful first round, the rebalance triggered by worker3 joining goes
     * through performFailedRebalance(); later standard rounds are expected to converge on a
     * balanced allocation.
     * <p>
     * NOTE(review): how performFailedRebalance() injects the failure is not visible in this
     * excerpt; the numeric arguments to the allocation asserts look like sorted per-worker
     * counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
        // Controllable clock and a second worker on top of worker1 from setup().
        this.time = new MockTime();
        initAssignor();
        addNewEmptyWorkers("worker2");
        // Round 1: two workers; expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 2 (failed attempt): worker3 joins; existing allocations are expected to stay at 1/1 and 4/4.
        addNewEmptyWorkers("worker3");
        performFailedRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 4, 4);
        // Round 3: standard rebalance; a positive delay is expected and allocations are unchanged.
        performStandardRebalance();
        Assert.assertTrue(this.assignor.delay > 0);
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 4, 4);
        // Round 4: the reported delay elapses; tasks are expected to drop to 3/3 on the loaded workers.
        this.time.sleep(this.assignor.delay);
        performStandardRebalance();
        assertDelay(0);
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 3, 3);
        // Round 5: follow-up rebalance; expects tasks 2/3/3 and a balanced, complete allocation.
        performStandardRebalance();
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(2, 3, 3);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: same as the subsequent-failure case, except that the round following the failed
     * attempt goes through performRebalanceWithMismatchedGeneration() instead of a standard
     * rebalance.
     * <p>
     * NOTE(review): neither performFailedRebalance() nor performRebalanceWithMismatchedGeneration()
     * is visible in this excerpt; the numeric arguments to the allocation asserts look like sorted
     * per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssignor() {
        // Controllable clock and a second worker on top of worker1 from setup().
        this.time = new MockTime();
        initAssignor();
        addNewEmptyWorkers("worker2");
        // Round 1: two workers; expects an even 1/1 and 4/4 split.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 2 (failed attempt): worker3 joins; existing allocations are expected to stay at 1/1 and 4/4.
        addNewEmptyWorkers("worker3");
        performFailedRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2", "worker3");
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 4, 4);
        // Round 3 (mismatched generation): a positive delay is expected and allocations are unchanged.
        performRebalanceWithMismatchedGeneration();
        Assert.assertTrue(this.assignor.delay > 0);
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 4, 4);
        // Round 4: the reported delay elapses; tasks are expected to drop to 3/3 on the loaded workers.
        this.time.sleep(this.assignor.delay);
        performStandardRebalance();
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(0, 3, 3);
        // Round 5: follow-up rebalance; expects tasks 2/3/3 and a balanced, complete allocation.
        performStandardRebalance();
        assertConnectorAllocations(0, 1, 1);
        assertTaskAllocations(2, 3, 3);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Scenario: a connector is removed between two rebalances; the allocation is expected to
     * shrink from 3 connectors / 12 tasks to 2 connectors / 8 tasks with no delay.
     * <p>
     * NOTE(review): the perform and assert helpers are defined outside this excerpt; the numeric
     * arguments to the allocation asserts look like sorted per-worker counts — confirm.
     */
    @Test
    public void testTaskAssignmentWhenConnectorsAreDeleted() {
        // Three connectors and two workers in total (two connectors and worker1 come from setup()).
        addNewConnector("connector3", 4);
        addNewEmptyWorkers("worker2");
        // Round 1: expects connectors 1/2 and tasks 6/6.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(1, 2);
        assertTaskAllocations(6, 6);
        assertBalancedAndCompleteAllocation();
        // Round 2: connector3 is deleted; expects connectors 1/1 and tasks 4/4.
        removeConnector("connector3");
        performStandardRebalance();
        assertDelay(0);
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
    }

    /**
     * Checks that {@code assignConnectors} distributes the connectors returned by
     * {@code newConnectors(6, 11)} across three pre-loaded workers so that worker0 receives
     * connector6 and connector9, worker1 receives connector7 and connector10, and worker2
     * receives connector8.
     * <p>
     * The collections are fully parameterized: with a raw {@code List} the stream lambda would see
     * its element as {@code Object} and could not call {@code worker()}, {@code connectors()} or
     * {@code tasks()} on it.
     * <p>
     * NOTE(review): the {@code workerLoad(name, a, b, c, d)} helper is outside this excerpt; the
     * arguments look like (connector start, connector count, task start, task count) — confirm.
     */
    @Test
    public void testAssignConnectorsWhenBalanced() {
        final int perWorker = 2;
        List<WorkerCoordinator.WorkerLoad> existingAssignment = IntStream.range(0, 3)
                .mapToObj(i -> workerLoad("worker" + i, i * perWorker, perWorker, i * perWorker, perWorker))
                .collect(Collectors.toList());
        // Independent copies of the starting loads, so the expectation is not mutated by the assignor.
        List<WorkerCoordinator.WorkerLoad> expectedAssignment = existingAssignment.stream()
                .map(load -> new WorkerCoordinator.WorkerLoad.Builder(load.worker())
                        .withCopies(load.connectors(), load.tasks())
                        .build())
                .collect(Collectors.toList());
        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));

        this.assignor.assignConnectors(existingAssignment, newConnectors(6, 11));

        Assert.assertEquals(expectedAssignment, existingAssignment);
    }

    /**
     * Checks that {@code assignConnectors} followed by {@code assignTasks} distributes the
     * connectors from {@code newConnectors(6, 11)} and the tasks from {@code newTasks(6, 11)}
     * across three pre-loaded workers: worker0 receives items 6 and 9, worker1 receives items 7
     * and 10, and worker2 receives item 8.
     * <p>
     * The collections are fully parameterized: with a raw {@code List} the stream lambda would see
     * its element as {@code Object} and could not call {@code worker()}, {@code connectors()} or
     * {@code tasks()} on it.
     * <p>
     * NOTE(review): the {@code workerLoad(name, a, b, c, d)} helper is outside this excerpt; the
     * arguments look like (connector start, connector count, task start, task count) — confirm.
     */
    @Test
    public void testAssignTasksWhenBalanced() {
        final int perWorker = 2;
        List<WorkerCoordinator.WorkerLoad> existingAssignment = IntStream.range(0, 3)
                .mapToObj(i -> workerLoad("worker" + i, i * perWorker, perWorker, i * perWorker, perWorker))
                .collect(Collectors.toList());
        // Independent copies of the starting loads, so the expectation is not mutated by the assignor.
        List<WorkerCoordinator.WorkerLoad> expectedAssignment = existingAssignment.stream()
                .map(load -> new WorkerCoordinator.WorkerLoad.Builder(load.worker())
                        .withCopies(load.connectors(), load.tasks())
                        .build())
                .collect(Collectors.toList());
        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));
        expectedAssignment.get(0).tasks().addAll(
                Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9)));
        expectedAssignment.get(1).tasks().addAll(
                Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10)));
        expectedAssignment.get(2).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 8)));

        this.assignor.assignConnectors(existingAssignment, newConnectors(6, 11));
        this.assignor.assignTasks(existingAssignment, newTasks(6, 11));

        Assert.assertEquals(expectedAssignment, existingAssignment);
    }

    /**
     * Starts from four unevenly loaded workers (one of them empty), assigns the connectors from
     * {@code newConnectors(9, 24)} and the tasks from {@code newTasks(9, 24)}, and expects every
     * worker to end up with exactly 6 connectors and 6 tasks.
     * <p>
     * NOTE(review): the {@code workerLoad(name, a, b, c, d)} helper is outside this excerpt; the
     * arguments look like (connector start, connector count, task start, task count) — confirm.
     */
    @Test
    public void testAssignConnectorsWhenImbalanced() {
        // Typed list with the diamond operator instead of a raw ArrayList constructor.
        List<WorkerCoordinator.WorkerLoad> existingAssignment = new ArrayList<>();
        existingAssignment.add(workerLoad("worker0", 0, 2, 0, 2));
        existingAssignment.add(workerLoad("worker1", 2, 3, 2, 3));
        existingAssignment.add(workerLoad("worker2", 5, 4, 5, 4));
        existingAssignment.add(emptyWorkerLoad("worker3"));

        List<String> newConnectors = newConnectors(9, 24);
        List<ConnectorTaskId> newTasks = newTasks(9, 24);
        this.assignor.assignConnectors(existingAssignment, newConnectors);
        this.assignor.assignTasks(existingAssignment, newTasks);

        for (WorkerCoordinator.WorkerLoad load : existingAssignment) {
            Assert.assertEquals(6L, load.connectorsSize());
            Assert.assertEquals(6L, load.tasksSize());
        }
    }

    /**
     * Drives {@code handleLostAssignments} directly through a bounce of "worker1": its load is
     * reported lost, it reappears empty halfway through the delay, and once the delay has elapsed
     * the lost connectors and tasks are expected on its load and the assignor's delay bookkeeping
     * is expected to be reset.
     * <p>
     * {@code previousMembers} is set by hand between calls because this test bypasses the normal
     * rebalance helpers. Collections are parameterized instead of raw, which removes the unchecked
     * casts on map lookups.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerBounces() {
        this.time = new MockTime();
        initAssignor();
        // A fresh assignor has no candidates and no scheduled rebalance.
        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

        // Step 1: nothing is lost, so no state should change.
        this.assignor.handleLostAssignments(
                new WorkerCoordinator.ConnectorsAndTasks.Builder().build(),
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 2: worker1 disappears and its load becomes the lost assignment; a delayed rebalance is scheduled.
        WorkerCoordinator.WorkerLoad lostLoad = configuredAssignment.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(lostLoad.connectors(), lostLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 3: half the delay elapses and worker1 returns empty; it becomes the reassignment candidate.
        this.time.sleep(this.rebalanceDelay / 2);
        // Track the remaining delay in the fixture field so the assertions below compare against it.
        this.rebalanceDelay /= 2;
        configuredAssignment.put("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.singleton("worker1"), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 4: the remaining delay elapses; the lost work is expected on worker1 and the state reset.
        this.time.sleep(this.rebalanceDelay);
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        WorkerCoordinator.WorkerLoad returnedWorkerLoad = configuredAssignment.getOrDefault(
                "worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        Assert.assertTrue("Wrong assignment of lost connectors", returnedWorkerLoad.connectors().containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks", returnedWorkerLoad.tasks().containsAll(lostAssignments.tasks()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} directly for a worker that never comes back: the load
     * of "worker1" is reported lost, no replacement appears during the delay, and once the delay
     * has elapsed the lost connectors and tasks are expected in the builder passed as the second
     * argument, with the assignor's delay bookkeeping reset.
     * <p>
     * Collections are parameterized instead of raw, which removes the unchecked cast on the map
     * lookup.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerLeavesPermanently() {
        this.time = new MockTime();
        initAssignor();
        // A fresh assignor has no candidates and no scheduled rebalance.
        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

        // Step 1: nothing is lost, so no state should change.
        this.assignor.handleLostAssignments(
                new WorkerCoordinator.ConnectorsAndTasks.Builder().build(),
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 2: worker1 disappears and its load becomes the lost assignment; a delayed rebalance is scheduled.
        WorkerCoordinator.WorkerLoad lostLoad = configuredAssignment.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(lostLoad.connectors(), lostLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);
        // NOTE(review): unlike the sibling tests, previousMembers is seeded from the fixture's
        // memberAssignments map rather than from the local configuredAssignment — confirm this is intended.
        this.assignor.previousMembers = new HashSet<>(this.memberAssignments.keySet());

        // Step 3: half the delay elapses and nobody joins; still no candidate, remaining delay is reported.
        this.time.sleep(this.rebalanceDelay / 2);
        // Track the remaining delay in the fixture field so the assertions below compare against it.
        this.rebalanceDelay /= 2;
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 4: the remaining delay elapses; the lost work is expected in the reassignment builder.
        this.time.sleep(this.rebalanceDelay);
        WorkerCoordinator.ConnectorsAndTasks.Builder lostAssignmentsToReassign =
                new WorkerCoordinator.ConnectorsAndTasks.Builder();
        this.assignor.handleLostAssignments(
                lostAssignments,
                lostAssignmentsToReassign,
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertTrue("Wrong assignment of lost connectors", lostAssignmentsToReassign.build().connectors().containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks", lostAssignmentsToReassign.build().tasks().containsAll(lostAssignments.tasks()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} directly when two empty workers show up while a lost
     * assignment is pending: "worker3" joins in the same round in which "worker1" is lost, and
     * "worker1" returns empty halfway through the delay. Both are expected to be reassignment
     * candidates, and after the delay the lost connectors and tasks must be found on the combined
     * loads of the two workers, with the assignor's delay bookkeeping reset.
     * <p>
     * Collections are parameterized instead of raw, which removes the unchecked casts on map
     * lookups; the repeated lookups of the two final loads are done once each.
     */
    @Test
    public void testLostAssignmentHandlingWithMoreThanOneCandidates() {
        this.time = new MockTime();
        initAssignor();
        // A fresh assignor has no candidates and no scheduled rebalance.
        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

        // Step 1: nothing is lost, so no state should change.
        this.assignor.handleLostAssignments(
                new WorkerCoordinator.ConnectorsAndTasks.Builder().build(),
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 2: worker1 is lost while an empty worker3 joins; worker3 becomes the only candidate.
        WorkerCoordinator.WorkerLoad lostLoad = configuredAssignment.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(lostLoad.connectors(), lostLoad.tasks())
                .build();
        configuredAssignment.put("worker3", new WorkerCoordinator.WorkerLoad.Builder("worker3").build());
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.singleton("worker3"), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 3: half the delay elapses and worker1 returns empty; both workers are now candidates.
        this.time.sleep(this.rebalanceDelay / 2);
        // Track the remaining delay in the fixture field so the assertions below compare against it.
        this.rebalanceDelay /= 2;
        configuredAssignment.put("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));
        Set<String> expectedCandidates = new HashSet<>(Arrays.asList("worker3", "worker1"));
        Assert.assertEquals("Wrong set of workers for reassignments", expectedCandidates, this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);
        this.assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

        // Step 4: the remaining delay elapses; both candidates are given loads before the final call.
        this.time.sleep(this.rebalanceDelay);
        configuredAssignment.put("worker1", workerLoad("worker1", 6, 2, 8, 4));
        configuredAssignment.put("worker3", workerLoad("worker3", 8, 2, 12, 4));
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(configuredAssignment.values()));

        // Both keys were just put above, so the defaults are never used; kept as in the original lookups.
        WorkerCoordinator.WorkerLoad worker3Load = configuredAssignment.getOrDefault(
                "worker3", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        WorkerCoordinator.WorkerLoad worker1Load = configuredAssignment.getOrDefault(
                "worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        List<String> connectorsOnCandidates = new ArrayList<>();
        connectorsOnCandidates.addAll(worker3Load.connectors());
        connectorsOnCandidates.addAll(worker1Load.connectors());
        List<ConnectorTaskId> tasksOnCandidates = new ArrayList<>();
        tasksOnCandidates.addAll(worker3Load.tasks());
        tasksOnCandidates.addAll(worker1Load.tasks());

        Assert.assertTrue("Wrong assignment of lost connectors", connectorsOnCandidates.containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks", tasksOnCandidates.containsAll(lostAssignments.tasks()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} through four rounds against a mock clock:
     * nothing lost, "worker1" lost, "worker1" back with an empty load half-way through the
     * delay, and "worker1" gone again once the remaining delay has elapsed. After each round
     * the assignor's candidate workers, scheduled rebalance time and delay are checked.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() {
        // A mock clock lets the test step through the scheduled rebalance delay.
        this.time = new MockTime();
        initAssignor();

        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        final String bouncingWorker = "worker1";
        Map<String, WorkerCoordinator.WorkerLoad> loadsByWorker = new HashMap<>();
        loadsByWorker.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        loadsByWorker.put(bouncingWorker, workerLoad(bouncingWorker, 2, 2, 4, 4));
        loadsByWorker.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

        // Round 1: no lost assignments, so nothing is scheduled.
        WorkerCoordinator.ConnectorsAndTasks nothingLost = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();
        this.assignor.handleLostAssignments(
                nothingLost,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(loadsByWorker.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        // Round 2: the worker disappears and its whole load is reported as lost.
        this.assignor.previousMembers = new HashSet<>(loadsByWorker.keySet());
        WorkerCoordinator.WorkerLoad departedLoad = loadsByWorker.remove(bouncingWorker);
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(departedLoad.connectors(), departedLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(loadsByWorker.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Round 3: half of the delay passes and the worker rejoins with an empty load.
        this.assignor.previousMembers = new HashSet<>(loadsByWorker.keySet());
        this.time.sleep(this.rebalanceDelay / 2);
        this.rebalanceDelay /= 2;
        loadsByWorker.put(bouncingWorker, new WorkerCoordinator.WorkerLoad.Builder(bouncingWorker).build());
        this.assignor.handleLostAssignments(
                lostAssignments,
                new WorkerCoordinator.ConnectorsAndTasks.Builder(),
                new ArrayList<>(loadsByWorker.values()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.singleton(bouncingWorker), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Round 4: the remaining delay passes and the worker is gone again.
        this.assignor.previousMembers = new HashSet<>(loadsByWorker.keySet());
        this.time.sleep(this.rebalanceDelay);
        loadsByWorker.remove(bouncingWorker);
        WorkerCoordinator.ConnectorsAndTasks.Builder toReassign = new WorkerCoordinator.ConnectorsAndTasks.Builder();
        this.assignor.handleLostAssignments(lostAssignments, toReassign, new ArrayList<>(loadsByWorker.values()));
        Assert.assertTrue("Wrong assignment of lost connectors", toReassign.build().connectors().containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks", toReassign.build().tasks().containsAll(lostAssignments.tasks()));
        Assert.assertEquals("Wrong set of workers for reassignments", Collections.emptySet(), this.assignor.candidateWorkersForReassignment);
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Runs four standard rebalance rounds around the arrival of "worker2", which joins already
     * claiming one connector and tasks 0-3 of "connector1".
     * NOTE(review): presumably these duplicate what "worker1" already holds, as the test name
     * suggests; the initial connector set is configured outside this method - confirm in setup.
     */
    @Test
    public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
        // Round 1: only worker1 is present; it is expected to hold 2 connectors and 8 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(2);
        assertTaskAllocations(8);
        assertBalancedAndCompleteAllocation();
        // worker2 joins with a pre-existing claim: one connector and tasks connector1-0..connector1-3.
        addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4));
        // Round 2: after applying the result, the tracked per-worker counts are {0, 1} connectors and {0, 4} tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 1);
        assertTaskAllocations(0, 4);
        // Round 3: no revocations are expected; each worker ends with 1 connector and 4 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertNoRevocations();
        assertConnectorAllocations(1, 1);
        assertTaskAllocations(4, 4);
        assertBalancedAndCompleteAllocation();
        // Round 4: nothing is newly assigned or revoked.
        performStandardRebalance();
        assertDelay(0);
        assertEmptyAssignment();
    }

    /**
     * Same shape as the duplicated-assignment scenario, but "connector1" is removed from the
     * configured connectors just before "worker2" joins claiming that connector's tasks 0-3.
     * Uses a mock clock so the assignor's delay can be slept through before the last round.
     */
    @Test
    public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
        // Swap in a mock clock and rebuild the assignor so it uses it.
        this.time = new MockTime();
        initAssignor();
        // Round 1: only worker1 is present; it is expected to hold 2 connectors and 8 tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1");
        assertConnectorAllocations(2);
        assertTaskAllocations(8);
        assertBalancedAndCompleteAllocation();
        // connector1 is deleted from the configuration, then worker2 joins still claiming
        // one connector and tasks connector1-0..connector1-3.
        removeConnector("connector1");
        addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4));
        // Round 2: after applying the result, the tracked per-worker counts are {0, 1} connectors and {0, 2} tasks.
        performStandardRebalance();
        assertDelay(0);
        assertWorkers("worker1", "worker2");
        assertConnectorAllocations(0, 1);
        assertTaskAllocations(0, 2);
        // Round 3: no revocations are expected; counts become {0, 1} connectors and {2, 2} tasks.
        performStandardRebalance();
        assertDelay(0);
        assertNoRevocations();
        assertConnectorAllocations(0, 1);
        assertTaskAllocations(2, 2);
        assertBalancedAndCompleteAllocation();
        // Advance the mock clock by the assignor's current delay before the final round.
        this.time.sleep(this.assignor.delay);
        // Round 4: nothing is newly assigned or revoked.
        performStandardRebalance();
        assertDelay(0);
        assertEmptyAssignment();
    }

    /**
     * With no connectors configured and a single member "followMe", performs a task assignment
     * at protocol version 2 against a mocked coordinator and verifies that the coordinator's
     * {@code leaderState} is called with a non-null argument.
     */
    @Test
    public void testLeaderStateUpdated() {
        this.connectors.clear();

        final String leader = "followMe";
        ExtendedWorkerState leaderWorkerState = new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, ExtendedAssignment.empty());
        Map<String, ExtendedWorkerState> memberConfigs = new HashMap<>();
        memberConfigs.put(leader, leaderWorkerState);

        WorkerCoordinator coordinator = Mockito.mock(WorkerCoordinator.class);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState());

        this.assignor.performTaskAssignment(leader, CONFIG_OFFSET, memberConfigs, coordinator, (short) 2);

        Mockito.verify(coordinator).leaderState(ArgumentMatchers.<WorkerCoordinator.LeaderState>notNull());
    }

    /**
     * Sanity check that an assignment performed with the COMPATIBLE protocol is serialized
     * with protocol version 1 for every worker.
     */
    @Test
    public void testProtocolV1() {
        this.connectors.clear();

        final String leader = "followMe";
        final String leaderUrl = "followMe:618";
        ExtendedAssignment leaderAssignment = new ExtendedAssignment(
                (short) 1, (short) 0, leader, leaderUrl, CONFIG_OFFSET,
                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), 0);
        ExtendedWorkerState leaderWorkerState = new ExtendedWorkerState(leaderUrl, CONFIG_OFFSET, leaderAssignment);

        // The list must be typed: passing a raw list makes the call an unchecked invocation, which
        // erases the returned map type and leaves the forEach lambda parameters typed as Object.
        List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
        memberMetadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(leader)
                .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderWorkerState, false).array()));

        WorkerCoordinator coordinator = Mockito.mock(WorkerCoordinator.class);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState());

        this.assignor
                .performAssignment(leader, ConnectProtocolCompatibility.COMPATIBLE.protocol(), memberMetadata, coordinator)
                .forEach((worker, serializedAssignment) -> Assert.assertEquals(
                        "Incorrect protocol version in assignment for worker " + worker,
                        1L,
                        IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment).version()));
    }

    /**
     * Sanity check that an assignment performed with the SESSIONED protocol is serialized
     * with protocol version 2 for every worker.
     */
    @Test
    public void testProtocolV2() {
        this.connectors.clear();

        final String leader = "followMe";
        final String leaderUrl = "followMe:618";
        ExtendedAssignment leaderAssignment = new ExtendedAssignment(
                (short) 2, (short) 0, leader, leaderUrl, CONFIG_OFFSET,
                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), 0);
        ExtendedWorkerState leaderWorkerState = new ExtendedWorkerState(leaderUrl, CONFIG_OFFSET, leaderAssignment);

        // The list must be typed: passing a raw list makes the call an unchecked invocation, which
        // erases the returned map type and leaves the forEach lambda parameters typed as Object.
        List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
        memberMetadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(leader)
                .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderWorkerState, true).array()));

        WorkerCoordinator coordinator = Mockito.mock(WorkerCoordinator.class);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState());

        this.assignor
                .performAssignment(leader, ConnectProtocolCompatibility.SESSIONED.protocol(), memberMetadata, coordinator)
                .forEach((worker, serializedAssignment) -> Assert.assertEquals(
                        "Incorrect protocol version in assignment for worker " + worker,
                        2L,
                        IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment).version()));
    }

    /** Runs one rebalance round with failure tolerance off and no generation mismatch. */
    private void performStandardRebalance() {
        final boolean tolerateFailure = false;
        final boolean mismatchedGeneration = false;
        performRebalance(tolerateFailure, mismatchedGeneration);
    }

    /** Runs one rebalance round in which a runtime failure of the assignor is tolerated. */
    private void performFailedRebalance() {
        final boolean tolerateFailure = true;
        final boolean mismatchedGeneration = false;
        performRebalance(tolerateFailure, mismatchedGeneration);
    }

    /** Runs one rebalance round that reports an older last-completed generation than usual. */
    private void performRebalanceWithMismatchedGeneration() {
        final boolean tolerateFailure = false;
        final boolean mismatchedGeneration = true;
        performRebalance(tolerateFailure, mismatchedGeneration);
    }

    /**
     * Bumps the generation id, asks the assignor for a new task assignment based on a copy of
     * the tracked member assignments, checks the result for redundant assignments and, unless
     * failures are tolerated, applies it to the tracked member assignments.
     *
     * @param z  when {@code true}, a {@link RuntimeException} thrown by the assignor is not
     *           rethrown and the returned assignments are not applied afterwards
     * @param z2 when {@code true}, the last completed generation passed to the assignor is
     *           {@code generationId - 2} instead of {@code generationId - 1}
     */
    private void performRebalance(boolean z, boolean z2) {
        this.generationId++;
        try {
            ClusterConfigState snapshot = configState();
            int lastCompletedGenerationId = this.generationId - (z2 ? 2 : 1);
            // Hand the assignor fresh copies so it cannot alias the tracked assignments.
            Map<String, WorkerCoordinator.ConnectorsAndTasks> assignmentsCopy = this.memberAssignments.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            member -> new WorkerCoordinator.ConnectorsAndTasks.Builder()
                                    .with(member.getValue().connectors(), member.getValue().tasks())
                                    .build()));
            this.returnedAssignments = this.assignor.performTaskAssignment(
                    snapshot, lastCompletedGenerationId, this.generationId, assignmentsCopy);
        } catch (RuntimeException e) {
            if (z) {
                RequestFuture.failure(e);
            } else {
                throw e;
            }
        }
        assertNoRedundantAssignments();
        if (!z) {
            applyAssignments();
        }
    }

    /**
     * Registers each named worker with no connectors and no tasks.
     *
     * @param strArr names of the workers to add
     */
    private void addNewEmptyWorkers(String... strArr) {
        Arrays.stream(strArr)
                .forEach(worker -> addNewWorker(worker, Collections.emptyList(), Collections.emptyList()));
    }

    /**
     * Registers a worker with the given starting assignment and fails the test if a worker
     * with that name was already tracked.
     *
     * @param str   worker name
     * @param list  connectors the worker starts with
     * @param list2 tasks the worker starts with
     */
    private void addNewWorker(String str, List<String> list, List<ConnectorTaskId> list2) {
        WorkerCoordinator.ConnectorsAndTasks startingAssignment =
                new WorkerCoordinator.ConnectorsAndTasks.Builder().with(list, list2).build();
        WorkerCoordinator.ConnectorsAndTasks replaced = this.memberAssignments.put(str, startingAssignment);
        Assert.assertNull("Worker " + str + " already exists", replaced);
    }

    /**
     * Drops each named worker from the tracked assignments, failing the test if one of them
     * was not tracked.
     *
     * @param strArr names of the workers to remove
     */
    private void removeWorkers(String... strArr) {
        for (int index = 0; index < strArr.length; index++) {
            String worker = strArr[index];
            WorkerCoordinator.ConnectorsAndTasks removed = this.memberAssignments.remove(worker);
            Assert.assertNotNull("Worker " + worker + " does not exist", removed);
        }
    }

    /**
     * Builds a worker load for the given worker without adding any connectors or tasks.
     *
     * @param str worker name
     * @return the load produced by a builder that had nothing added to it
     */
    private static WorkerCoordinator.WorkerLoad emptyWorkerLoad(String str) {
        WorkerCoordinator.WorkerLoad.Builder untouchedBuilder = new WorkerCoordinator.WorkerLoad.Builder(str);
        return untouchedBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a worker load holding a contiguous range of generated connectors and tasks.
     *
     * @param str worker name
     * @param i   index of the first connector
     * @param i2  number of connectors
     * @param i3  index of the first task
     * @param i4  number of tasks
     * @return the populated worker load
     */
    public static WorkerCoordinator.WorkerLoad workerLoad(String str, int i, int i2, int i3, int i4) {
        List<String> connectorNames = newConnectors(i, i + i2);
        List<ConnectorTaskId> taskIds = newTasks(i3, i3 + i4);
        return new WorkerCoordinator.WorkerLoad.Builder(str)
                .with(connectorNames, taskIds)
                .build();
    }

    /**
     * Generates connector names by appending each index in {@code [i, i2)} to
     * {@code WorkerConnectorTest.CONNECTOR}.
     *
     * @param i  first index, inclusive
     * @param i2 last index, exclusive
     * @return the generated names in ascending index order
     */
    private static List<String> newConnectors(int i, int i2) {
        List<String> names = new ArrayList<>();
        for (int index = i; index < i2; index++) {
            names.add(WorkerConnectorTest.CONNECTOR + index);
        }
        return names;
    }

    /**
     * Generates task ids for indices {@code [i, i2)} under the fixed connector name "task".
     *
     * @param i  first task index, inclusive
     * @param i2 last task index, exclusive
     * @return the generated task ids in ascending index order
     */
    private static List<ConnectorTaskId> newTasks(int i, int i2) {
        final String defaultConnectorName = "task";
        return newTasks(defaultConnectorName, i, i2);
    }

    /**
     * Generates task ids for indices {@code [i, i2)} of the given connector.
     *
     * @param str connector name used for every generated task id
     * @param i   first task index, inclusive
     * @param i2  last task index, exclusive
     * @return the generated task ids in ascending index order
     */
    private static List<ConnectorTaskId> newTasks(String str, int i, int i2) {
        List<ConnectorTaskId> taskIds = new ArrayList<>();
        for (int index = i; index < i2; index++) {
            taskIds.add(new ConnectorTaskId(str, index));
        }
        return taskIds;
    }

    /**
     * Adds a connector with the given task count to the configured connectors, failing the
     * test if a connector with that name was already present.
     *
     * @param str connector name
     * @param i   number of tasks configured for the connector
     */
    private void addNewConnector(String str, int i) {
        Integer previousTaskCount = this.connectors.put(str, i);
        Assert.assertNull("Connector " + str + " already exists", previousTaskCount);
    }

    /**
     * Removes a connector from the configured connectors, failing the test if it was absent.
     *
     * @param str connector name
     */
    private void removeConnector(String str) {
        Integer removedTaskCount = this.connectors.remove(str);
        Assert.assertNotNull("Connector " + str + " does not exist", removedTaskCount);
    }

    /**
     * Builds a {@link ClusterConfigState} at {@code CONFIG_OFFSET} from a copy of the currently
     * configured connectors: every connector gets an empty config and the STARTED target
     * state, and every task index below a connector's task count gets an empty task config.
     * The remaining constructor arguments are a null session key and empty maps and sets.
     *
     * @return the config state snapshot
     */
    private ClusterConfigState configState() {
        // Typed copy: with a raw map the stream pipeline below degrades to Object-typed lambdas.
        Map<String, Integer> taskCounts = new HashMap<>(this.connectors);
        Map<String, Map<String, String>> connectorConfigs =
                ConnectUtils.transformValues(taskCounts, taskCount -> Collections.emptyMap());
        Map<String, TargetState> targetStates =
                ConnectUtils.transformValues(taskCounts, taskCount -> TargetState.STARTED);
        Map<ConnectorTaskId, Map<String, String>> taskConfigs = taskCounts.entrySet().stream()
                .flatMap(connector -> IntStream.range(0, connector.getValue())
                        .mapToObj(taskIndex -> new ConnectorTaskId(connector.getKey(), taskIndex)))
                .collect(Collectors.toMap(Function.identity(), taskId -> Collections.emptyMap()));
        return new ClusterConfigState(
                CONFIG_OFFSET,
                (SessionKey) null,
                taskCounts,
                connectorConfigs,
                targetStates,
                taskConfigs,
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptySet(),
                Collections.emptySet());
    }

    /**
     * Applies the most recently returned assignments to the tracked member assignments: for
     * every worker, newly revoked connectors and tasks are removed, newly assigned ones are
     * added, and the result is compared with the complete assignment the assignor reported.
     * Workers not tracked yet are added with an empty assignment first.
     */
    private void applyAssignments() {
        this.returnedAssignments.allWorkers().forEach(worker -> {
            // The inner lambda parameter needs its own name; reusing the outer one does not compile.
            WorkerCoordinator.ConnectorsAndTasks tracked = this.memberAssignments.computeIfAbsent(
                    worker, ignored -> new WorkerCoordinator.ConnectorsAndTasks.Builder().build());

            tracked.connectors().removeAll(this.returnedAssignments.newlyRevokedConnectors(worker));
            tracked.connectors().addAll(this.returnedAssignments.newlyAssignedConnectors(worker));
            tracked.tasks().removeAll(this.returnedAssignments.newlyRevokedTasks(worker));
            tracked.tasks().addAll(this.returnedAssignments.newlyAssignedTasks(worker));

            // Compare as sets so ordering and collection type do not matter.
            Assert.assertEquals(
                    "Complete connector assignment for worker " + worker + " does not match expectations based on prior assignment and new revocations and assignments",
                    new HashSet<>(tracked.connectors()),
                    new HashSet<>(this.returnedAssignments.allAssignedConnectors().get(worker)));
            Assert.assertEquals(
                    "Complete task assignment for worker " + worker + " does not match expectations based on prior assignment and new revocations and assignments",
                    new HashSet<>(tracked.tasks()),
                    new HashSet<>(this.returnedAssignments.allAssignedTasks().get(worker)));
        });
    }

    /**
     * Asserts that the most recent round neither assigned nor revoked any connector or task
     * for any worker.
     */
    private void assertEmptyAssignment() {
        List<?> assignedConnectors = ConnectUtils.combineCollections(this.returnedAssignments.newlyAssignedConnectors().values());
        Assert.assertEquals("No connectors should have been newly assigned during this round", Collections.emptyList(), assignedConnectors);

        List<?> assignedTasks = ConnectUtils.combineCollections(this.returnedAssignments.newlyAssignedTasks().values());
        Assert.assertEquals("No tasks should have been newly assigned during this round", Collections.emptyList(), assignedTasks);

        List<?> revokedConnectors = ConnectUtils.combineCollections(this.returnedAssignments.newlyRevokedConnectors().values());
        Assert.assertEquals("No connectors should have been revoked during this round", Collections.emptyList(), revokedConnectors);

        List<?> revokedTasks = ConnectUtils.combineCollections(this.returnedAssignments.newlyRevokedTasks().values());
        Assert.assertEquals("No tasks should have been revoked during this round", Collections.emptyList(), revokedTasks);
    }

    /**
     * Asserts that the most recent round covered exactly the given workers.
     *
     * @param strArr expected worker names, in any order
     */
    private void assertWorkers(String... strArr) {
        Set<String> expectedWorkers = new HashSet<>(Arrays.asList(strArr));
        Assert.assertEquals("Wrong set of workers", expectedWorkers, this.returnedAssignments.allWorkers());
    }

    /**
     * Asserts the per-worker connector counts of the tracked assignments, ignoring order.
     *
     * @param iArr expected connector count for each worker
     */
    private void assertConnectorAllocations(int... iArr) {
        assertAllocations("connectors", WorkerCoordinator.ConnectorsAndTasks::connectors, iArr);
    }

    /**
     * Asserts the per-worker task counts of the tracked assignments, ignoring order.
     *
     * @param iArr expected task count for each worker
     */
    private void assertTaskAllocations(int... iArr) {
        assertAllocations("tasks", WorkerCoordinator.ConnectorsAndTasks::tasks, iArr);
    }

    /**
     * Asserts that the sorted per-worker sizes of the selected collection equal the sorted
     * expected counts.
     *
     * @param str      label used in the failure message ("connectors" or "tasks")
     * @param function selects the collection to count from a worker's assignment
     * @param iArr     expected counts, in any order
     */
    private void assertAllocations(String str, Function<WorkerCoordinator.ConnectorsAndTasks, ? extends Collection<?>> function, int... iArr) {
        int[] sortedCounts = iArr.clone();
        Arrays.sort(sortedCounts);
        List<Integer> expectedAllocations = new ArrayList<>();
        for (int count : sortedCounts) {
            expectedAllocations.add(count);
        }
        List<Integer> actualAllocations = allocations(function);
        Assert.assertEquals("Allocation of assigned " + str + " across cluster does not match expected counts", expectedAllocations, actualAllocations);
    }

    /**
     * Collects, for every tracked worker, the size of the collection selected by
     * {@code function}.
     *
     * @param function selects the collection to count from a worker's assignment
     * @return the sizes in ascending order
     */
    private List<Integer> allocations(Function<WorkerCoordinator.ConnectorsAndTasks, ? extends Collection<?>> function) {
        List<Integer> sizes = new ArrayList<>();
        for (WorkerCoordinator.ConnectorsAndTasks assignment : this.memberAssignments.values()) {
            sizes.add(function.apply(assignment).size());
        }
        Collections.sort(sizes);
        return sizes;
    }

    /**
     * Asserts that the most recent round revoked no connectors and no tasks from any worker.
     * Connector revocations are checked for every worker before task revocations.
     */
    private void assertNoRevocations() {
        for (Map.Entry<?, ? extends Collection<?>> revoked : this.returnedAssignments.newlyRevokedConnectors().entrySet()) {
            Assert.assertEquals(
                    "Expected no revocations to take place during this round, but connector revocations were issued for worker " + revoked.getKey(),
                    Collections.emptySet(),
                    new HashSet<>(revoked.getValue()));
        }
        for (Map.Entry<?, ? extends Collection<?>> revoked : this.returnedAssignments.newlyRevokedTasks().entrySet()) {
            Assert.assertEquals(
                    "Expected no revocations to take place during this round, but task revocations were issued for worker " + revoked.getKey(),
                    Collections.emptySet(),
                    new HashSet<>(revoked.getValue()));
        }
    }

    /**
     * Asserts the assignor's current rebalance delay.
     *
     * @param i expected delay
     */
    private void assertDelay(int i) {
        long expectedDelay = i;
        long actualDelay = this.assignor.delay;
        Assert.assertEquals("Wrong rebalance delay", expectedDelay, actualDelay);
    }

    /**
     * Checks the most recently returned assignments for redundancy: no connector or task may
     * be newly assigned more than once, and nothing newly assigned may already be present in
     * the tracked (pre-application) member assignments.
     */
    private void assertNoRedundantAssignments() {
        List<String> existingConnectors = ConnectUtils.combineCollections(
                this.memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::connectors);
        List<String> newConnectors = ConnectUtils.combineCollections(
                this.returnedAssignments.newlyAssignedConnectors().values());
        List<ConnectorTaskId> existingTasks = ConnectUtils.combineCollections(
                this.memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::tasks);
        List<ConnectorTaskId> newTasks = ConnectUtils.combineCollections(
                this.returnedAssignments.newlyAssignedTasks().values());

        assertNoDuplicates(newConnectors, "Connectors should be unique in assignments but duplicates were found; the set of newly-assigned connectors is " + newConnectors);
        assertNoDuplicates(newTasks, "Tasks should be unique in assignments but duplicates were found; the set of newly-assigned tasks is " + newTasks);

        // After retainAll, each list holds only the entries that are both existing and newly assigned.
        existingConnectors.retainAll(newConnectors);
        Assert.assertEquals("Found connectors in new assignment that already exist in current assignment", Collections.emptyList(), existingConnectors);
        existingTasks.retainAll(newTasks);
        // Check the task overlap here; asserting on the connector list again would hide overlapping tasks.
        Assert.assertEquals("Found tasks in new assignment that already exist in current assignment", Collections.emptyList(), existingTasks);
    }

    /** Asserts that the tracked assignments are balanced first, and then that they are complete. */
    private void assertBalancedAndCompleteAllocation() {
        this.assertBalancedAllocation();
        this.assertCompleteAllocation();
    }

    /**
     * Asserts that the largest and smallest per-worker counts differ by at most one, for
     * connectors and for tasks separately.
     */
    private void assertBalancedAllocation() {
        List<Integer> connectorCounts = allocations(WorkerCoordinator.ConnectorsAndTasks::connectors);
        List<Integer> taskCounts = allocations(WorkerCoordinator.ConnectorsAndTasks::tasks);

        // Both lists come back sorted ascending, so the extremes sit at the two ends.
        int fewestConnectors = connectorCounts.get(0);
        int mostConnectors = connectorCounts.get(connectorCounts.size() - 1);
        int fewestTasks = taskCounts.get(0);
        int mostTasks = taskCounts.get(taskCounts.size() - 1);

        boolean connectorsBalanced = mostConnectors - fewestConnectors <= 1;
        boolean tasksBalanced = mostTasks - fewestTasks <= 1;
        Assert.assertTrue("Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts, connectorsBalanced);
        Assert.assertTrue("Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts, tasksBalanced);
    }

    /**
     * Asserts that the tracked assignments cover exactly the configured connectors and, for
     * each configured connector, exactly the task ids {@code 0..taskCount-1}.
     */
    private void assertCompleteAllocation() {
        List<String> assignedConnectors = ConnectUtils.combineCollections(
                this.memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::connectors);
        Assert.assertEquals(
                "The set of connectors assigned across the cluster does not match the set of connectors in the config topic",
                this.connectors.keySet(),
                new HashSet<>(assignedConnectors));

        List<ConnectorTaskId> assignedTasks = ConnectUtils.combineCollections(
                this.memberAssignments.values(), WorkerCoordinator.ConnectorsAndTasks::tasks);
        Map<String, List<ConnectorTaskId>> tasksByConnector = assignedTasks.stream()
                .collect(Collectors.groupingBy(ConnectorTaskId::connector, Collectors.toList()));

        for (Map.Entry<String, Integer> configured : this.connectors.entrySet()) {
            String connectorName = configured.getKey();
            Set<ConnectorTaskId> expectedTasks = new HashSet<>();
            for (int taskIndex = 0; taskIndex < configured.getValue(); taskIndex++) {
                expectedTasks.add(new ConnectorTaskId(connectorName, taskIndex));
            }
            Assert.assertEquals(
                    "The set of tasks assigned across the cluster for connector " + connectorName + " does not match the set of tasks in the config topic",
                    expectedTasks,
                    new HashSet<>(tasksByConnector.get(connectorName)));
        }
    }

    /**
     * Asserts that the list contains no duplicates by comparing its size with the size of a
     * set built from it.
     *
     * @param list elements to check
     * @param str  failure message
     * @param <T>  element type
     */
    private static <T> void assertNoDuplicates(List<T> list, String str) {
        Set<T> distinctElements = new HashSet<>(list);
        Assert.assertEquals(str, distinctElements.size(), list.size());
    }
}
