package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.WorkerTestUtils;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;

/**
 * Parameterized tests for {@link WorkerCoordinator} driven through
 * {@link IncrementalCooperativeConnectProtocol} serialization.
 *
 * <p>Each test runs once per entry returned by {@link #mode()}: the
 * {@link ConnectProtocolCompatibility} handed to the coordinator, paired with the number of
 * protocols {@code coordinator.metadata()} is expected to advertise for it.
 *
 * <p>The config backing store is a Mockito mock; {@link #teardown()} fails a test that leaves
 * unverified interactions on it, so every test verifies its {@code snapshot()} calls explicitly.
 */
@RunWith(Parameterized.class)
public class WorkerCoordinatorIncrementalTest {
    private MockTime time;
    private MockClient client;
    private Node node;
    private Metadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;

    @Mock
    private KafkaConfigBackingStore configStorage;
    private GroupRebalanceConfig rebalanceConfig;
    private WorkerCoordinator coordinator;
    private String leaderId;
    private String memberId;
    private String anotherMemberId;
    private String leaderUrl;
    private String memberUrl;
    private String anotherMemberUrl;
    private int generationId;
    private long offset;
    // Number of configStorage.snapshot() calls a test expects; checked with Mockito.times(...).
    private int configStorageCalls;
    private ClusterConfigState configState1;
    private ClusterConfigState configState2;
    private ClusterConfigState configStateSingleTaskConnectors;

    // Injected by the Parameterized runner; must stay public and non-final.
    @Parameterized.Parameter
    public ConnectProtocolCompatibility compatibility;

    @Parameterized.Parameter(1)
    public int expectedMetadataSize;

    @Rule
    public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    private final String connectorId1 = "connector1";
    private final String connectorId2 = "connector2";
    private final String connectorId3 = "connector3";
    private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
    private final ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
    private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
    private final ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
    private final String groupId = "test-group";
    private final int sessionTimeoutMs = 10;
    private final int rebalanceTimeoutMs = 60;
    private final int heartbeatIntervalMs = 2;
    private final long retryBackoffMs = 100;
    private final long retryBackoffMaxMs = 1000;
    private final int requestTimeoutMs = 1000;
    private final int rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;

    /**
     * Rebalance listener that records the most recent callback arguments and counts how many
     * times each callback fired.
     */
    private static class MockRebalanceListener implements WorkerRebalanceListener {
        public ExtendedAssignment assignment;
        public String revokedLeader;
        public Collection<String> revokedConnectors;
        public Collection<ConnectorTaskId> revokedTasks;
        public int revokedCount;
        public int assignedCount;

        private MockRebalanceListener() {
            this.assignment = null;
            this.revokedConnectors = Collections.emptyList();
            this.revokedTasks = Collections.emptyList();
            this.revokedCount = 0;
            this.assignedCount = 0;
        }

        /** Stores the assignment and bumps {@code assignedCount}; the generation is ignored. */
        @Override
        public void onAssigned(ExtendedAssignment assignment, int generation) {
            this.assignment = assignment;
            this.assignedCount++;
        }

