package org.apache.kafka.connect.runtime.distributed;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConnectorTest;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.class */
public class IncrementalCooperativeAssignorTest {

    // Mockito's JUnit4 rule; it initializes the @Mock and @Captor fields declared below.
    @Rule
    public MockitoRule rule = MockitoJUnit.rule();

    // Mocked coordinator passed to performTaskAssignment(...); its interactions are verified
    // at the end of each test and again in teardown().
    @Mock
    private WorkerCoordinator coordinator;

    // Captures the map handed to the spied assignor's serializeAssignments(...).
    @Captor
    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
    // Config snapshot returned by the mocked coordinator's configSnapshot().
    private ClusterConfigState configState;
    // Member states passed into the assignor for the current round.
    private Map<String, ExtendedWorkerState> memberConfigs;
    // Member states rebuilt from the captured assignments and compared against memberConfigs.
    private Map<String, ExtendedWorkerState> expectedMemberConfigs;
    // Offset passed to the assignor and used when building member states.
    private long offset;
    // Name of the worker currently acting as leader, and the URL derived from it.
    private String leader;
    private String leaderUrl;
    // Clock given to the assignor; individual tests replace it with a MockTime.
    private Time time;
    // Delay value given to the assignor's constructor (set from
    // SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT in setup()).
    private int rebalanceDelay;
    // Assignor under test, wrapped in a Mockito spy by initAssignor().
    private IncrementalCooperativeAssignor assignor;
    // Number of performTaskAssignment(...) calls issued so far by the running test.
    private int rebalanceNum;
    // Running per-worker assignment state, updated through applyAssignments(...)
    // (helper not visible here — presumably it merges each round's result into this map).
    Map<String, ExtendedAssignment> assignments;
    // Most recent value captured by assignmentsCapture.
    Map<String, ExtendedAssignment> returnedAssignments;

    /**
     * Builds the default fixture shared by every test: "worker1" as leader, offset 10, a
     * cluster config state and member map produced by the local helper factories, the system
     * clock, the default scheduled rebalance delay, an empty assignment map, and a freshly
     * spied assignor.
     *
     * <p>NOTE(review): the numeric arguments to {@code clusterConfigState} and
     * {@code memberConfigs} are presumably connector/task counts; the helpers are defined
     * outside this section, so confirm there.
     */
    @Before
    public void setup() {
        leader = "worker1";
        leaderUrl = expectedLeaderUrl(leader);
        offset = 10L;
        configState = clusterConfigState(offset, 2, 4);
        memberConfigs = memberConfigs(leader, offset, 1, 1);
        time = Time.SYSTEM;
        rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
        // Diamond operator instead of a raw HashMap, so the assignment is type-checked.
        assignments = new HashMap<>();
        initAssignor();
    }

    /**
     * Runs after each test and fails it if the mocked coordinator received any interaction
     * that the test did not verify.
     */
    @After
    public void teardown() {
        Mockito.verifyNoMoreInteractions(coordinator);
    }

    /**
     * Replaces {@link #assignor} with a Mockito spy around a new assignor built from the current
     * {@code time} and {@code rebalanceDelay}, and seeds its previous generation id with 1000.
     * Tests call this again after changing {@code time} or when they need a fresh assignor.
     */
    public void initAssignor() {
        IncrementalCooperativeAssignor realAssignor =
                new IncrementalCooperativeAssignor(new LogContext(), time, rebalanceDelay);
        assignor = Mockito.spy(realAssignor);
        assignor.previousGenerationId = 1000;
    }

