package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.class */
public class StreamsPartitionAssignorTest {
    // Consumer (member) id constants "consumer1" .. "consumer4".
    private final String c1 = "consumer1";
    private final String c2 = "consumer2";
    private final String c3 = "consumer3";
    private final String c4 = "consumer4";
    // Topic partitions named t<topic number>p<partition>: partitions 0-3 of "topic1" .. "topic4".
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    private final TopicPartition t4p0 = new TopicPartition("topic4", 0);
    private final TopicPartition t4p1 = new TopicPartition("topic4", 1);
    private final TopicPartition t4p2 = new TopicPartition("topic4", 2);
    private final TopicPartition t4p3 = new TopicPartition("topic4", 3);
    // Task ids named task<first arg>_<second arg>; first args 0-2, second args 0-3.
    // NOTE(review): the two TaskId arguments are presumably (topic group id, partition) - confirm against TaskId.
    private final TaskId task0_0 = new TaskId(0, 0);
    private final TaskId task0_1 = new TaskId(0, 1);
    private final TaskId task0_2 = new TaskId(0, 2);
    private final TaskId task0_3 = new TaskId(0, 3);
    private final TaskId task1_0 = new TaskId(1, 0);
    private final TaskId task1_1 = new TaskId(1, 1);
    private final TaskId task1_2 = new TaskId(1, 2);
    private final TaskId task1_3 = new TaskId(1, 3);
    private final TaskId task2_0 = new TaskId(2, 0);
    private final TaskId task2_1 = new TaskId(2, 1);
    private final TaskId task2_2 = new TaskId(2, 2);
    private final TaskId task2_3 = new TaskId(2, 3);
    // Maps every task id fixture to the topic partitions it covers:
    //   task0_x -> partition x of both topic1 and topic2,
    //   task1_x -> partition x of topic3,
    //   task2_x -> partition x of topic4.
    // Populated through the instance initializer of an anonymous HashMap subclass, so the map
    // keeps a reference to the enclosing test instance.
    private final Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<TaskId, Set<TopicPartition>>() { // from class: org.apache.kafka.streams.processor.internals.StreamsPartitionAssignorTest.1
        {
            put(StreamsPartitionAssignorTest.this.task0_0, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t1p0, StreamsPartitionAssignorTest.this.t2p0}));
            put(StreamsPartitionAssignorTest.this.task0_1, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t1p1, StreamsPartitionAssignorTest.this.t2p1}));
            put(StreamsPartitionAssignorTest.this.task0_2, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t1p2, StreamsPartitionAssignorTest.this.t2p2}));
            put(StreamsPartitionAssignorTest.this.task0_3, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t1p3, StreamsPartitionAssignorTest.this.t2p3}));
            put(StreamsPartitionAssignorTest.this.task1_0, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t3p0}));
            put(StreamsPartitionAssignorTest.this.task1_1, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t3p1}));
            put(StreamsPartitionAssignorTest.this.task1_2, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t3p2}));
            put(StreamsPartitionAssignorTest.this.task1_3, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t3p3}));
            put(StreamsPartitionAssignorTest.this.task2_0, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t4p0}));
            put(StreamsPartitionAssignorTest.this.task2_1, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t4p1}));
            put(StreamsPartitionAssignorTest.this.task2_2, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t4p2}));
            put(StreamsPartitionAssignorTest.this.task2_3, Utils.mkSet(new TopicPartition[]{StreamsPartitionAssignorTest.this.t4p3}));
        }
    };
    // The two topic names passed as the expected topics to checkAssignment(...) in testAssignBasic.
    private final Set<String> allTopics = Utils.mkSet(new String[]{"topic1", "topic2"});
    // Partition metadata for the default cluster: topic1 and topic2 have partitions 0-2, topic3 has
    // partitions 0-3. Partition 3 of topic1/topic2 and all of topic4 are NOT part of this metadata.
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    // Shared empty task-id set for tests that start without previous or cached tasks.
    private final Set<TaskId> emptyTasks = Collections.emptySet();
    // Default cluster metadata: a single Node.noNode() placeholder plus the partitions listed in infos.
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    // The assignor under test; each test configures it before use.
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    // Supplies the restoreConsumer handed to MockInternalTopicManager in the assign tests.
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    // Topology builder that the individual tests populate with sources, processors and stores.
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    // Built from configProps() during field initialization, i.e. while taskManager (declared below
    // without an initializer) is still null.
    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
    // Same literal values that configProps() writes for "bootstrap.servers" and "application.id".
    private final String userEndPoint = "localhost:8080";
    private final String applicationId = "stream-partition-assignor-test";
    // Mock task manager; assigned by createMockTaskManager(...).
    private TaskManager taskManager;
    // Subscriptions keyed by consumer id; initialized in setUp() and filled by the assign tests.
    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions;

    /**
     * Builds the baseline configuration handed to the assignor (and to {@code StreamsConfig}).
     *
     * <p>The map carries the application id, the bootstrap servers, the current value of the
     * {@code taskManager} field (which may still be {@code null} when this is called before
     * {@code createMockTaskManager(...)}) and a fresh {@link AtomicInteger} error-code holder.
     *
     * @return a new, mutable map; callers add or override entries before configuring the assignor
     */
    private Map<String, Object> configProps() {
        // Typed map instead of a raw HashMap: avoids the unchecked conversion on return.
        Map<String, Object> props = new HashMap<>();
        props.put("application.id", "stream-partition-assignor-test");
        props.put("bootstrap.servers", "localhost:8080");
        props.put("__task.manager.instance__", taskManager);
        props.put("__assignment.error.code__", new AtomicInteger());
        return props;
    }

    /**
     * Configures the assignor under test with the baseline properties from {@link #configProps()},
     * overlaid with the entries of {@code map} (entries in {@code map} win on key collisions).
     *
     * @param map additional or overriding configuration entries
     */
    private void configurePartitionAssignor(Map<String, Object> map) {
        Map<String, Object> effectiveConfig = configProps();
        effectiveConfig.putAll(map);
        partitionAssignor.configure(effectiveConfig);
    }

    /**
     * Creates the default mock task manager and configures the assignor with the unmodified
     * baseline properties.
     */
    private void configureDefault() {
        createMockTaskManager();
        // Built after the mock exists so the properties carry the freshly created task manager.
        Map<String, Object> baseline = configProps();
        partitionAssignor.configure(baseline);
    }

    /**
     * Creates a mock task manager with no tasks, a random process id and the internal builder of a
     * topology to which nothing has been added.
     */
    private void createMockTaskManager() {
        Topology emptyTopology = new StreamsBuilder().build();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(emptyTopology);
        topologyBuilder.setApplicationId("stream-partition-assignor-test");
        UUID processId = UUID.randomUUID();
        createMockTaskManager(emptyTasks, emptyTasks, processId, topologyBuilder);
    }

    /**
     * Replaces {@code taskManager} with a fresh EasyMock nice mock and records its stubbed answers.
     *
     * <p>This method only records expectations; it does not call {@code EasyMock.replay}. Tests
     * that need the stubs active replay the mock themselves.
     *
     * @param set                     returned by both {@code previousRunningTaskIds()} and {@code activeTaskIds()}
     * @param set2                    returned by {@code cachedTasksIds()}
     * @param uuid                    returned by {@code processId()}
     * @param internalTopologyBuilder returned by {@code builder()}
     */
    private void createMockTaskManager(Set<TaskId> set, Set<TaskId> set2, UUID uuid, InternalTopologyBuilder internalTopologyBuilder) {
        this.taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        // Plain null: a value cast to Object is not assignable to the typed andReturn(T) parameter.
        EasyMock.expect(this.taskManager.adminClient()).andReturn(null).anyTimes();
        EasyMock.expect(this.taskManager.builder()).andReturn(internalTopologyBuilder).anyTimes();
        EasyMock.expect(this.taskManager.previousRunningTaskIds()).andReturn(set).anyTimes();
        EasyMock.expect(this.taskManager.activeTaskIds()).andReturn(set).anyTimes();
        EasyMock.expect(this.taskManager.cachedTasksIds()).andReturn(set2).anyTimes();
        EasyMock.expect(this.taskManager.processId()).andReturn(uuid).anyTimes();
    }

    /**
     * Ensures every test starts with an empty, non-null {@code subscriptions} map: creates the map
     * on first use and empties it otherwise.
     */
    @Before
    public void setUp() {
        if (subscriptions == null) {
            subscriptions = new HashMap<>();
            return;
        }
        subscriptions.clear();
    }

    /**
     * With {@code upgrade.from} set to {@code "2.3"} the assignor must advertise exactly one
     * supported protocol, EAGER, and must not advertise COOPERATIVE.
     */
    @Test
    public void shouldUseEagerRebalancingProtocol() {
        createMockTaskManager();

        Map<String, Object> upgradeConfig = configProps();
        upgradeConfig.put("upgrade.from", "2.3");
        partitionAssignor.configure(upgradeConfig);

        Assert.assertEquals(1L, partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Assert.assertFalse(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * With the baseline configuration the assignor must advertise two supported protocols, one of
     * which is COOPERATIVE.
     */
    @Test
    public void shouldUseCooperativeRebalancingProtocol() {
        createMockTaskManager();

        Map<String, Object> baseline = configProps();
        partitionAssignor.configure(baseline);

        Assert.assertEquals(2L, partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * Registers, for a single client, which consumer previously owned the partitions of which task
     * (3 / 2 / 3 tasks for consumer1 / consumer2 / consumer3), assigns all eight tasks as active
     * and expects the within-client assignment to be equivalent to the previous one.
     */
    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() {
        configureDefault();
        ClientState state = new ClientState();
        List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);

        HashMap<String, List<TaskId>> previousAssignment = new HashMap<>();
        previousAssignment.put("consumer1", Arrays.asList(task0_0, task1_1, task1_3));
        previousAssignment.put("consumer2", Arrays.asList(task0_3, task1_0));
        previousAssignment.put("consumer3", Arrays.asList(task0_1, task0_2, task1_2));

        // Tell the client state which consumer owned the partitions of each previously assigned task.
        for (Map.Entry<String, List<TaskId>> owned : previousAssignment.entrySet()) {
            for (TaskId task : owned.getValue()) {
                state.addOwnedPartitions(partitionsForTask.get(task), owned.getKey());
            }
        }

        Set<String> consumers = Utils.mkSet("consumer1", "consumer2", "consumer3");
        state.assignActiveTasks(allTasks);

        assertEquivalentAssignment(
            previousAssignment,
            partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet()));
    }

    /**
     * Starts from the same 3 / 2 / 3 previous ownership as the "nothing changes" test, then adds
     * {@code task2_0} to the active tasks. The expected result is the previous assignment with the
     * new task appended to consumer2, the consumer that previously owned only two tasks.
     */
    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() {
        configureDefault();
        ClientState state = new ClientState();
        Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);

        // Mutable lists: the expectation for consumer2 is extended further down.
        HashMap<String, List<TaskId>> expectedAssignment = new HashMap<>();
        expectedAssignment.put("consumer1", new ArrayList<>(Arrays.asList(task0_0, task1_1, task1_3)));
        expectedAssignment.put("consumer2", new ArrayList<>(Arrays.asList(task0_3, task1_0)));
        expectedAssignment.put("consumer3", new ArrayList<>(Arrays.asList(task0_1, task0_2, task1_2)));

        for (Map.Entry<String, List<TaskId>> owned : expectedAssignment.entrySet()) {
            for (TaskId task : owned.getValue()) {
                state.addOwnedPartitions(partitionsForTask.get(task), owned.getKey());
            }
        }

        Set<String> consumers = Utils.mkSet("consumer1", "consumer2", "consumer3");
        TaskId newTask = task2_0;
        allTasks.add(newTask);
        state.assignActiveTasks(allTasks);

        Map<String, List<TaskId>> actualAssignment =
            partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet());

        expectedAssignment.get("consumer2").add(newTask);
        assertEquivalentAssignment(expectedAssignment, actualAssignment);
    }

    /**
     * Registers previous ownership for consumer1..consumer3 only, but asks for an assignment over
     * four consumers (consumer4 added). The sticky-and-balanced attempt is expected to return an
     * empty map.
     */
    @Test
    public void shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseNewConsumerJoined() {
        configureDefault();
        ClientState state = new ClientState();
        List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);

        Map<String, List<TaskId>> previousAssignment = new HashMap<>();
        previousAssignment.put("consumer1", Arrays.asList(task0_0, task1_1, task1_3));
        previousAssignment.put("consumer2", Arrays.asList(task0_3, task1_0));
        previousAssignment.put("consumer3", Arrays.asList(task0_1, task0_2, task1_2));

        for (Map.Entry<String, List<TaskId>> owned : previousAssignment.entrySet()) {
            for (TaskId task : owned.getValue()) {
                state.addOwnedPartitions(partitionsForTask.get(task), owned.getKey());
            }
        }

        // consumer4 has no previously owned partitions.
        Set<String> consumers = Utils.mkSet("consumer1", "consumer2", "consumer3", "consumer4");
        state.assignActiveTasks(allTasks);

        Map<String, List<TaskId>> actualAssignment =
            partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet());
        MatcherAssert.assertThat(actualAssignment, CoreMatchers.equalTo(Collections.emptyMap()));
    }

    /**
     * Registers previous ownership in which no consumer of this client owned {@code task0_0}, and
     * passes the partitions of {@code task0_0} as the fourth argument of the assignment call
     * (per the test name: partitions owned by another client). The attempt is expected to return
     * an empty map.
     */
    @Test
    public void shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseOtherClientOwnedPartition() {
        configureDefault();
        ClientState state = new ClientState();
        List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);

        Map<String, List<TaskId>> previousAssignment = new HashMap<>();
        previousAssignment.put("consumer1", new ArrayList<>(Arrays.asList(task1_1, task1_3)));
        previousAssignment.put("consumer2", new ArrayList<>(Arrays.asList(task0_3, task1_0)));
        previousAssignment.put("consumer3", new ArrayList<>(Arrays.asList(task0_1, task0_2, task1_2)));

        for (Map.Entry<String, List<TaskId>> owned : previousAssignment.entrySet()) {
            for (TaskId task : owned.getValue()) {
                state.addOwnedPartitions(partitionsForTask.get(task), owned.getKey());
            }
        }

        Set<TopicPartition> partitionsOfTask0_0 = new HashSet<>(partitionsForTask.get(task0_0));
        Set<String> consumers = Utils.mkSet("consumer1", "consumer2", "consumer3");
        state.assignActiveTasks(allTasks);

        Map<String, List<TaskId>> actualAssignment =
            partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, partitionsOfTask0_0);
        MatcherAssert.assertThat(actualAssignment, CoreMatchers.equalTo(Collections.emptyMap()));
    }

    /**
     * Feeds nine task ids (four with first component 0, three with 1, two with 2) in shuffled order
     * to {@code interleaveConsumerTasksByGroupId} together with consumers c1..c3 and expects each
     * consumer to receive three specific tasks, independent of the input order.
     */
    @Test
    public void shouldInterleaveTasksByGroupId() {
        TaskId id00 = new TaskId(0, 0);
        TaskId id01 = new TaskId(0, 1);
        TaskId id02 = new TaskId(0, 2);
        TaskId id03 = new TaskId(0, 3);
        TaskId id10 = new TaskId(1, 0);
        TaskId id11 = new TaskId(1, 1);
        TaskId id12 = new TaskId(1, 2);
        TaskId id20 = new TaskId(2, 0);
        TaskId id21 = new TaskId(2, 1);

        Set<String> consumers = Utils.mkSet("c1", "c2", "c3");

        Map<String, List<TaskId>> expected = new HashMap<>();
        expected.put("c1", Arrays.asList(id00, id03, id12));
        expected.put("c2", Arrays.asList(id01, id10, id20));
        expected.put("c3", Arrays.asList(id02, id11, id21));

        // Unseeded shuffle: the expected result must not depend on the order of the input list.
        List<TaskId> shuffledTasks = Arrays.asList(id20, id21, id10, id11, id12, id00, id01, id02, id03);
        Collections.shuffle(shuffledTasks);

        MatcherAssert.assertThat(
            StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(shuffledTasks, consumers),
            CoreMatchers.equalTo(expected));
    }

    /**
     * With the rebalance protocol forced to EAGER, the subscription user data must equal an encoded
     * {@code SubscriptionInfo} built from the process id, the previously running task ids and the
     * cached task ids that are not among the running ones.
     */
    @Test
    public void testEagerSubscription() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        Set<TaskId> runningTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        Set<TaskId> cachedTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
            new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
        UUID processId = UUID.randomUUID();
        createMockTaskManager(runningTasks, cachedTasks, processId, builder);
        EasyMock.replay(taskManager);

        configurePartitionAssignor(Collections.emptyMap());
        partitionAssignor.setRebalanceProtocol(ConsumerPartitionAssignor.RebalanceProtocol.EAGER);

        Set<String> topics = Utils.mkSet("topic1", "topic2");
        List<String> topicList = new ArrayList<>(topics);
        ByteBuffer userData = partitionAssignor.subscriptionUserData(topics);
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(topicList, userData);

        // The topic list originates from a set, so sort it before comparing against a fixed order.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());

        Set<TaskId> cachedNotRunning = new HashSet<>(cachedTasks);
        cachedNotRunning.removeAll(runningTasks);
        Assert.assertEquals(new SubscriptionInfo(processId, runningTasks, cachedNotRunning, (String) null).encode(), subscription.userData());
    }

    /**
     * With the baseline configuration (no explicit protocol override), the subscription user data
     * must equal an encoded {@code SubscriptionInfo} built from the process id, an EMPTY set in
     * place of the running task ids, and the cached task ids that are not among the running ones.
     */
    @Test
    public void testCooperativeSubscription() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        Set<TaskId> runningTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        Set<TaskId> cachedTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
            new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
        UUID processId = UUID.randomUUID();
        createMockTaskManager(runningTasks, cachedTasks, processId, builder);
        EasyMock.replay(taskManager);

        configurePartitionAssignor(Collections.emptyMap());

        Set<String> topics = Utils.mkSet("topic1", "topic2");
        List<String> topicList = new ArrayList<>(topics);
        ByteBuffer userData = partitionAssignor.subscriptionUserData(topics);
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(topicList, userData);

        // The topic list originates from a set, so sort it before comparing against a fixed order.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());

        Set<TaskId> cachedNotRunning = new HashSet<>(cachedTasks);
        cachedNotRunning.removeAll(runningTasks);
        Assert.assertEquals(new SubscriptionInfo(processId, Collections.emptySet(), cachedNotRunning, (String) null).encode(), subscription.userData());
    }

    /**
     * Assigns topic1/topic2 (three partitions each in the default metadata) to three consumers:
     * consumer10 and consumer11 subscribe with the same process id, consumer20 with a different one.
     *
     * <p>Expectations: consumer10 and consumer11 together hold partitions 0 and 1 of both topics
     * (in either order), consumer20 holds partition 2 of both topics, and the active tasks decoded
     * from the three assignments are exactly {@code task0_0}, {@code task0_1} and {@code task0_2}.
     */
    @Test
    public void testAssignBasic() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> topics = Arrays.asList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);

        // Second and third SubscriptionInfo arguments for each consumer.
        Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task0_1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task0_2);
        Set<TaskId> cachedTasks10 = Utils.mkSet(task0_1);
        Set<TaskId> cachedTasks11 = Utils.mkSet(task0_2);
        Set<TaskId> cachedTasks20 = Utils.mkSet(task0_0);

        UUID processId1 = UUID.randomUUID();
        UUID processId2 = UUID.randomUUID();
        createMockTaskManager(prevTasks10, cachedTasks10, processId1, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId1, prevTasks10, cachedTasks10, "localhost:8080").encode(), Collections.singletonList(t1p0)));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId1, prevTasks11, cachedTasks11, "localhost:8080").encode(), Collections.singletonList(t1p1)));
        subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId2, prevTasks20, cachedTasks20, "localhost:8080").encode(), Collections.singletonList(t1p2)));

        Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        // consumer10/consumer11 share a process id, so only the unordered pair of partition sets is checked.
        Set<TopicPartition> partitions10 = new HashSet<>(assignments.get("consumer10").partitions());
        Set<TopicPartition> partitions11 = new HashSet<>(assignments.get("consumer11").partitions());
        Assert.assertEquals(
            Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
            Utils.mkSet(partitions10, partitions11));
        Assert.assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));

        // Accumulate the active tasks decoded from each consumer's assignment.
        Set<TaskId> allActiveTasks = new HashSet<>(checkAssignment(allTopics, assignments.get("consumer10")).activeTasks());
        allActiveTasks.addAll(checkAssignment(allTopics, assignments.get("consumer11")).activeTasks());
        Assert.assertEquals(Utils.mkSet(task0_0, task0_1), allActiveTasks);

        allActiveTasks.addAll(checkAssignment(allTopics, assignments.get("consumer20")).activeTasks());
        // Checked once; the original repeated this size/equality pair a second time to no effect.
        Assert.assertEquals(3L, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Two consumers with the same process id subscribe to topic1 and topic2 (four partitions each,
     * each topic feeding its own processor). Expects one consumer to hold partitions 0 and 2 of
     * both topics and the other partitions 1 and 3, and the decoded active task lists to be
     * [0_0, 0_2, 1_0, 1_2] for consumer10 and [0_1, 0_3, 1_1, 1_3] for consumer11.
     */
    @Test
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
        builder.addProcessor("processorII", new MockProcessorSupplier(), "source2");

        // Partitions 0-3 of topic1 followed by partitions 0-3 of topic2.
        List<PartitionInfo> partitionInfos = new ArrayList<>();
        for (String topic : Arrays.asList("topic1", "topic2")) {
            for (int partition = 0; partition < 4; partition++) {
                partitionInfos.add(new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]));
            }
        }
        Cluster localMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), partitionInfos, Collections.emptySet(), Collections.emptySet());

        List<String> topics = Arrays.asList("topic1", "topic2");
        TaskId id00 = new TaskId(0, 0);
        TaskId id01 = new TaskId(0, 1);
        TaskId id02 = new TaskId(0, 2);
        TaskId id03 = new TaskId(0, 3);
        TaskId id10 = new TaskId(1, 0);
        TaskId id11 = new TaskId(1, 1);
        TaskId id12 = new TaskId(1, 2);
        TaskId id13 = new TaskId(1, 3);

        UUID processId = UUID.randomUUID();
        createMockTaskManager(new HashSet<>(), new HashSet<>(), processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId, new HashSet<>(), new HashSet<>(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId, new HashSet<>(), new HashSet<>(), "localhost:8080").encode()));

        Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(localMetadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment assignment10 = assignments.get("consumer10");
        ConsumerPartitionAssignor.Assignment assignment11 = assignments.get("consumer11");

        // Partition sets are compared as an unordered pair.
        Set<TopicPartition> partitions10 = new HashSet<>(assignment10.partitions());
        Set<TopicPartition> partitions11 = new HashSet<>(assignment11.partitions());
        Assert.assertEquals(
            Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
            Utils.mkSet(partitions10, partitions11));

        // The decoded active task lists are compared per consumer and in order.
        Assert.assertEquals(Arrays.asList(id00, id02, id10, id12), AssignmentInfo.decode(assignment10.userData()).activeTasks());
        Assert.assertEquals(Arrays.asList(id01, id03, id11, id13), AssignmentInfo.decode(assignment11.userData()).activeTasks());
    }

    /**
     * Builds two independent source/processor/store chains (topic1 and topic2), configures the
     * assignor with {@code SingleGroupPartitionGrouperStub} as {@code partition.grouper}, and
     * expects consumer10 to receive exactly the three active tasks {@code task0_0..task0_2}, with
     * the assignment checked against topic1 only.
     */
    @Test
    public void testAssignWithPartialTopology() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2");

        List<String> topics = Arrays.asList("topic1", "topic2");
        Set<TaskId> expectedTasks = Utils.mkSet(task0_0, task0_1, task0_2);

        UUID processId = UUID.randomUUID();
        createMockTaskManager(emptyTasks, emptyTasks, processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("partition.grouper", SingleGroupPartitionGrouperStub.class));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            topics, new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));

        Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        Set<TaskId> activeTasks = new HashSet<>(checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")).activeTasks());
        Assert.assertEquals(3L, activeTasks.size());
        Assert.assertEquals(expectedTasks, new HashSet<>(activeTasks));
    }

    /**
     * Runs the assignor for a single consumer twice: first against a cluster that carries no
     * partition information, then against the shared test {@code metadata}.
     *
     * <p>With the empty cluster the consumer must receive no partitions and no active tasks. With
     * the populated metadata it must receive t1p0..t1p2 and t2p0..t2p2 and all three tasks.
     */
    @Test
    public void testAssignEmptyMetadata() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final List<String> topics = Arrays.asList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
        final Set<TaskId> prevTasks = Utils.mkSet(task0_0);
        final Set<TaskId> standbyTasks = Utils.mkSet(task0_1);
        final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        final UUID processId = UUID.randomUUID();

        createMockTaskManager(prevTasks, standbyTasks, processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());
        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(processId, prevTasks, standbyTasks, "localhost:8080").encode()));

        // Phase 1: the cluster knows no partitions, so nothing can be handed out.
        final Map<String, ConsumerPartitionAssignor.Assignment> emptyAssignments =
            partitionAssignor.assign(emptyMetadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        Assert.assertEquals(Collections.emptySet(), new HashSet<>(emptyAssignments.get("consumer10").partitions()));
        final Set<TaskId> activeTasks = new HashSet<>(checkAssignment(Collections.emptySet(), emptyAssignments.get("consumer10")).activeTasks());
        Assert.assertEquals(0, activeTasks.size());

        // Phase 2: with the populated metadata every partition and task goes to the only consumer.
        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        final Set<TopicPartition> expectedPartitions = Utils.mkSet(t1p0, t2p0, t1p1, t2p1, t1p2, t2p2);
        Assert.assertEquals(expectedPartitions, new HashSet<>(assignments.get("consumer10").partitions()));

        activeTasks.addAll(checkAssignment(allTopics, assignments.get("consumer10")).activeTasks());
        Assert.assertEquals(3, activeTasks.size());
        Assert.assertEquals(allTasks, activeTasks);
    }

    /**
     * Three consumers report tasks 0_0, 0_1 and 0_2 as previously active while the topology also
     * reads "topic3". The union of the assigned active tasks must additionally contain task0_3,
     * and the union of assigned partitions must cover t1p0..t1p2, t2p0..t2p2 and t3p0..t3p3.
     */
    @Test
    public void testAssignWithNewTasks() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addSource((Topology.AutoOffsetReset) null, "source3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic3");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");

        final List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
        final Set<TaskId> expectedTasks = Utils.mkSet(task0_0, task0_1, task0_2, task0_3);
        final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2);
        final UUID firstProcessId = UUID.randomUUID();
        final UUID secondProcessId = UUID.randomUUID();

        createMockTaskManager(prevTasks10, emptyTasks, firstProcessId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, prevTasks10, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, prevTasks11, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(secondProcessId, prevTasks20, emptyTasks, "localhost:8080").encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        // Accumulate what every consumer received; only the unions are asserted.
        final Set<TaskId> assignedTasks = new HashSet<>();
        final Set<TopicPartition> assignedPartitions = new HashSet<>();
        for (final String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            final ConsumerPartitionAssignor.Assignment assignment = assignments.get(consumer);
            assignedTasks.addAll(AssignmentInfo.decode(assignment.userData()).activeTasks());
            assignedPartitions.addAll(assignment.partitions());
        }

        Assert.assertEquals(expectedTasks, assignedTasks);
        Assert.assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), assignedPartitions);
    }

    /**
     * Assigns a topology with two source/processor pairs, each with its own state store(s), to
     * three consumers.
     *
     * <p>Each consumer must get two partitions and two active tasks, the union of active tasks
     * must be the six tasks 0_0..0_2 and 1_0..1_2, and {@code tasksForState} must map "store1" to
     * the group-0 tasks and "store2"/"store3" to the group-1 tasks.
     */
    @Test
    public void testAssignWithStates() {
        builder.setApplicationId("stream-partition-assignor-test");
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor-2");
        builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), "processor-2");

        final List<String> topics = Arrays.asList("topic1", "topic2");
        final TaskId task00 = new TaskId(0, 0);
        final TaskId task01 = new TaskId(0, 1);
        final TaskId task02 = new TaskId(0, 2);
        final TaskId task10 = new TaskId(1, 0);
        final TaskId task11 = new TaskId(1, 1);
        final TaskId task12 = new TaskId(1, 2);
        final List<TaskId> expectedTasks = Arrays.asList(task00, task01, task02, task10, task11, task12);
        final UUID firstProcessId = UUID.randomUUID();
        final UUID secondProcessId = UUID.randomUUID();

        createMockTaskManager(emptyTasks, emptyTasks, firstProcessId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, emptyTasks, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, emptyTasks, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(secondProcessId, emptyTasks, emptyTasks, "localhost:8080").encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        final List<String> consumers = Arrays.asList("consumer10", "consumer11", "consumer20");

        // Every consumer gets two partitions.
        for (final String consumer : consumers) {
            Assert.assertEquals(2, assignments.get(consumer).partitions().size());
        }

        // Decode all assignments before asserting on any of them.
        final List<AssignmentInfo> infos = new ArrayList<>();
        for (final String consumer : consumers) {
            infos.add(AssignmentInfo.decode(assignments.get(consumer).userData()));
        }
        for (final AssignmentInfo info : infos) {
            Assert.assertEquals(2, info.activeTasks().size());
        }

        final Set<TaskId> assignedTasks = new HashSet<>();
        for (final AssignmentInfo info : infos) {
            assignedTasks.addAll(info.activeTasks());
        }
        Assert.assertEquals(new HashSet<>(expectedTasks), assignedTasks);

        // Each store's changelog must belong to the topic group of the processor it is attached to.
        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
        Assert.assertEquals(Utils.mkSet(task00, task01, task02), tasksForState("store1", expectedTasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store2", expectedTasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store3", expectedTasks, topicGroups));
    }

    /**
     * Collects the tasks that belong to a topic group owning the changelog topic of a store.
     *
     * @param str  store name; its changelog topic name is derived with the application id
     *             "stream-partition-assignor-test"
     * @param list candidate tasks to filter
     * @param map  topic groups keyed by topic group id
     * @return the tasks from {@code list} whose {@code topicGroupId} matches a group whose
     *         {@code stateChangelogTopics} contains the store's changelog topic
     */
    private Set<TaskId> tasksForState(String str, List<TaskId> list, Map<Integer, InternalTopologyBuilder.TopicsInfo> map) {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("stream-partition-assignor-test", str);
        final Set<TaskId> matchingTasks = new HashSet<>();
        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> group : map.entrySet()) {
            // Skip groups that do not own this store's changelog.
            if (!group.getValue().stateChangelogTopics.containsKey(changelogTopic)) {
                continue;
            }
            for (final TaskId task : list) {
                if (task.topicGroupId == group.getKey()) {
                    matchingTasks.add(task);
                }
            }
        }
        return matchingTasks;
    }

    /**
     * Assigns three tasks to three consumers (two sharing a process id) with
     * {@code num.standby.replicas} set to 1.
     *
     * <p>Asserts that the two consumers of the first process have different standby task sets,
     * that together they own active tasks 0_0 and 0_1 and standby task 0_2, and that after adding
     * the third consumer both the active and the standby unions equal all three tasks.
     */
    @Test
    public void testAssignWithStandbyReplicas() {
        final Map<String, Object> standbyProps = configProps();
        standbyProps.put("num.standby.replicas", "1");
        final StreamsConfig standbyConfig = new StreamsConfig(standbyProps);

        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final List<String> topics = Arrays.asList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
        final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2);
        final Set<TaskId> standbyTasks10 = Utils.mkSet(task0_1);
        final Set<TaskId> standbyTasks11 = Utils.mkSet(task0_2);
        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0_0);
        final UUID firstProcessId = UUID.randomUUID();
        final UUID secondProcessId = UUID.randomUUID();

        createMockTaskManager(prevTasks10, standbyTasks10, firstProcessId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("num.standby.replicas", 1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(standbyConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, prevTasks10, standbyTasks10, "localhost:8080").encode()));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(firstProcessId, prevTasks11, standbyTasks11, "localhost:8080").encode()));
        subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(secondProcessId, prevTasks20, standbyTasks20, "any:9097").encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        // First process: consumer10 and consumer11.
        final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        final Set<TaskId> activeUnion = new HashSet<>(info10.activeTasks());
        final Set<TaskId> standbyUnion = new HashSet<>(info10.standbyTasks().keySet());

        final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
        activeUnion.addAll(info11.activeTasks());
        standbyUnion.addAll(info11.standbyTasks().keySet());

        Assert.assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
        Assert.assertEquals(Utils.mkSet(task0_0, task0_1), activeUnion);
        Assert.assertEquals(Utils.mkSet(task0_2), standbyUnion);

        // Second process: consumer20 completes both unions.
        final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
        activeUnion.addAll(info20.activeTasks());
        standbyUnion.addAll(info20.standbyTasks().keySet());

        Assert.assertEquals(3, activeUnion.size());
        Assert.assertEquals(allTasks, activeUnion);
        Assert.assertEquals(3, standbyUnion.size());
        Assert.assertEquals(allTasks, standbyUnion);
    }

    /**
     * Verifies what {@code onAssignment} forwards to the mocked task manager: the host state, the
     * active and standby task maps, and a {@link Cluster} whose only topic is that of
     * {@code t3p0} with two partitions.
     */
    @Test
    public void testOnAssignment() {
        createMockTaskManager();

        // Record the calls the assignor is expected to make on the task manager.
        final Map<HostInfo, Set<TopicPartition>> hostState =
            Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(t3p0, t3p3));
        taskManager.setPartitionsByHostState(hostState);
        EasyMock.expectLastCall();

        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        activeTasks.put(task0_0, Utils.mkSet(t3p0));
        activeTasks.put(task0_3, Utils.mkSet(t3p3));
        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
        standbyTasks.put(task0_1, Utils.mkSet(t3p1));
        standbyTasks.put(task0_2, Utils.mkSet(t3p2));
        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
        EasyMock.expectLastCall();

        final Capture<Cluster> capturedCluster = EasyMock.newCapture();
        taskManager.setClusterMetadata(EasyMock.capture(capturedCluster));
        EasyMock.expectLastCall();
        EasyMock.replay(taskManager);

        configurePartitionAssignor(Collections.emptyMap());
        final AssignmentInfo info = new AssignmentInfo(Arrays.asList(task0_0, task0_3), standbyTasks, hostState);
        final ConsumerPartitionAssignor.Assignment assignment =
            new ConsumerPartitionAssignor.Assignment(Arrays.asList(t3p0, t3p3), info.encode());
        partitionAssignor.onAssignment(assignment, (ConsumerGroupMetadata) null);

        EasyMock.verify(taskManager);
        final Cluster cluster = capturedCluster.getValue();
        Assert.assertEquals(Collections.singleton(t3p0.topic()), cluster.topics());
        Assert.assertEquals(2, cluster.partitionsForTopic(t3p0.topic()).size());
    }

    /**
     * Assigns a topology that writes to and reads from the internal topic "topicX" and checks
     * that the mock internal topic manager readied exactly that topic (prefixed with the
     * application id) with as many partitions as there are expected tasks.
     */
    @Test
    public void testAssignWithInternalTopics() {
        builder.setApplicationId("stream-partition-assignor-test");
        builder.addInternalTopic("topicX");
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", (Serializer) null, (Serializer) null, (StreamPartitioner) null, "processor1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topicX");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");

        final List<String> topics = Arrays.asList("topic1", "stream-partition-assignor-test-topicX");
        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
        final UUID processId = UUID.randomUUID();

        createMockTaskManager(emptyTasks, emptyTasks, processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());

        // Keep a reference to the mock so the topics it readied can be inspected afterwards.
        final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));
        partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions));

        Assert.assertEquals(1, internalTopicManager.readyTopics.size());
        Assert.assertEquals(allTasks.size(), internalTopicManager.readyTopics.get("stream-partition-assignor-test-topicX").intValue());
    }

    /**
     * Assigns a topology in which internal topic "topicZ" is fed from a processor that reads
     * internal topic "topicX". Checks that the mock internal topic manager readied two topics and
     * that "test-topicZ" got as many partitions as there are expected tasks.
     */
    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
        builder.setApplicationId("test");
        builder.addInternalTopic("topicX");
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", (Serializer) null, (Serializer) null, (StreamPartitioner) null, "processor1");
        builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topicX");
        builder.addInternalTopic("topicZ");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addSink("sink2", "topicZ", (Serializer) null, (Serializer) null, (StreamPartitioner) null, "processor2");
        builder.addSource((Topology.AutoOffsetReset) null, "source3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topicZ");

        final List<String> topics = Arrays.asList("topic1", "test-topicX", "test-topicZ");
        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
        final UUID processId = UUID.randomUUID();

        createMockTaskManager(emptyTasks, emptyTasks, processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());

        // Keep a reference to the mock so the topics it readied can be inspected afterwards.
        final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));
        partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions));

        Assert.assertEquals(2, internalTopicManager.readyTopics.size());
        Assert.assertEquals(allTasks.size(), internalTopicManager.readyTopics.get("test-topicZ").intValue());
    }

    /**
     * Builds a DSL topology (mapped stream from "topic1" joined with a counted table from
     * "topic3") and checks that the mock internal topic manager readied the two repartition and
     * two changelog topics with four partitions each, and that "client1" is assigned every
     * partition of "topic1", "topic3" and both repartition topics.
     */
    @Test
    public void shouldGenerateTasksForAllCreatedPartitions() {
        final String aggregateRepartition = "stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition";
        final String mapRepartition = "stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition";

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").map(KeyValue::new).join(streamsBuilder.table("topic3").groupBy(KeyValue::new).count(), (obj, obj2) -> null);

        final UUID processId = UUID.randomUUID();
        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), internalTopologyBuilder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());

        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Arrays.asList("topic1", "topic3"), new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        // Internal topics the assignor is expected to have created, with their partition counts.
        final Map<String, Integer> expectedInternalTopics = new HashMap<>();
        expectedInternalTopics.put(aggregateRepartition, 4);
        expectedInternalTopics.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
        expectedInternalTopics.put("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", 4);
        expectedInternalTopics.put(mapRepartition, 4);
        MatcherAssert.assertThat(mockInternalTopicManager.readyTopics, CoreMatchers.equalTo(expectedInternalTopics));

        // "topic1" contributes partitions 0..2; the other three topics contribute 0..3.
        final Set<TopicPartition> expectedPartitions = new HashSet<>();
        for (int partition = 0; partition < 3; partition++) {
            expectedPartitions.add(new TopicPartition("topic1", partition));
        }
        for (int partition = 0; partition < 4; partition++) {
            expectedPartitions.add(new TopicPartition("topic3", partition));
            expectedPartitions.add(new TopicPartition(aggregateRepartition, partition));
            expectedPartitions.add(new TopicPartition(mapRepartition, partition));
        }
        final Set<TopicPartition> assignedPartitions = new HashSet<>(assignments.get("client1").partitions());
        MatcherAssert.assertThat(assignedPartitions, CoreMatchers.equalTo(expectedPartitions));
    }

    /**
     * Configures the assignor with {@code application.server=localhost:8080} and checks that the
     * user data it produces for a subscription decodes to that same end point.
     */
    @Test
    public void shouldAddUserDefinedEndPointToSubscription() {
        builder.setApplicationId("stream-partition-assignor-test");
        builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "input");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, "processor");

        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("application.server", "localhost:8080"));

        final Set<String> topics = Utils.mkSet("input");
        final ByteBuffer userData = partitionAssignor.subscriptionUserData(topics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), userData);
        final SubscriptionInfo decoded = SubscriptionInfo.decode(subscription.userData());

        Assert.assertEquals("localhost:8080", decoded.userEndPoint());
    }

    /**
     * With a single consumer advertising "localhost:8080", the decoded assignment's
     * {@code partitionsByHost} must map that host to partitions 0..2 of "topic1".
     */
    @Test
    public void shouldMapUserEndPointToTopicPartitions() {
        builder.setApplicationId("stream-partition-assignor-test");
        builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, "processor");

        final List<String> topics = Collections.singletonList("topic1");
        final UUID processId = UUID.randomUUID();
        createMockTaskManager(emptyTasks, emptyTasks, processId, builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("application.server", "localhost:8080"));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
        subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));

        final Set<TopicPartition> expectedPartitions =
            Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        final AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer1").userData());
        final Set<TopicPartition> partitionsForHost = info.partitionsByHost().get(new HostInfo("localhost", 8080));

        Assert.assertEquals(expectedPartitions, partitionsForHost);
    }

    /**
     * Configuring the assignor with {@code application.server=localhost} (no port) must raise a
     * {@link ConfigException}.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
        builder.setApplicationId("stream-partition-assignor-test");
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        final Map<String, Object> invalidConfig = Collections.singletonMap("application.server", "localhost");
        try {
            configurePartitionAssignor(invalidConfig);
            Assert.fail("expected to an exception due to invalid config");
        } catch (final ConfigException expected) {
            // pass: the invalid end point was rejected
        }
    }

    /**
     * Configuring the assignor with {@code application.server=localhost:j87yhk} (non-numeric
     * port) must raise a {@link ConfigException}.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
        builder.setApplicationId("stream-partition-assignor-test");

        final Map<String, Object> invalidConfig = Collections.singletonMap("application.server", "localhost:j87yhk");
        try {
            configurePartitionAssignor(invalidConfig);
            Assert.fail("expected to an exception due to invalid config");
        } catch (final ConfigException expected) {
            // pass: the invalid port was rejected
        }
    }

    /**
     * Builds a windowed join between a re-keyed stream from "unknownTopic" and a counted,
     * re-keyed stream from "topic1", then assigns it to a client subscribed only to
     * "unknownTopic" (presumably absent from the test cluster metadata; confirm against the
     * fixture).
     *
     * <p>The assignment must complete, no internal topic may have been readied, and the client
     * must receive no partitions.
     */
    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("unknownTopic").selectKey((obj, obj2) -> null).join(streamsBuilder.stream("topic1").selectKey((obj3, obj4) -> null).groupByKey().count(Materialized.as("count")).toStream().map((obj5, l) -> null), (obj6, obj7) -> null, JoinWindows.of(Duration.ofMillis(0L)));

        final UUID processId = UUID.randomUUID();
        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), internalTopologyBuilder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());

        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        final boolean noInternalTopics = mockInternalTopicManager.readyTopics.isEmpty();
        MatcherAssert.assertThat(noInternalTopics, CoreMatchers.equalTo(true));
        final boolean noPartitions = assignments.get("client1").partitions().isEmpty();
        MatcherAssert.assertThat(noPartitions, CoreMatchers.equalTo(true));
    }

    /**
     * Passes an assignment carrying one host with two partitions of "topic" to
     * {@code onAssignment} and verifies the expectations recorded on the mocked task manager.
     */
    @Test
    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
        final Set<TopicPartition> hostPartitions = Utils.mkSet(new TopicPartition("topic", 1), new TopicPartition("topic", 2));
        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 9090), hostPartitions);

        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(new StreamsBuilder().build());
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), internalTopologyBuilder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.emptyMap());

        partitionAssignor.onAssignment(createAssignment(hostState), (ConsumerGroupMetadata) null);

        EasyMock.verify(taskManager);
    }

    /**
     * With one standby replica and two consumers on different hosts, neither host's entry in
     * {@code partitionsByHost} may equal the full set of "topic1" partitions, while the union of
     * both entries must equal it.
     */
    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").groupByKey().count();
        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");

        final UUID processId = UUID.randomUUID();
        createMockTaskManager(emptyTasks, emptyTasks, processId, internalTopologyBuilder);
        EasyMock.replay(taskManager);

        final Map<String, Object> props = new HashMap<>();
        props.put("num.standby.replicas", 1);
        props.put("application.server", "localhost:8080");
        configurePartitionAssignor(props);
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));

        subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(processId, emptyTasks, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer2", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()));

        final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        final AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer1").userData());

        final Set<TopicPartition> localhostPartitions = info.partitionsByHost().get(new HostInfo("localhost", 8080));
        final Set<TopicPartition> otherPartitions = info.partitionsByHost().get(new HostInfo("other", 9090));
        final Set<TopicPartition> combined = new HashSet<>(localhostPartitions);
        combined.addAll(otherPartitions);

        MatcherAssert.assertThat(localhostPartitions, CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat(otherPartitions, CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat(combined, CoreMatchers.equalTo(allPartitions));
    }

    /**
     * Configuring the assignor after removing the {@code __task.manager.instance__} entry
     * must raise a {@link KafkaException} with the message "TaskManager is not specified".
     */
    @Test
    public void shouldThrowKafkaExceptionIfTaskMangerNotConfigured() {
        final Map<String, Object> config = configProps();
        config.remove("__task.manager.instance__");

        KafkaException thrown = null;
        try {
            partitionAssignor.configure(config);
        } catch (final KafkaException expected) {
            thrown = expected;
        }

        // Fails with the same message as before when configure() returned normally.
        Assert.assertNotNull("Should have thrown KafkaException", thrown);
        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.equalTo("TaskManager is not specified"));
    }

    /**
     * Configuring the assignor with a {@code String} stored under
     * {@code __task.manager.instance__} must raise a {@link KafkaException} whose message
     * names both the offending type and the expected {@code TaskManager} type.
     */
    @Test
    public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance() {
        final Map<String, Object> config = configProps();
        config.put("__task.manager.instance__", "i am not a task manager");

        KafkaException thrown = null;
        try {
            partitionAssignor.configure(config);
        } catch (final KafkaException expected) {
            thrown = expected;
        }

        // Fails with the same message as before when configure() returned normally.
        Assert.assertNotNull("Should have thrown KafkaException", thrown);
        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager"));
    }

    /**
     * Configuring the assignor after removing the {@code __assignment.error.code__} entry
     * must raise a {@link KafkaException} with the message "assignmentErrorCode is not specified".
     */
    @Test
    public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() {
        createMockTaskManager();
        final Map<String, Object> config = configProps();
        config.remove("__assignment.error.code__");

        KafkaException thrown = null;
        try {
            partitionAssignor.configure(config);
        } catch (final KafkaException expected) {
            thrown = expected;
        }

        // Fails with the same message as before when configure() returned normally.
        Assert.assertNotNull("Should have thrown KafkaException", thrown);
        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.equalTo("assignmentErrorCode is not specified"));
    }

    /**
     * Configuring the assignor with a {@code String} stored under
     * {@code __assignment.error.code__} must raise a {@link KafkaException} whose message
     * names both the offending type and the expected {@code AtomicInteger} type.
     */
    @Test
    public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() {
        createMockTaskManager();
        final Map<String, Object> config = configProps();
        config.put("__assignment.error.code__", "i am not an AtomicInteger");

        KafkaException thrown = null;
        try {
            partitionAssignor.configure(config);
        } catch (final KafkaException expected) {
            thrown = expected;
        }

        // Fails with the same message as before when configure() returned normally.
        Assert.assertNotNull("Should have thrown KafkaException", thrown);
        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger"));
    }

    /** Runs the shared lowest-assignment-version scenario with subscription versions 1 and 2. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
        final int lowerVersion = 1;
        final int higherVersion = 2;
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /** Runs the shared lowest-assignment-version scenario with subscription versions 1 and 3. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
        final int lowerVersion = 1;
        final int higherVersion = 3;
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /** Runs the shared lowest-assignment-version scenario with subscription versions 2 and 3. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
        final int lowerVersion = 2;
        final int higherVersion = 3;
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /**
     * Registers two consumers whose subscriptions are encoded with versions {@code i} and
     * {@code i2}, runs the assignment, and asserts that both returned assignments are encoded
     * with the lower of the two versions.
     *
     * <p>The expected version is computed as {@code Math.min(i, i2)} so the helper does not
     * depend on the order in which the two versions are passed.
     *
     * @param i  subscription version used by "consumer1"
     * @param i2 subscription version used by "consumer2"
     */
    private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(int i, int i2) {
        final int lowestVersion = Math.min(i, i2);
        final String noUserEndPoint = null;

        final SubscriptionInfo firstInfo = new SubscriptionInfo(i, UUID.randomUUID(), emptyTasks, emptyTasks, noUserEndPoint);
        subscriptions.put(
            "consumer1",
            new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), firstInfo.encode()));
        final SubscriptionInfo secondInfo = new SubscriptionInfo(i2, UUID.randomUUID(), emptyTasks, emptyTasks, noUserEndPoint);
        subscriptions.put(
            "consumer2",
            new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), secondInfo.encode()));

        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        partitionAssignor.configure(configProps());

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        MatcherAssert.assertThat(assignments.size(), CoreMatchers.equalTo(2));
        final int consumer1Version = AssignmentInfo.decode(assignments.get("consumer1").userData()).version();
        MatcherAssert.assertThat(consumer1Version, CoreMatchers.equalTo(lowestVersion));
        final int consumer2Version = AssignmentInfo.decode(assignments.get("consumer2").userData()).version();
        MatcherAssert.assertThat(consumer2Version, CoreMatchers.equalTo(lowestVersion));
    }

    /**
     * With {@code upgrade.from} set to "0.10.0", the subscription user data produced by the
     * assignor must decode to a {@link SubscriptionInfo} of version 1.
     */
    @Test
    public void shouldDownGradeSubscriptionToVersion1() {
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("upgrade.from", "0.10.0"));

        final Set<String> topics = Utils.mkSet("topic1");
        final ByteBuffer userData = partitionAssignor.subscriptionUserData(topics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), userData);

        final int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat(encodedVersion, CoreMatchers.equalTo(1));
    }

    /** Runs the shared version-2 downgrade scenario with {@code upgrade.from} set to "0.10.1". */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0101() {
        final String upgradeFrom = "0.10.1";
        shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the shared version-2 downgrade scenario with {@code upgrade.from} set to "0.10.2". */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0102() {
        final String upgradeFrom = "0.10.2";
        shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the shared version-2 downgrade scenario with {@code upgrade.from} set to "0.11.0". */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0110() {
        final String upgradeFrom = "0.11.0";
        shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the shared version-2 downgrade scenario with {@code upgrade.from} set to "1.0". */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For10() {
        final String upgradeFrom = "1.0";
        shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the shared version-2 downgrade scenario with {@code upgrade.from} set to "1.1". */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For11() {
        final String upgradeFrom = "1.1";
        shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /**
     * Configures the assignor with the given {@code upgrade.from} value and asserts that the
     * subscription user data it produces decodes to a {@link SubscriptionInfo} of version 2.
     *
     * @param obj value stored under the {@code upgrade.from} configuration key
     */
    private void shouldDownGradeSubscriptionToVersion2(Object obj) {
        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        configurePartitionAssignor(Collections.singletonMap("upgrade.from", obj));

        final Set<String> topics = Utils.mkSet("topic1");
        final ByteBuffer userData = partitionAssignor.subscriptionUserData(topics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), userData);

        final int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat(encodedVersion, CoreMatchers.equalTo(2));
    }

    /**
     * "consumer1" subscribes while still owning all three partitions of {@code topic1};
     * "consumer2" joins owning nothing. The assignment is expected to give "consumer1" only
     * {@code t1p0} and {@code t1p2} (tasks 0_0 and 0_2) and to give "consumer2" no partitions
     * and no tasks in this round.
     */
    @Test
    public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() {
        final String noUserEndPoint = null;
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);

        final SubscriptionInfo ownerInfo = new SubscriptionInfo(UUID.randomUUID(), allTasks, Collections.emptySet(), noUserEndPoint);
        subscriptions.put(
            "consumer1",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), ownerInfo.encode(), Arrays.asList(t1p0, t1p1, t1p2)));
        final SubscriptionInfo joinerInfo = new SubscriptionInfo(UUID.randomUUID(), Collections.emptySet(), Collections.emptySet(), noUserEndPoint);
        subscriptions.put(
            "consumer2",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), joinerInfo.encode(), Collections.emptyList()));

        createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        partitionAssignor.configure(configProps());

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        MatcherAssert.assertThat(assignments.size(), CoreMatchers.equalTo(2));

        final ConsumerPartitionAssignor.Assignment ownerAssignment = assignments.get("consumer1");
        MatcherAssert.assertThat(ownerAssignment.partitions(), CoreMatchers.equalTo(Arrays.asList(t1p0, t1p2)));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(ownerAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(Arrays.asList(task0_0, task0_2), Collections.emptyMap(), Collections.emptyMap())));

        final ConsumerPartitionAssignor.Assignment joinerAssignment = assignments.get("consumer2");
        MatcherAssert.assertThat(joinerAssignment.partitions(), CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(joinerAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap())));
    }

    /**
     * Mixes a regular subscription ("consumer1": active tasks 0_0 and 0_1, standby 0_2, owning
     * {@code t1p0} and {@code t1p1}) with a subscription built by
     * {@code encodeFutureSubscription()} ("future-consumer", owning {@code t1p2}), with one
     * standby replica configured. Each member is expected to keep its partitions, with the
     * other member's tasks listed as its standby tasks.
     */
    @Test
    public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {
        final String noUserEndPoint = null;
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
        final Set<TaskId> activeTasks = Utils.mkSet(task0_0, task0_1);
        final Set<TaskId> standbyTasks = Utils.mkSet(task0_2);

        // Standby tasks expected for "consumer1".
        final Map<TaskId, Set<TopicPartition>> expectedStandbyForConsumer1 = new HashMap<>();
        expectedStandbyForConsumer1.put(task0_2, Collections.singleton(t1p2));

        // Standby tasks expected for "future-consumer".
        final Map<TaskId, Set<TopicPartition>> expectedStandbyForFutureConsumer = new HashMap<>();
        expectedStandbyForFutureConsumer.put(task0_0, Collections.singleton(t1p0));
        expectedStandbyForFutureConsumer.put(task0_1, Collections.singleton(t1p1));

        final SubscriptionInfo currentInfo = new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, noUserEndPoint);
        subscriptions.put(
            "consumer1",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), currentInfo.encode(), Arrays.asList(t1p0, t1p1)));
        subscriptions.put(
            "future-consumer",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), encodeFutureSubscription(), Collections.singletonList(t1p2)));

        createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        final Map<String, Object> config = configProps();
        config.put("num.standby.replicas", 1);
        partitionAssignor.configure(config);

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        MatcherAssert.assertThat(assignments.size(), CoreMatchers.equalTo(2));

        final ConsumerPartitionAssignor.Assignment currentAssignment = assignments.get("consumer1");
        MatcherAssert.assertThat(currentAssignment.partitions(), CoreMatchers.equalTo(Arrays.asList(t1p0, t1p1)));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(currentAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(new ArrayList<>(activeTasks), expectedStandbyForConsumer1, Collections.emptyMap())));

        final ConsumerPartitionAssignor.Assignment futureAssignment = assignments.get("future-consumer");
        MatcherAssert.assertThat(futureAssignment.partitions(), CoreMatchers.equalTo(Collections.singletonList(t1p2)));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(futureAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(Collections.singletonList(task0_2), expectedStandbyForFutureConsumer, Collections.emptyMap())));
    }

    /**
     * Both consumers subscribe with data built by {@code encodeFutureSubscription()} and own
     * no partitions. With one standby replica configured, "consumer1" is expected to receive
     * {@code t1p0} and {@code t1p2}, "consumer2" {@code t1p1}, and neither any standby tasks.
     */
    @Test
    public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() {
        builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);

        subscriptions.put(
            "consumer1",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), encodeFutureSubscription(), Collections.emptyList()));
        subscriptions.put(
            "consumer2",
            new ConsumerPartitionAssignor.Subscription(
                Collections.singletonList("topic1"), encodeFutureSubscription(), Collections.emptyList()));

        createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        final Map<String, Object> config = configProps();
        config.put("num.standby.replicas", 1);
        partitionAssignor.configure(config);

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        MatcherAssert.assertThat(assignments.size(), CoreMatchers.equalTo(2));

        final ConsumerPartitionAssignor.Assignment firstAssignment = assignments.get("consumer1");
        MatcherAssert.assertThat(firstAssignment.partitions(), CoreMatchers.equalTo(Arrays.asList(t1p0, t1p2)));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(firstAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(Arrays.asList(task0_0, task0_2), Collections.emptyMap(), Collections.emptyMap())));

        final ConsumerPartitionAssignor.Assignment secondAssignment = assignments.get("consumer2");
        MatcherAssert.assertThat(secondAssignment.partitions(), CoreMatchers.equalTo(Collections.singletonList(t1p1)));
        MatcherAssert.assertThat(
            AssignmentInfo.decode(secondAssignment.userData()),
            CoreMatchers.equalTo(new AssignmentInfo(Collections.singletonList(task0_1), Collections.emptyMap(), Collections.emptyMap())));
    }

    /** Runs the shared mixed-subscription failure scenario with a version 1 subscription. */
    @Test
    public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
        final int oldVersion = 1;
        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion);
    }

    /** Runs the shared mixed-subscription failure scenario with a version 2 subscription. */
    @Test
    public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
        final int oldVersion = 2;
        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion);
    }

    /**
     * Builds an 8-byte buffer holding the integer 6 twice.
     *
     * <p>NOTE(review): the two integers are presumably the "used" and "supported" version
     * fields of a subscription newer than this code understands; confirm against
     * {@code SubscriptionInfo}.
     *
     * @return the buffer, with its position left at the end of the written data (it is
     *         neither flipped nor rewound)
     */
    private static ByteBuffer encodeFutureSubscription() {
        final int futureVersion = 6;
        final ByteBuffer encoded = ByteBuffer.allocate(2 * Integer.BYTES);
        encoded.putInt(futureVersion).putInt(futureVersion);
        return encoded;
    }

    /**
     * Registers "consumer1" with a subscription encoded at version {@code i} next to
     * "future-consumer", whose user data comes from {@code encodeFutureSubscription()}, and
     * asserts that running the assignment throws an {@link IllegalStateException}.
     *
     * @param i subscription version used by "consumer1"
     */
    private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(int i) {
        final String noUserEndPoint = null;
        final SubscriptionInfo oldInfo = new SubscriptionInfo(i, UUID.randomUUID(), emptyTasks, emptyTasks, noUserEndPoint);
        subscriptions.put(
            "consumer1",
            new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), oldInfo.encode()));
        subscriptions.put(
            "future-consumer",
            new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), encodeFutureSubscription()));

        createMockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
        EasyMock.replay(taskManager);
        partitionAssignor.configure(configProps());

        IllegalStateException thrown = null;
        try {
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        } catch (final IllegalStateException expected) {
            // The exception is the expected outcome; only its occurrence is checked.
            thrown = expected;
        }
        Assert.assertNotNull("Should have thrown IllegalStateException", thrown);
    }

    /**
     * Creates an assignment with no partitions whose user data is an encoded
     * {@link AssignmentInfo} carrying no active tasks, no standby tasks, and the given
     * host-to-partitions mapping.
     *
     * @param map host-to-partitions mapping to embed in the assignment's user data
     * @return the assignment wrapping the encoded info
     */
    private ConsumerPartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> map) {
        final AssignmentInfo hostOnlyInfo = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), map);
        final List<TopicPartition> noPartitions = Collections.emptyList();
        return new ConsumerPartitionAssignor.Assignment(noPartitions, hostOnlyInfo.encode());
    }

    /**
     * Decodes the assignment's user data and checks it against the assigned partitions:
     * <ul>
     *   <li>the number of active tasks equals the number of assigned partitions;</li>
     *   <li>the active tasks equal, in order, {@code TaskId(0, partition)} for each assigned
     *       partition;</li>
     *   <li>the topics of the assigned partitions equal {@code set};</li>
     *   <li>every standby partition has the same partition number as its task, and, when any
     *       standby task exists, the standby topics equal {@code set}.</li>
     * </ul>
     *
     * @param set        the topic names the assignment is expected to cover
     * @param assignment the assignment to inspect
     * @return the decoded assignment info
     */
    private AssignmentInfo checkAssignment(Set<String> set, ConsumerPartitionAssignor.Assignment assignment) {
        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        final List<TopicPartition> assignedPartitions = assignment.partitions();
        Assert.assertEquals(assignedPartitions.size(), info.activeTasks().size());

        // Derive the expected active tasks and the covered topics from the assigned partitions.
        final List<TaskId> expectedActiveTasks = new ArrayList<>();
        final Set<String> activeTopics = new HashSet<>();
        for (final TopicPartition assigned : assignedPartitions) {
            expectedActiveTasks.add(new TaskId(0, assigned.partition()));
            activeTopics.add(assigned.topic());
        }
        Assert.assertEquals(expectedActiveTasks, info.activeTasks());
        Assert.assertEquals(set, activeTopics);

        final Map<TaskId, Set<TopicPartition>> standbyTasks = info.standbyTasks();
        final Set<String> standbyTopics = new HashSet<>();
        for (final Map.Entry<TaskId, Set<TopicPartition>> standby : standbyTasks.entrySet()) {
            final TaskId standbyTask = standby.getKey();
            for (final TopicPartition standbyPartition : standby.getValue()) {
                Assert.assertEquals(standbyTask.partition, standbyPartition.partition());
                standbyTopics.add(standbyPartition.topic());
            }
        }
        // Standby topics are only compared when at least one standby task was assigned.
        if (!standbyTasks.isEmpty()) {
            Assert.assertEquals(set, standbyTopics);
        }
        return info;
    }

    /**
     * Asserts that two consumer-to-tasks assignments are equivalent: both maps have the same
     * size, every consumer of {@code map} is present in {@code map2}, and for each consumer
     * the two task lists contain the same tasks regardless of order.
     *
     * <p>The comparison sorts copies of the task lists, so the caller's lists are left
     * untouched and unmodifiable lists are accepted.
     *
     * @param map  the first assignment, keyed by consumer
     * @param map2 the second assignment, keyed by consumer
     */
    private void assertEquivalentAssignment(Map<String, List<TaskId>> map, Map<String, List<TaskId>> map2) {
        Assert.assertEquals(map.size(), map2.size());
        for (final Map.Entry<String, List<TaskId>> entry : map.entrySet()) {
            final String consumer = entry.getKey();
            Assert.assertTrue("No assignment found for consumer " + consumer, map2.containsKey(consumer));

            // Sort defensive copies rather than the arguments themselves.
            final List<TaskId> firstTasks = new ArrayList<>(entry.getValue());
            Collections.sort(firstTasks);
            final List<TaskId> secondTasks = new ArrayList<>(map2.get(consumer));
            Collections.sort(secondTasks);

            MatcherAssert.assertThat(firstTasks, CoreMatchers.equalTo(secondTasks));
        }
    }
}