        /**
         * Records a revocation. A callback where both collections are empty is ignored
         * entirely: nothing is stored and {@code revokedCount} is not incremented.
         */
        @Override
        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            if (connectors.isEmpty() && tasks.isEmpty()) {
                return;
            }
            this.revokedLeader = leader;
            this.revokedConnectors = connectors;
            this.revokedTasks = tasks;
            this.revokedCount++;
        }
    }

    /**
     * Parameter rows: {@code {compatibility, expectedMetadataSize}}.
     *
     * @return one row for COMPATIBLE (2 protocols expected) and one for SESSIONED (3 expected)
     */
    @Parameterized.Parameters
    public static Iterable<?> mode() {
        return Arrays.asList(new Object[][]{
            {ConnectProtocolCompatibility.COMPATIBLE, 2},
            {ConnectProtocolCompatibility.SESSIONED, 3}
        });
    }

    /** Builds a fresh mock client, metrics, listener and coordinator before every test. */
    @Before
    public void setup() {
        LogContext logContext = new LogContext();

        time = new MockTime();
        metadata = new Metadata(0L, 0L, Long.MAX_VALUE, logContext, new ClusterResourceListeners());
        client = new MockClient(time, metadata);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(
            1, Collections.singletonMap(MonitorableSourceConnector.TOPIC_CONFIG, 1)));
        node = metadata.fetch().nodes().get(0);
        consumerClient = new ConsumerNetworkClient(
            logContext, client, metadata, time, retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
        metrics = new Metrics(time);
        rebalanceListener = new MockRebalanceListener();

        leaderId = "worker1";
        memberId = "worker2";
        anotherMemberId = "worker3";
        leaderUrl = expectedUrl(leaderId);
        memberUrl = expectedUrl(memberId);
        anotherMemberUrl = expectedUrl(anotherMemberId);
        generationId = 3;
        offset = 10L;
        configStorageCalls = 0;

        rebalanceConfig = new GroupRebalanceConfig(
            sessionTimeoutMs,
            rebalanceTimeoutMs,
            heartbeatIntervalMs,
            groupId,
            Optional.empty(),
            retryBackoffMs,
            retryBackoffMaxMs,
            true);
        coordinator = new WorkerCoordinator(
            rebalanceConfig,
            logContext,
            consumerClient,
            metrics,
            "worker" + groupId,
            time,
            expectedUrl(leaderId),
            configStorage,
            rebalanceListener,
            compatibility,
            rebalanceDelay);

        configState1 = WorkerTestUtils.clusterConfigState(offset, 2, 4);
    }

    /** Closes metrics and fails the test if the config store mock has unverified interactions. */
    @After
    public void teardown() {
        metrics.close();
        Mockito.verifyNoMoreInteractions(configStorage);
    }

    /** Returns the URL used for a worker id in these tests: {@code http://<id>:8083}. */
    private static String expectedUrl(String member) {
        return "http://" + member + ":8083";
    }

    /** The first advertised protocol matches the compatibility mode and carries the config offset. */
    @Test
    public void testMetadata() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        ExtendedWorkerState state = firstProtocolState();
        Assert.assertEquals(offset, state.offset());

        Mockito.verify(configStorage, Mockito.times(1)).snapshot();
    }

    /**
     * After a join completes with the parameterized protocol, the advertised metadata embeds
     * the connectors and tasks from that assignment.
     */
    @Test
    public void testMetadataWithExistingAssignment() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        ExtendedAssignment assignment = new ExtendedAssignment(
            (short) 1, (short) 0, leaderId, leaderUrl, configState1.offset(),
            Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
            Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer serialized = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
        coordinator.onJoinComplete(generationId, memberId, compatibility.protocol(), serialized);

        ExtendedWorkerState state = firstProtocolState();
        Assert.assertEquals(offset, state.offset());
        Assert.assertNotEquals(ExtendedAssignment.empty(), state.assignment());
        Assert.assertEquals(Collections.singletonList(connectorId1), state.assignment().connectors());
        Assert.assertEquals(Arrays.asList(taskId1x0, taskId2x0), state.assignment().tasks());

        Mockito.verify(configStorage, Mockito.times(1)).snapshot();
    }

    /**
     * Same as {@link #testMetadataWithExistingAssignment()} but the join completes with the
     * EAGER protocol name; the advertised assignment is still expected to be non-empty.
     */
    @Test
    public void testMetadataWithExistingAssignmentButOlderProtocolSelection() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        ExtendedAssignment assignment = new ExtendedAssignment(
            (short) 1, (short) 0, leaderId, leaderUrl, configState1.offset(),
            Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
            Collections.emptyList(), Collections.emptyList(), 0);
        ByteBuffer serialized = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
        coordinator.onJoinComplete(generationId, memberId, ConnectProtocolCompatibility.EAGER.protocol(), serialized);

        ExtendedWorkerState state = firstProtocolState();
        Assert.assertEquals(offset, state.offset());
        Assert.assertNotEquals(ExtendedAssignment.empty(), state.assignment());

        Mockito.verify(configStorage, Mockito.times(1)).snapshot();
    }

    /** Two workers join first; a third worker then joins in a second rebalance round. */
    @Test
    public void testTaskAssignmentWhenWorkerJoins() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        coordinator.metadata();
        configStorageCalls++;

        // Round 1: leader and one member, neither carrying a previous assignment.
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, null);
        addJoinGroupResponseMember(members, memberId, offset, null);
        Map<String, ByteBuffer> result = electLeader(members);

        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId1), 4,
            Collections.emptyList(), 0, leaderAssignment);
        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId2), 4,
            Collections.emptyList(), 0, memberAssignment);

        coordinator.metadata();
        configStorageCalls++;

        // Round 2: both workers report their round-1 assignments and a third worker joins empty.
        members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, leaderAssignment);
        addJoinGroupResponseMember(members, memberId, offset, memberAssignment);
        addJoinGroupResponseMember(members, anotherMemberId, offset, null);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 1, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 1, deserializeAssignment(result, memberId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, deserializeAssignment(result, anotherMemberId));

        Mockito.verify(configStorage, Mockito.times(configStorageCalls)).snapshot();
    }

    /**
     * Three workers join, then one is missing from every later rebalance. Rebalances are
     * repeated while the mock clock advances up to and then past {@code rebalanceDelay}.
     */
    @Test
    public void testTaskAssignmentWhenWorkerLeavesPermanently() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        coordinator.metadata();
        configStorageCalls++;

        // Round 1: all three workers join without previous assignments.
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, null);
        addJoinGroupResponseMember(members, memberId, offset, null);
        addJoinGroupResponseMember(members, anotherMemberId, offset, null);
        Map<String, ByteBuffer> result = electLeader(members);

        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId1), 3,
            Collections.emptyList(), 0, leaderAssignment);
        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId2), 3,
            Collections.emptyList(), 0, memberAssignment);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 2,
            Collections.emptyList(), 0, deserializeAssignment(result, anotherMemberId));

        coordinator.metadata();
        configStorageCalls++;

        // Round 2: the third worker is absent; the full rebalanceDelay is the expected extra value.
        members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, leaderAssignment);
        addJoinGroupResponseMember(members, memberId, offset, memberAssignment);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, rebalanceDelay, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, rebalanceDelay, deserializeAssignment(result, memberId));

        // Round 3: half of the delay has elapsed on the mock clock; the other half is expected.
        int remainingDelay = rebalanceDelay / 2;
        time.sleep(remainingDelay);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, remainingDelay, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, remainingDelay, deserializeAssignment(result, memberId));

        // Round 4: the clock is now past the delay.
        time.sleep(remainingDelay + 1);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 1,
            Collections.emptyList(), 0, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 1,
            Collections.emptyList(), 0, deserializeAssignment(result, memberId));

        Mockito.verify(configStorage, Mockito.times(configStorageCalls)).snapshot();
    }

    /**
     * Three workers join, one is missing from the next rebalance, and it rejoins with no
     * assignment before {@code rebalanceDelay} has elapsed on the mock clock.
     */
    @Test
    public void testTaskAssignmentWhenWorkerBounces() {
        Mockito.when(configStorage.snapshot()).thenReturn(configState1);

        coordinator.metadata();
        configStorageCalls++;

        // Round 1: all three workers join without previous assignments.
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, null);
        addJoinGroupResponseMember(members, memberId, offset, null);
        addJoinGroupResponseMember(members, anotherMemberId, offset, null);
        Map<String, ByteBuffer> result = electLeader(members);

        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId1), 3,
            Collections.emptyList(), 0, leaderAssignment);
        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.singletonList(connectorId2), 3,
            Collections.emptyList(), 0, memberAssignment);
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 2,
            Collections.emptyList(), 0, deserializeAssignment(result, anotherMemberId));

        coordinator.metadata();
        configStorageCalls++;

        // Round 2: the third worker is absent; the full rebalanceDelay is the expected extra value.
        members = new ArrayList<>();
        addJoinGroupResponseMember(members, leaderId, offset, leaderAssignment);
        addJoinGroupResponseMember(members, memberId, offset, memberAssignment);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, rebalanceDelay, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, rebalanceDelay, deserializeAssignment(result, memberId));

        // Round 3: the third worker rejoins (no assignment) after half of the delay has elapsed.
        int remainingDelay = rebalanceDelay / 2;
        time.sleep(remainingDelay);
        addJoinGroupResponseMember(members, anotherMemberId, offset, null);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, remainingDelay, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, remainingDelay, deserializeAssignment(result, memberId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, remainingDelay, deserializeAssignment(result, anotherMemberId));

        // Round 4: the clock is now past the delay; same member list as round 3.
        time.sleep(remainingDelay + 1);
        result = electLeader(members);

        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, deserializeAssignment(result, leaderId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 0,
            Collections.emptyList(), 0, deserializeAssignment(result, memberId));
        WorkerTestUtils.assertAssignment(leaderId, offset, Collections.emptyList(), 2,
            Collections.emptyList(), 0, deserializeAssignment(result, anotherMemberId));

        Mockito.verify(configStorage, Mockito.times(configStorageCalls)).snapshot();
    }

    /**
     * Requests the coordinator's join metadata, asserts that it has {@code expectedMetadataSize}
     * entries and that the first entry is named after the parameterized protocol, and returns
     * that first entry's deserialized worker state.
     */
    private ExtendedWorkerState firstProtocolState() {
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = coordinator.metadata();
        Assert.assertEquals(expectedMetadataSize, protocols.size());

        Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> it = protocols.iterator();
        Assert.assertTrue(it.hasNext());
        JoinGroupRequestData.JoinGroupRequestProtocol first = it.next();
        Assert.assertEquals(compatibility.protocol(), first.name());

        return IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(first.metadata()));
    }

    /** Runs one leader election for {@code leaderId} over the given members and returns the raw result. */
    private Map<String, ByteBuffer> electLeader(List<JoinGroupResponseData.JoinGroupResponseMember> members) {
        return coordinator.onLeaderElected(leaderId, compatibility.protocol(), members, false);
    }

    /** Deserializes the assignment stored under {@code member} in a leader-election result. */
    private static ExtendedAssignment deserializeAssignment(Map<String, ByteBuffer> assignment, String member) {
        return IncrementalCooperativeConnectProtocol.deserializeAssignment(assignment.get(member));
    }

    /**
     * Appends a join-group response member for {@code member} to {@code responseMembers}.
     *
     * @param responseMembers list to append to
     * @param member          member id; also used to derive the worker URL via {@link #expectedUrl(String)}
     * @param offset          config offset placed in the member's worker state
     * @param assignment      assignment the member reports, or {@code null} for none
     */
    private void addJoinGroupResponseMember(List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers,
                                            String member,
                                            long offset,
                                            ExtendedAssignment assignment) {
        ExtendedWorkerState state = new ExtendedWorkerState(expectedUrl(member), offset, assignment);
        // The boolean passed to serializeMetadata is true for every mode except COMPATIBLE.
        boolean notCompatibleMode = compatibility != ConnectProtocolCompatibility.COMPATIBLE;
        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
            .setMemberId(member)
            .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(state, notCompatibleMode).array()));
    }
}