    /**
     * Runs four assignment rounds: the default membership from setup(), then "worker2" added to
     * the member map, then two more rounds with unchanged membership. Each round checks the
     * captured assignments' delay, the absence of reassignments, and the totals passed to
     * {@code assertAssignment}.
     *
     * <p>NOTE(review): the four leading numbers given to {@code assertAssignment} are presumably
     * assigned connectors, assigned tasks, revoked connectors and revoked tasks; the helper is
     * not visible here, so confirm against its definition.
     */
    @Test
    public void testTaskAssignmentWhenWorkerJoins() {
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        // Stub serialization on the spy so the computed assignments can be captured.
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: default membership from setup().
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1");

        // Round 2: worker2 is added to the member map with a null assignment.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 1, 4, "worker1", "worker2");

        // Round 3: same two workers, state rebuilt from the applied assignments.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 4, 0, 0, "worker1", "worker2");

        // Round 4: nothing changed; all expected totals are zero.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1", "worker2");

        // Coordinator call counts are tied to the number of rounds executed.
        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Uses a {@link MockTime} clock. After a first round with two workers, "worker2" is removed
     * from the tracked assignments and never returns. The test checks that the reported delay
     * starts at {@code rebalanceDelay}, drops to half after sleeping half the delay, and is zero
     * once the clock has moved past the full delay, at which point the final totals are asserted.
     */
    @Test
    public void testTaskAssignmentWhenWorkerLeavesPermanently() {
        time = new MockTime();
        initAssignor();
        final int halfDelay = rebalanceDelay / 2;

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 added with a null assignment.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        // Round 2: worker2 dropped from the tracked assignments; the full delay is expected.
        applyAssignments(returnedAssignments);
        assignments.remove("worker2");
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(rebalanceDelay, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1");

        // Round 3: half the delay has elapsed on the mock clock.
        time.sleep(halfDelay);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(halfDelay, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1");

        // Round 4: clock moved just past the remaining half of the delay.
        time.sleep(halfDelay + 1);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 4, 0, 0, "worker1");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Uses a {@link MockTime} clock. "worker2" is removed from the tracked assignments after the
     * first round and re-added to the member map before the delay has fully elapsed. The test
     * checks the reported delay after each clock advance (full, half, quarter, then zero) and
     * the totals passed to {@code assertAssignment} for every round.
     */
    @Test
    public void testTaskAssignmentWhenWorkerBounces() {
        time = new MockTime();
        initAssignor();
        final int halfDelay = rebalanceDelay / 2;
        final int quarterDelay = rebalanceDelay / 4;

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 added with a null assignment.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        // Round 2: worker2 dropped from the tracked assignments; the full delay is expected.
        applyAssignments(returnedAssignments);
        assignments.remove("worker2");
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(rebalanceDelay, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1");

        // Round 3: half the delay has elapsed on the mock clock.
        time.sleep(halfDelay);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(halfDelay, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1");

        // Round 4: another quarter elapses, then worker2 is put back with a null assignment.
        time.sleep(quarterDelay);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(quarterDelay, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1", "worker2");

        // Round 5: the last quarter elapses; no delay is expected any more.
        time.sleep(quarterDelay);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 4, 0, 0, "worker1", "worker2");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Starts with three workers, then removes the leader "worker1" from the tracked assignments
     * and switches leadership to "worker2". A new assignor is created for the new leader, so the
     * {@code serializeAssignments} stub is installed again on the new spy. Two further rounds are
     * run with the remaining workers.
     */
    @Test
    public void testTaskAssignmentWhenLeaderLeavesPermanently() {
        time = new MockTime();
        initAssignor();

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 and worker3 added with null assignments.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");

        // Round 2: worker1 removed, worker2 becomes leader, and the assignor is rebuilt.
        applyAssignments(returnedAssignments);
        assignments.remove("worker1");
        leader = "worker2";
        leaderUrl = expectedLeaderUrl(leader);
        memberConfigs = memberConfigs(leader, offset, assignments);
        initAssignor();
        // The previous stub was attached to the old spy; attach it to the new one.
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 3, 0, 0, "worker2", "worker3");

        // Round 3: unchanged membership; note that no delay assertion is made for this round.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker2", "worker3");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Starts with three workers, removes the leader "worker1" and hands leadership to "worker2"
     * with a newly created assignor, then puts "worker1" back into the member map with a null
     * assignment. Two more rounds follow, each checked against the totals passed to
     * {@code assertAssignment}.
     */
    @Test
    public void testTaskAssignmentWhenLeaderBounces() {
        time = new MockTime();
        initAssignor();

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 and worker3 added with null assignments.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");

        // Round 2: worker1 removed, worker2 becomes leader, and the assignor is rebuilt.
        applyAssignments(returnedAssignments);
        assignments.remove("worker1");
        leader = "worker2";
        leaderUrl = expectedLeaderUrl(leader);
        memberConfigs = memberConfigs(leader, offset, assignments);
        initAssignor();
        // The previous stub was attached to the old spy; attach it to the new one.
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 3, 0, 0, "worker2", "worker3");

        // Round 3: worker1 put back with a null assignment; no delay assertion for this round.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        memberConfigs.put("worker1", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");

        // Round 4: unchanged membership.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 2, 0, 0, "worker1", "worker2", "worker3");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Makes the spied {@code serializeAssignments} throw on the first round, then return
     * normally on a retry with the same member map. The same delay and totals are asserted
     * after both rounds.
     */
    @Test
    public void testTaskAssignmentWhenFirstAssignmentAttemptFails() {
        time = new MockTime();
        initAssignor();

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        // First attempt: serialization throws, but the captor still records the argument.
        Mockito.doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
                .when(assignor).serializeAssignments(assignmentsCapture.capture());

        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        try {
            expectGeneration();
            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        } catch (RuntimeException e) {
            // The failure is wrapped in a RequestFuture and otherwise ignored.
            RequestFuture.failure(e);
        }
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertDelay(0, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        // Second attempt: serialization is stubbed to succeed; membership is unchanged.
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Runs a successful first round with two workers, then makes the spied
     * {@code serializeAssignments} throw on the round in which "worker3" is added, and finally
     * retries that round with serialization succeeding. The failed round and its retry are
     * checked against the same delay and totals.
     */
    @Test
    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
        time = new MockTime();
        initAssignor();

        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 added with a null assignment; serialization succeeds.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        // Round 2: worker3 is added and serialization is stubbed to throw.
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
                .when(assignor).serializeAssignments(assignmentsCapture.capture());
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        try {
            expectGeneration();
            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        } catch (RuntimeException e) {
            // The failure is wrapped in a RequestFuture and otherwise ignored.
            RequestFuture.failure(e);
        }
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertDelay(0, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");

        // Round 3: retry with the same member map; serialization succeeds again.
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertDelay(0, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Runs a normal first round with two workers, then a round in which "worker3" is added while
     * the coordinator is stubbed to report a generation id of {@code previousGenerationId + 1}
     * and a last completed generation id of {@code previousGenerationId - 1}. A third round
     * keeps the last-completed stub and uses the regular generation expectation. The second and
     * third rounds are checked against the same delay and totals.
     *
     * <p>NOTE(review): the generation-id stubbing presumably models an assignment that was
     * computed but never completed by the group; confirm against the assignor implementation.
     */
    @Test
    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssignor() {
        time = new MockTime();
        initAssignor();

        expectGeneration();
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Round 1: worker2 added with a null assignment.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1", "worker2");

        // Round 2: worker3 added; generation ids are stubbed directly instead of via expectGeneration().
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));
        Mockito.when(coordinator.generationId())
                .thenReturn(assignor.previousGenerationId + 1)
                .thenReturn(assignor.previousGenerationId + 1);
        Mockito.when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertDelay(0, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");

        // Round 3: same member map; last completed generation is stubbed again before the call.
        Mockito.when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        ++rebalanceNum;
        returnedAssignments = assignmentsCapture.getValue();
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertDelay(0, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");

        final int rounds = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rounds)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rounds)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rounds)).generationId();
        Mockito.verify(coordinator, Mockito.times(rounds)).memberId();
        Mockito.verify(coordinator, Mockito.times(rounds)).lastCompletedGenerationId();
    }

    /**
     * Runs two rebalances with the same two workers while the configuration snapshot shrinks
     * from three connectors to two, and checks the assignment reported after each rebalance.
     *
     * <p>NOTE(review): the arguments of {@code assertAssignment} are presumably
     * (assigned connectors, assigned tasks, revoked connectors, revoked tasks, workers...);
     * the helper is defined outside this section - confirm there.
     */
    @Test
    public void testTaskAssignmentWhenConnectorsAreDeleted() {
        // Snapshot with connectors 1..3, four tasks each.
        configState = clusterConfigState(offset, 3, 4);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // worker2 is registered without any assignment of its own.
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, (ExtendedAssignment) null));

        // First rebalance.
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(3, 12, 0, 0, "worker1", "worker2");

        // The snapshot is replaced by one at offset + 1 that only holds connectors 1..2.
        configState = clusterConfigState(offset + 1, 2, 4);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);

        // Second rebalance.
        expectGeneration();
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 1, 4, "worker1", "worker2");

        // Every rebalance must have touched the coordinator the same number of times.
        final int rebalances = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rebalances)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rebalances)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rebalances)).generationId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).memberId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).lastCompletedGenerationId();
    }

    /**
     * Checks how {@code assignConnectors} distributes five new connectors (6..10) over three
     * workers that each start with two connectors and two tasks.
     */
    @Test
    public void testAssignConnectorsWhenBalanced() {
        final int perWorker = 2;
        List<WorkerCoordinator.WorkerLoad> existingAssignment = IntStream.range(0, 3)
                .mapToObj(idx -> workerLoad("worker" + idx, idx * perWorker, perWorker, idx * perWorker, perWorker))
                .collect(Collectors.toList());

        // Copy the starting loads, then append the connectors each worker is expected to receive.
        List<WorkerCoordinator.WorkerLoad> expectedAssignment = existingAssignment.stream()
                .map(load -> new WorkerCoordinator.WorkerLoad.Builder(load.worker())
                        .withCopies(load.connectors(), load.tasks())
                        .build())
                .collect(Collectors.toList());
        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));

        assignor.assignConnectors(existingAssignment, newConnectors(6, 11));

        Assert.assertEquals(expectedAssignment, existingAssignment);
    }

    /**
     * Checks how {@code assignConnectors} followed by {@code assignTasks} distribute five new
     * connectors and five new tasks (indices 6..10) over three workers that each start with two
     * connectors and two tasks.
     */
    @Test
    public void testAssignTasksWhenBalanced() {
        final int perWorker = 2;
        List<WorkerCoordinator.WorkerLoad> existingAssignment = IntStream.range(0, 3)
                .mapToObj(idx -> workerLoad("worker" + idx, idx * perWorker, perWorker, idx * perWorker, perWorker))
                .collect(Collectors.toList());

        // Copy the starting loads, then append what each worker is expected to receive.
        List<WorkerCoordinator.WorkerLoad> expectedAssignment = existingAssignment.stream()
                .map(load -> new WorkerCoordinator.WorkerLoad.Builder(load.worker())
                        .withCopies(load.connectors(), load.tasks())
                        .build())
                .collect(Collectors.toList());

        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));

        expectedAssignment.get(0).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9)));
        expectedAssignment.get(1).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10)));
        expectedAssignment.get(2).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 8)));

        assignor.assignConnectors(existingAssignment, newConnectors(6, 11));
        assignor.assignTasks(existingAssignment, newTasks(6, 11));

        Assert.assertEquals(expectedAssignment, existingAssignment);
    }

    /**
     * Starts from four workers holding 2, 3, 4 and 0 connectors/tasks, assigns 15 new connectors
     * and 15 new tasks (indices 9..23), and expects every worker to end up with six of each.
     */
    @Test
    public void testAssignConnectorsWhenImbalanced() {
        List<WorkerCoordinator.WorkerLoad> existingAssignment = new ArrayList<>(Arrays.asList(
                workerLoad("worker0", 0, 2, 0, 2),
                workerLoad("worker1", 2, 3, 2, 3),
                workerLoad("worker2", 5, 4, 5, 4),
                emptyWorkerLoad("worker3")));

        List<String> connectorsToAssign = newConnectors(9, 24);
        List<ConnectorTaskId> tasksToAssign = newTasks(9, 24);
        assignor.assignConnectors(existingAssignment, connectorsToAssign);
        assignor.assignTasks(existingAssignment, tasksToAssign);

        existingAssignment.forEach(load -> {
            Assert.assertEquals(6L, load.connectorsSize());
            Assert.assertEquals(6L, load.tasksSize());
        });
    }

    /**
     * Drives {@code handleLostAssignments} through the case of a worker that leaves and comes back
     * before the scheduled rebalance delay has elapsed.
     *
     * <p>Steps, each followed by assertions on the assignor's candidate set,
     * {@code scheduledRebalance} and {@code delay}:
     * <ol>
     *   <li>worker0..worker2 are present and nothing is lost: no candidates, no delay.</li>
     *   <li>worker1 is removed from the members and its load is reported lost: a rebalance is
     *       expected to be scheduled {@code rebalanceDelay} ms ahead.</li>
     *   <li>Half the delay passes and worker1 rejoins with an empty load: it is expected to be the
     *       only reassignment candidate, with the remaining half of the delay still pending.</li>
     *   <li>The rest of the delay passes: worker1's load is expected to contain the lost
     *       connectors and tasks, and the delay state is expected to be reset to zero.</li>
     * </ol>
     *
     * <p>Hamcrest assertions pass the observed value as {@code actual} and the expectation inside
     * the matcher so that failure messages are reported the right way round.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerBounces() {
        this.time = new MockTime();
        initAssignor();

        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
        this.memberConfigs = memberConfigs(this.leader, this.offset, 0, 2);

        WorkerCoordinator.ConnectorsAndTasks newSubmissions = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();

        // Step 1: nothing is lost.
        this.assignor.handleLostAssignments(new WorkerCoordinator.ConnectorsAndTasks.Builder().build(), newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        // Step 2: worker1 disappears and its former load is reported as lost.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        WorkerCoordinator.WorkerLoad flakyWorkerLoad = workerLoad("worker1", 2, 2, 4, 4);
        this.memberConfigs.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .withCopies(flakyWorkerLoad.connectors(), flakyWorkerLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 3: half the delay elapses, then worker1 rejoins with an empty load.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay / 2);
        // The field is halved so that the assertions below compare against the remaining delay.
        this.rebalanceDelay /= 2;
        configuredAssignment.put("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        this.memberConfigs.put("worker1", new ExtendedWorkerState(this.leaderUrl, this.offset, (ExtendedAssignment) null));
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.singleton("worker1")));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 4: the remaining delay elapses; the lost load is expected back on worker1.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay);
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        Assert.assertTrue("Wrong assignment of lost connectors",
                configuredAssignment.getOrDefault("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build())
                        .connectors()
                        .containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks",
                configuredAssignment.getOrDefault("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build())
                        .tasks()
                        .containsAll(lostAssignments.tasks()));
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} through the case of a worker that leaves and never
     * returns.
     *
     * <p>Steps, each followed by assertions on the assignor's candidate set,
     * {@code scheduledRebalance} and {@code delay}:
     * <ol>
     *   <li>worker0..worker2 are present and nothing is lost: no candidates, no delay.</li>
     *   <li>worker1 is removed from the members and its load is reported lost: a rebalance is
     *       expected to be scheduled {@code rebalanceDelay} ms ahead.</li>
     *   <li>Half the delay passes with no membership change: still no candidates, and the
     *       remaining half of the delay is expected to be pending.</li>
     *   <li>The rest of the delay passes: the lost connectors and tasks are expected to appear in
     *       the new-submissions object, and the delay state is expected to be reset to zero.</li>
     * </ol>
     *
     * <p>Hamcrest assertions pass the observed value as {@code actual} and the expectation inside
     * the matcher so that failure messages are reported the right way round.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerLeavesPermanently() {
        this.time = new MockTime();
        initAssignor();

        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
        this.memberConfigs = memberConfigs(this.leader, this.offset, 0, 2);

        WorkerCoordinator.ConnectorsAndTasks newSubmissions = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();

        // Step 1: nothing is lost.
        this.assignor.handleLostAssignments(new WorkerCoordinator.ConnectorsAndTasks.Builder().build(), newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        // Step 2: worker1 disappears and its former load is reported as lost.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        WorkerCoordinator.WorkerLoad departedWorkerLoad = workerLoad("worker1", 2, 2, 4, 4);
        this.memberConfigs.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .withCopies(departedWorkerLoad.connectors(), departedWorkerLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 3: half the delay elapses and membership is unchanged.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay / 2);
        // The field is halved so that the assertions below compare against the remaining delay.
        this.rebalanceDelay /= 2;
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 4: the remaining delay elapses; the lost load is expected among the new submissions.
        this.time.sleep(this.rebalanceDelay);
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        Assert.assertTrue("Wrong assignment of lost connectors",
                newSubmissions.connectors().containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks",
                newSubmissions.tasks().containsAll(lostAssignments.tasks()));
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} through the case where a brand-new worker and the
     * returning worker are both candidates for the lost load.
     *
     * <p>Steps, each followed by assertions on the assignor's candidate set,
     * {@code scheduledRebalance} and {@code delay}:
     * <ol>
     *   <li>worker0..worker2 are present and nothing is lost: no candidates, no delay.</li>
     *   <li>worker1 is removed and worker3 joins with an empty load: worker3 is expected to be the
     *       only candidate and a rebalance is expected {@code rebalanceDelay} ms ahead.</li>
     *   <li>Half the delay passes and worker1 rejoins with an empty load: both worker3 and worker1
     *       are expected to be candidates, with the remaining half of the delay pending.</li>
     *   <li>The rest of the delay passes and both candidates are given non-empty loads: worker3's
     *       load is expected to contain the lost connectors and tasks, and the delay state is
     *       expected to be reset to zero.</li>
     * </ol>
     *
     * <p>Hamcrest assertions pass the observed value as {@code actual} and the expectation inside
     * the matcher so that failure messages are reported the right way round.
     */
    @Test
    public void testLostAssignmentHandlingWithMoreThanOneCandidates() {
        this.time = new MockTime();
        initAssignor();

        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
        this.memberConfigs = memberConfigs(this.leader, this.offset, 0, 2);

        WorkerCoordinator.ConnectorsAndTasks newSubmissions = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();

        // Step 1: nothing is lost.
        this.assignor.handleLostAssignments(new WorkerCoordinator.ConnectorsAndTasks.Builder().build(), newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        // Step 2: worker1 disappears, its load is reported lost, and worker3 joins empty-handed.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        WorkerCoordinator.WorkerLoad flakyWorkerLoad = workerLoad("worker1", 2, 2, 4, 4);
        this.memberConfigs.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .withCopies(flakyWorkerLoad.connectors(), flakyWorkerLoad.tasks())
                .build();
        configuredAssignment.put("worker3", new WorkerCoordinator.WorkerLoad.Builder("worker3").build());
        this.memberConfigs.put("worker3", new ExtendedWorkerState(this.leaderUrl, this.offset, (ExtendedAssignment) null));
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.singleton("worker3")));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 3: half the delay elapses, then worker1 rejoins with an empty load.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay / 2);
        // The field is halved so that the assertions below compare against the remaining delay.
        this.rebalanceDelay /= 2;
        configuredAssignment.put("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        this.memberConfigs.put("worker1", new ExtendedWorkerState(this.leaderUrl, this.offset, (ExtendedAssignment) null));
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        HashSet<String> expectedCandidates = new HashSet<>(Arrays.asList("worker3", "worker1"));
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(expectedCandidates));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 4: the remaining delay elapses and both candidates now carry load of their own.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay);
        configuredAssignment.put("worker1", workerLoad("worker1", 6, 2, 8, 4));
        configuredAssignment.put("worker3", workerLoad("worker3", 8, 2, 12, 4));
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        // The fallback load is built for worker3, the same worker that is being looked up.
        Assert.assertTrue("Wrong assignment of lost connectors",
                configuredAssignment.getOrDefault("worker3", new WorkerCoordinator.WorkerLoad.Builder("worker3").build())
                        .connectors()
                        .containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks",
                configuredAssignment.getOrDefault("worker3", new WorkerCoordinator.WorkerLoad.Builder("worker3").build())
                        .tasks()
                        .containsAll(lostAssignments.tasks()));
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Drives {@code handleLostAssignments} through the case of a worker that leaves, briefly
     * returns during the delay, and is gone again when the delay expires.
     *
     * <p>Steps, each followed by assertions on the assignor's candidate set,
     * {@code scheduledRebalance} and {@code delay}:
     * <ol>
     *   <li>worker0..worker2 are present and nothing is lost: no candidates, no delay.</li>
     *   <li>worker1 is removed from the members and its load is reported lost: a rebalance is
     *       expected to be scheduled {@code rebalanceDelay} ms ahead.</li>
     *   <li>Half the delay passes and worker1 rejoins with an empty load: it is expected to be the
     *       only reassignment candidate, with the remaining half of the delay still pending.</li>
     *   <li>The rest of the delay passes and worker1 is removed again: the lost connectors and
     *       tasks are expected to appear in the new-submissions object, and the delay state is
     *       expected to be reset to zero.</li>
     * </ol>
     *
     * <p>Hamcrest assertions pass the observed value as {@code actual} and the expectation inside
     * the matcher so that failure messages are reported the right way round.
     */
    @Test
    public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() {
        this.time = new MockTime();
        initAssignor();

        Assert.assertTrue(this.assignor.candidateWorkersForReassignment.isEmpty());
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        Map<String, WorkerCoordinator.WorkerLoad> configuredAssignment = new HashMap<>();
        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
        this.memberConfigs = memberConfigs(this.leader, this.offset, 0, 2);

        WorkerCoordinator.ConnectorsAndTasks newSubmissions = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();

        // Step 1: nothing is lost.
        this.assignor.handleLostAssignments(new WorkerCoordinator.ConnectorsAndTasks.Builder().build(), newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);

        // Step 2: worker1 disappears and its former load is reported as lost.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        WorkerCoordinator.WorkerLoad flakyWorkerLoad = workerLoad("worker1", 2, 2, 4, 4);
        this.memberConfigs.remove("worker1");
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .withCopies(flakyWorkerLoad.connectors(), flakyWorkerLoad.tasks())
                .build();
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 3: half the delay elapses, then worker1 rejoins with an empty load.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay / 2);
        // The field is halved so that the assertions below compare against the remaining delay.
        this.rebalanceDelay /= 2;
        configuredAssignment.put("worker1", new WorkerCoordinator.WorkerLoad.Builder("worker1").build());
        this.memberConfigs.put("worker1", new ExtendedWorkerState(this.leaderUrl, this.offset, (ExtendedAssignment) null));
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.singleton("worker1")));
        Assert.assertEquals(this.time.milliseconds() + this.rebalanceDelay, this.assignor.scheduledRebalance);
        Assert.assertEquals(this.rebalanceDelay, this.assignor.delay);

        // Step 4: the remaining delay elapses, but worker1 has left again by then.
        this.assignor.previousMembers = new HashSet<>(this.memberConfigs.keySet());
        this.time.sleep(this.rebalanceDelay);
        configuredAssignment.remove("worker1");
        this.memberConfigs.remove("worker1");
        this.assignor.handleLostAssignments(lostAssignments, newSubmissions,
                new ArrayList<>(configuredAssignment.values()), this.memberConfigs);
        Assert.assertTrue("Wrong assignment of lost connectors",
                newSubmissions.connectors().containsAll(lostAssignments.connectors()));
        Assert.assertTrue("Wrong assignment of lost tasks",
                newSubmissions.tasks().containsAll(lostAssignments.tasks()));
        MatcherAssert.assertThat("Wrong set of workers for reassignments",
                this.assignor.candidateWorkersForReassignment, CoreMatchers.is(Collections.emptySet()));
        Assert.assertEquals(0L, this.assignor.scheduledRebalance);
        Assert.assertEquals(0L, this.assignor.delay);
    }

    /**
     * Runs five consecutive rebalances in which a second worker joins while already claiming
     * connector 1 and tasks 0..3 of "connector1", and checks the assignment reported after each
     * rebalance.
     *
     * <p>NOTE(review): the arguments of {@code assertAssignment} are presumably
     * (assigned connectors, assigned tasks, revoked connectors, revoked tasks, workers...);
     * the helper is defined outside this section - confirm there.
     */
    @Test
    public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Rebalance 1: only the initially configured members take part.
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1");

        // Rebalance 2: worker2 joins and reports an assignment of its own.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        ExtendedAssignment duplicatedAssignment = newExpandableAssignment();
        duplicatedAssignment.connectors().addAll(newConnectors(1, 2));
        duplicatedAssignment.tasks().addAll(newTasks("connector1", 0, 4));
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedAssignment));
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 2, 8, "worker1", "worker2");

        // Rebalance 3.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(1, 4, 0, 2, "worker1", "worker2");

        // Rebalance 4.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 2, 0, 0, "worker1", "worker2");

        // Rebalance 5: nothing further is expected to move.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1", "worker2");

        // Every rebalance must have touched the coordinator the same number of times.
        final int rebalances = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rebalances)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rebalances)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rebalances)).generationId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).memberId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).lastCompletedGenerationId();
    }

    /**
     * Runs five consecutive rebalances in which, after the first one, the configuration snapshot
     * is replaced by one that only holds connector 2, and a second worker joins while claiming
     * connector 1 and tasks 0..3 of "connector1". The assignment reported after each rebalance is
     * checked.
     *
     * <p>NOTE(review): the arguments of {@code assertAssignment} are presumably
     * (assigned connectors, assigned tasks, revoked connectors, revoked tasks, workers...);
     * the helper is defined outside this section - confirm there.
     */
    @Test
    public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);
        Mockito.doReturn(Collections.emptyMap()).when(assignor).serializeAssignments(assignmentsCapture.capture());

        // Rebalance 1: only the initially configured members take part.
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(2, 8, 0, 0, "worker1");

        // The snapshot now starts at connector 2 and holds a single connector with four tasks.
        configState = clusterConfigState(offset, 2, 1, 4);
        Mockito.when(coordinator.configSnapshot()).thenReturn(configState);

        // Rebalance 2: worker2 joins and reports an assignment of its own.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        ExtendedAssignment duplicatedAssignment = newExpandableAssignment();
        duplicatedAssignment.connectors().addAll(newConnectors(1, 2));
        duplicatedAssignment.tasks().addAll(newTasks("connector1", 0, 4));
        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedAssignment));
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 2, 8, "worker1", "worker2");

        // Rebalance 3.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 2, "worker1", "worker2");

        // Rebalance 4.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 2, 0, 0, "worker1", "worker2");

        // Rebalance 5: nothing further is expected to move.
        applyAssignments(returnedAssignments);
        memberConfigs = memberConfigs(leader, offset, assignments);
        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
        rebalanceNum++;
        returnedAssignments = assignmentsCapture.getValue();
        assertDelay(0, returnedAssignments);
        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
        assertNoReassignments(memberConfigs, expectedMemberConfigs);
        assertAssignment(0, 0, 0, 0, "worker1", "worker2");

        // Every rebalance must have touched the coordinator the same number of times.
        final int rebalances = rebalanceNum;
        Mockito.verify(coordinator, Mockito.times(rebalances)).configSnapshot();
        Mockito.verify(coordinator, Mockito.times(rebalances)).leaderState((WorkerCoordinator.LeaderState) ArgumentMatchers.any());
        Mockito.verify(coordinator, Mockito.times(2 * rebalances)).generationId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).memberId();
        Mockito.verify(coordinator, Mockito.times(rebalances)).lastCompletedGenerationId();
    }

    /**
     * Builds a load for the named worker from a fresh builder to which nothing has been added.
     *
     * @param str the worker name handed to the builder
     * @return the built worker load
     */
    private WorkerCoordinator.WorkerLoad emptyWorkerLoad(String str) {
        WorkerCoordinator.WorkerLoad.Builder builder = new WorkerCoordinator.WorkerLoad.Builder(str);
        return builder.build();
    }

    /**
     * Builds a load for the named worker holding a contiguous run of connectors and of tasks.
     *
     * @param str the worker name handed to the builder
     * @param i index of the first connector
     * @param i2 number of connectors
     * @param i3 index of the first task
     * @param i4 number of tasks
     * @return the built worker load
     */
    private WorkerCoordinator.WorkerLoad workerLoad(String str, int i, int i2, int i3, int i4) {
        List<String> connectors = newConnectors(i, i + i2);
        List<ConnectorTaskId> tasks = newTasks(i3, i3 + i4);
        return new WorkerCoordinator.WorkerLoad.Builder(str).with(connectors, tasks).build();
    }

    /**
     * Generates connector names by appending each index in {@code [i, i2)} to
     * {@code WorkerConnectorTest.CONNECTOR}.
     *
     * @param i first index, inclusive
     * @param i2 last index, exclusive
     * @return the generated names in ascending index order
     */
    private static List<String> newConnectors(int i, int i2) {
        return IntStream.range(i, i2)
                .mapToObj(index -> WorkerConnectorTest.CONNECTOR + index)
                .collect(Collectors.toList());
    }

    /**
     * Generates task ids for the indices in {@code [i, i2)} under the fixed connector name
     * {@code "task"}.
     *
     * @param i first task index, inclusive
     * @param i2 last task index, exclusive
     * @return the generated task ids
     */
    private static List<ConnectorTaskId> newTasks(int i, int i2) {
        final String connectorName = "task";
        return newTasks(connectorName, i, i2);
    }

    /**
     * Generates one task id per index in {@code [i, i2)} for the given connector name.
     *
     * @param str connector name passed to every {@code ConnectorTaskId}
     * @param i first task index, inclusive
     * @param i2 last task index, exclusive
     * @return the generated task ids in ascending index order; empty when {@code i >= i2}
     */
    private static List<ConnectorTaskId> newTasks(String str, int i, int i2) {
        List<ConnectorTaskId> tasks = new ArrayList<>();
        for (int index = i; index < i2; index++) {
            tasks.add(new ConnectorTaskId(str, index));
        }
        return tasks;
    }

    /**
     * Convenience overload that numbers connectors starting at 1.
     *
     * @param j offset of the configuration state
     * @param i number of connectors
     * @param i2 number of tasks per connector
     * @return the configuration state built by the four-argument overload
     */
    private static ClusterConfigState clusterConfigState(long j, int i, int i2) {
        final int firstConnector = 1;
        return clusterConfigState(j, firstConnector, i, i2);
    }

    /**
     * Assembles a {@link ClusterConfigState} from generated fixtures.
     *
     * <p>Connectors are indexed {@code connectorStart .. connectorStart + connectorNum - 1}; each
     * of them is mapped to {@code taskNum} in the task-count map and to
     * {@link TargetState#STARTED} in the target-state map. Task configs are generated by
     * {@link #taskConfigs(int, int, int)} with start 0, {@code connectorNum} and
     * {@code connectorNum * taskNum}. The last constructor argument is an empty set.
     *
     * @param offset         first constructor argument of the resulting state
     * @param connectorStart index of the first generated connector
     * @param connectorNum   number of generated connectors
     * @param taskNum        value stored for every connector in the task-count map
     * @return the assembled state
     */
    private static ClusterConfigState clusterConfigState(long offset, int connectorStart, int connectorNum, int taskNum) {
        // The per-connector generators take an inclusive upper bound.
        int lastConnectorIndex = connectorStart + connectorNum - 1;
        Map<String, Integer> countsByConnector = connectorTaskCounts(connectorStart, lastConnectorIndex, taskNum);
        Map<String, Map<String, String>> configsByConnector = connectorConfigs(connectorStart, lastConnectorIndex);
        Map<String, TargetState> statesByConnector = connectorTargetStates(connectorStart, lastConnectorIndex, TargetState.STARTED);
        Map<ConnectorTaskId, Map<String, String>> configsByTask = taskConfigs(0, connectorNum, connectorNum * taskNum);
        return new ClusterConfigState(
                offset,
                countsByConnector,
                configsByConnector,
                statesByConnector,
                configsByTask,
                Collections.emptySet());
    }

    /**
     * Wraps every assignment of the given map in an {@link ExtendedWorkerState}, keeping the
     * original keys.
     *
     * @param givenLeader      leader name; converted with {@link #expectedLeaderUrl(String)} and
     *                         used as the first constructor argument of each state
     * @param givenOffset      second constructor argument of each state
     * @param givenAssignments assignments to wrap, keyed as in the result
     * @return a new map with one state per entry of {@code givenAssignments}
     */
    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, Map<String, ExtendedAssignment> givenAssignments) {
        Map<String, ExtendedWorkerState> states = new HashMap<>();
        for (Map.Entry<String, ExtendedAssignment> member : givenAssignments.entrySet()) {
            ExtendedWorkerState state =
                    new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, member.getValue());
            states.put(member.getKey(), state);
        }
        return states;
    }

    /**
     * Creates worker states named {@code "worker" + index} for every index in
     * {@code [start, connectorNum]} (both bounds inclusive), each constructed with a
     * {@code null} assignment.
     *
     * @param givenLeader  leader name; converted with {@link #expectedLeaderUrl(String)} and used
     *                     as the first constructor argument of each state
     * @param givenOffset  second constructor argument of each state
     * @param start        first worker index, inclusive
     * @param connectorNum last worker index, inclusive
     * @return a new map from generated worker name to its state
     */
    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, int start, int connectorNum) {
        return IntStream.range(start, connectorNum + 1)
                .boxed()
                .collect(Collectors.toMap(
                        index -> "worker" + index,
                        index -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, (ExtendedAssignment) null)));
    }

    /**
     * Maps every generated connector name in {@code [start, connectorNum]} (both bounds
     * inclusive) to the same task count.
     *
     * @param start        first connector index, inclusive
     * @param connectorNum last connector index, inclusive
     * @param taskCounts   value stored for every connector
     * @return a new map from {@code WorkerConnectorTest.CONNECTOR + index} to {@code taskCounts}
     */
    private static Map<String, Integer> connectorTaskCounts(int start, int connectorNum, int taskCounts) {
        Map<String, Integer> counts = new HashMap<>();
        for (int index = start; index < connectorNum + 1; index++) {
            counts.put(WorkerConnectorTest.CONNECTOR + index, Integer.valueOf(taskCounts));
        }
        return counts;
    }

    /**
     * Maps every generated connector name in {@code [start, connectorNum]} (both bounds
     * inclusive) to its own fresh, empty config map.
     *
     * @param start        first connector index, inclusive
     * @param connectorNum last connector index, inclusive
     * @return a new map from {@code WorkerConnectorTest.CONNECTOR + index} to an empty map
     */
    private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) {
        Map<String, Map<String, String>> configs = new HashMap<>();
        for (int index = start; index < connectorNum + 1; index++) {
            configs.put(WorkerConnectorTest.CONNECTOR + index, new HashMap<>());
        }
        return configs;
    }

    /**
     * Maps every generated connector name in {@code [start, connectorNum]} (both bounds
     * inclusive) to the same target state.
     *
     * @param start        first connector index, inclusive
     * @param connectorNum last connector index, inclusive
     * @param state        value stored for every connector
     * @return a new map from {@code WorkerConnectorTest.CONNECTOR + index} to {@code state}
     */
    private static Map<String, TargetState> connectorTargetStates(int start, int connectorNum, TargetState state) {
        return IntStream.range(start, connectorNum + 1)
                .boxed()
                .collect(Collectors.toMap(
                        index -> WorkerConnectorTest.CONNECTOR + index,
                        index -> state));
    }

    /**
     * Generates one empty task config per index in {@code [start, taskNum]} (both bounds
     * inclusive).
     *
     * <p>The key for index {@code i} is a {@link ConnectorTaskId} whose first argument is
     * {@code WorkerConnectorTest.CONNECTOR + (i / connectorNum) + 1} and whose second argument
     * is {@code i}.
     *
     * <p>NOTE(review): assuming {@code CONNECTOR} is a {@code String}, the trailing
     * {@code + 1} is string concatenation (it appends the character {@code '1'}), not
     * arithmetic, and the range includes {@code taskNum} itself. Confirm whether either is
     * intended before relying on the generated names.
     *
     * @param start        first task index, inclusive
     * @param connectorNum divisor applied to each index; a value of 0 with a non-empty range
     *                     raises {@link ArithmeticException}
     * @param taskNum      last task index, inclusive
     * @return a new map from generated task id to a fresh empty map
     */
    private static Map<ConnectorTaskId, Map<String, String>> taskConfigs(int start, int connectorNum, int taskNum) {
        Map<ConnectorTaskId, Map<String, String>> configs = new HashMap<>();
        for (int index = start; index < taskNum + 1; index++) {
            String connectorName = WorkerConnectorTest.CONNECTOR + (index / connectorNum) + 1;
            configs.put(new ConnectorTaskId(connectorName, index), new HashMap<>());
        }
        return configs;
    }

    /**
     * Folds a set of incremental assignments into the accumulated {@code assignments} field.
     *
     * <p>For every worker in {@code newAssignments} the accumulated assignment is looked up,
     * or created with {@link #newExpandableAssignment()} when absent, and then updated in this
     * order: revoked connectors removed, connectors added, revoked tasks removed, tasks added.
     *
     * @param newAssignments incremental assignments keyed by worker
     */
    private void applyAssignments(Map<String, ExtendedAssignment> newAssignments) {
        newAssignments.forEach((worker, delta) -> {
            // A single lookup is enough: computeIfAbsent returns the same stored instance on
            // every later call. The inner lambda parameter needs its own name because a lambda
            // parameter may not redeclare a variable of the enclosing lambda.
            ExtendedAssignment accumulated =
                    this.assignments.computeIfAbsent(worker, absentWorker -> newExpandableAssignment());
            accumulated.connectors().removeAll(delta.revokedConnectors());
            accumulated.connectors().addAll(delta.connectors());
            accumulated.tasks().removeAll(delta.revokedTasks());
            accumulated.tasks().addAll(delta.tasks());
        });
    }

    /**
     * Creates an {@link ExtendedAssignment} backed by fresh, mutable {@link ArrayList}s so that
     * callers can add to or remove from its collections afterwards.
     *
     * <p>The assignment carries this test's {@code leader}, {@code leaderUrl} and
     * {@code offset} fields.
     *
     * @return a new assignment whose four collection arguments are empty lists
     */
    private ExtendedAssignment newExpandableAssignment() {
        return new ExtendedAssignment(
                (short) 1, // presumably the protocol version; confirm against the constructor
                (short) 0, // presumably the error code; confirm against the constructor
                this.leader,
                this.leaderUrl,
                this.offset,
                new ArrayList<>(),
                new ArrayList<>(),
                new ArrayList<>(),
                new ArrayList<>(),
                0);
    }

    /**
     * Derives the URL string expected for a leader: {@code "http://" + givenLeader + ":8083"}.
     *
     * @param givenLeader leader name to embed; a {@code null} is rendered as the text
     *                    {@code "null"}
     * @return the formatted URL string
     */
    private static String expectedLeaderUrl(String givenLeader) {
        return String.format("http://%s:8083", givenLeader);
    }

    /**
     * Checks the expected member configs against the given totals, using this test's
     * {@code leader} field as the expected leader.
     *
     * @param connectorNum        expected total of assigned connectors
     * @param taskNum             expected total of assigned tasks
     * @param revokedConnectorNum expected total of revoked connectors
     * @param revokedTaskNum      expected total of revoked tasks
     * @param workers             expected worker names
     */
    private void assertAssignment(int connectorNum, int taskNum, int revokedConnectorNum, int revokedTaskNum, String... workers) {
        String expectedLeader = this.leader;
        assertAssignment(expectedLeader, connectorNum, taskNum, revokedConnectorNum, revokedTaskNum, workers);
    }

    /**
     * Checks {@code expectedMemberConfigs} against the given expectations, in this order:
     * number of workers, worker names, total assigned connectors, total assigned tasks, total
     * revoked connectors, total revoked tasks, leader, and leader URL.
     *
     * <p>Totals are summed over the assignments of all members. Leader and leader URL are
     * compared as the {@code ", "}-joined distinct values found across all assignments, so the
     * check only passes when every assignment reports the same single value.
     *
     * @param expectedLeader      expected leader; also fed to {@link #expectedLeaderUrl(String)}
     * @param connectorNum        expected total of assigned connectors
     * @param taskNum             expected total of assigned tasks
     * @param revokedConnectorNum expected total of revoked connectors
     * @param revokedTaskNum      expected total of revoked tasks
     * @param workers             worker names that must all be present; their count must equal
     *                            the number of members
     */
    private void assertAssignment(String expectedLeader, int connectorNum, int taskNum, int revokedConnectorNum, int revokedTaskNum, String... workers) {
        MatcherAssert.assertThat("Wrong number of workers", this.expectedMemberConfigs.keySet().size(), CoreMatchers.is(workers.length));
        List<String> actualWorkers = new ArrayList<>(this.expectedMemberConfigs.keySet());
        MatcherAssert.assertThat("Wrong set of workers", actualWorkers, CoreMatchers.hasItems(workers));

        List<ExtendedAssignment> received = this.expectedMemberConfigs.values().stream()
                .map(ExtendedWorkerState::assignment)
                .collect(Collectors.toList());

        int assignedConnectors = received.stream().mapToInt(a -> a.connectors().size()).sum();
        MatcherAssert.assertThat("Wrong number of assigned connectors", assignedConnectors, CoreMatchers.is(connectorNum));

        int assignedTasks = received.stream().mapToInt(a -> a.tasks().size()).sum();
        MatcherAssert.assertThat("Wrong number of assigned tasks", assignedTasks, CoreMatchers.is(taskNum));

        int revokedConnectors = received.stream().mapToInt(a -> a.revokedConnectors().size()).sum();
        MatcherAssert.assertThat("Wrong number of revoked connectors", revokedConnectors, CoreMatchers.is(revokedConnectorNum));

        int revokedTasks = received.stream().mapToInt(a -> a.revokedTasks().size()).sum();
        MatcherAssert.assertThat("Wrong number of revoked tasks", revokedTasks, CoreMatchers.is(revokedTaskNum));

        String distinctLeaders = received.stream()
                .map(a -> a.leader())
                .distinct()
                .collect(Collectors.joining(", "));
        MatcherAssert.assertThat("Wrong leader in assignments", distinctLeaders, CoreMatchers.is(expectedLeader));

        String distinctLeaderUrls = received.stream()
                .map(a -> a.leaderUrl())
                .distinct()
                .collect(Collectors.joining(", "));
        MatcherAssert.assertThat("Wrong leaderUrl in assignments", distinctLeaderUrls, CoreMatchers.is(expectedLeaderUrl(expectedLeader)));
    }

    /**
     * Asserts that every assignment in the map reports the expected delay.
     *
     * @param expectedDelay  value each assignment's {@code delay()} must equal
     * @param newAssignments assignments to inspect; keys are ignored
     */
    private void assertDelay(int expectedDelay, Map<String, ExtendedAssignment> newAssignments) {
        for (ExtendedAssignment assignment : newAssignments.values()) {
            Assert.assertEquals("Wrong rebalance delay in " + assignment, expectedDelay, assignment.delay());
        }
    }

    /**
     * Asserts that nothing present in the existing assignments shows up again in the new ones.
     *
     * <p>Both maps are first passed to
     * {@link #assertNoDuplicateInAssignment(Map)}. Then the connectors, and separately the
     * tasks, of all existing assignments are intersected with those of all new assignments;
     * each intersection must be empty.
     *
     * @param existingAssignments member states holding the current assignments
     * @param newAssignments      member states holding the freshly computed assignments
     */
    private void assertNoReassignments(Map<String, ExtendedWorkerState> existingAssignments, Map<String, ExtendedWorkerState> newAssignments) {
        assertNoDuplicateInAssignment(existingAssignments);
        assertNoDuplicateInAssignment(newAssignments);

        List<String> existingConnectors = existingAssignments.values().stream()
                .flatMap(state -> state.assignment().connectors().stream())
                .collect(Collectors.toList());
        List<String> incomingConnectors = newAssignments.values().stream()
                .flatMap(state -> state.assignment().connectors().stream())
                .collect(Collectors.toList());
        List<ConnectorTaskId> existingTasks = existingAssignments.values().stream()
                .flatMap(state -> state.assignment().tasks().stream())
                .collect(Collectors.toList());
        List<ConnectorTaskId> incomingTasks = newAssignments.values().stream()
                .flatMap(state -> state.assignment().tasks().stream())
                .collect(Collectors.toList());

        // retainAll leaves only the elements present on both sides, i.e. the re-assignments.
        existingConnectors.retainAll(incomingConnectors);
        MatcherAssert.assertThat("Found connectors in new assignment that already exist in current assignment", Collections.emptyList(), CoreMatchers.is(existingConnectors));

        existingTasks.retainAll(incomingTasks);
        // Check the task intersection here; the previous code re-checked the connector list,
        // so overlapping tasks were never detected.
        MatcherAssert.assertThat("Found tasks in new assignment that already exist in current assignment", Collections.emptyList(), CoreMatchers.is(existingTasks));
    }

    /**
     * Gathers the connectors, and then the tasks, of all given member assignments, removes from
     * each gathered list every element contained in the set of its own distinct values, and
     * asserts that the remainder is empty.
     *
     * <p>NOTE(review): removing every element that occurs in the set of distinct values always
     * empties the list, so as written neither assertion can fail, even when duplicates exist.
     * Other tests in this file may depend on that leniency; confirm before tightening the check.
     *
     * @param existingAssignment member states whose assignments are inspected
     */
    private void assertNoDuplicateInAssignment(Map<String, ExtendedWorkerState> existingAssignment) {
        List<String> connectors = new ArrayList<>();
        for (ExtendedWorkerState state : existingAssignment.values()) {
            connectors.addAll(state.assignment().connectors());
        }
        connectors.removeAll(new HashSet<>(connectors));
        MatcherAssert.assertThat("Connectors should be unique in assignments but duplicates where found", Collections.emptyList(), CoreMatchers.is(connectors));

        List<ConnectorTaskId> tasks = new ArrayList<>();
        for (ExtendedWorkerState state : existingAssignment.values()) {
            tasks.addAll(state.assignment().tasks());
        }
        tasks.removeAll(new HashSet<>(tasks));
        MatcherAssert.assertThat("Tasks should be unique in assignments but duplicates where found", Collections.emptyList(), CoreMatchers.is(tasks));
    }

    /**
     * Stubs the generation accessors of the mocked coordinator relative to the assignor's
     * {@code previousGenerationId}.
     *
     * <p>{@code generationId()} is stubbed with two consecutive answers, both
     * {@code previousGenerationId + 1}; {@code lastCompletedGenerationId()} answers
     * {@code previousGenerationId}.
     */
    private void expectGeneration() {
        int previousGeneration = this.assignor.previousGenerationId;
        int currentGeneration = previousGeneration + 1;
        Mockito.when(this.coordinator.generationId())
                .thenReturn(currentGeneration)
                .thenReturn(currentGeneration);
        Mockito.when(this.coordinator.lastCompletedGenerationId())
                .thenReturn(previousGeneration);
    }
}
