package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.class */
public class StreamsPartitionAssignorTest {
    private static final String CONSUMER_1 = "consumer1";
    private static final String CONSUMER_2 = "consumer2";
    private static final String CONSUMER_3 = "consumer3";
    private static final String CONSUMER_4 = "consumer4";
    private static final String USER_END_POINT = "localhost:8080";
    private static final String OTHER_END_POINT = "other:9090";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    private TaskManager taskManager;
    private final Class<? extends TaskAssignor> taskAssignor;
    private final Set<String> allTopics = Utils.mkSet(new String[]{"topic1", "topic2"});
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    private final SubscriptionInfo defaultSubscriptionInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private StreamsMetadataState streamsMetadataState = (StreamsMetadataState) EasyMock.createNiceMock(StreamsMetadataState.class);
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap();
    private final ReferenceContainer referenceContainer = new ReferenceContainer();
    private final MockTime time = new MockTime();
    private final byte uniqueField = 1;
    private Admin adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS);
    private TopologyMetadata topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(configProps()));

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest$CorruptedInternalTopologyBuilder.class */
    private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
        private Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> corruptedTopicGroups;

        private CorruptedInternalTopologyBuilder() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        public synchronized Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups() {
            if (this.corruptedTopicGroups == null) {
                this.corruptedTopicGroups = new HashMap();
                for (Map.Entry entry : super.topicGroups().entrySet()) {
                    InternalTopologyBuilder.TopicsInfo topicsInfo = (InternalTopologyBuilder.TopicsInfo) entry.getValue();
                    this.corruptedTopicGroups.put(entry.getKey(), new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), topicsInfo.sourceTopics, topicsInfo.repartitionSourceTopics, topicsInfo.stateChangelogTopics));
                }
            }
            return this.corruptedTopicGroups;
        }
    }

    private Map<String, Object> configProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("application.id", APPLICATION_ID);
        hashMap.put("bootstrap.servers", USER_END_POINT);
        this.referenceContainer.mainConsumer = (Consumer) EasyMock.mock(Consumer.class);
        this.referenceContainer.adminClient = this.adminClient != null ? this.adminClient : (Admin) EasyMock.mock(Admin.class);
        this.referenceContainer.taskManager = this.taskManager;
        this.referenceContainer.streamsMetadataState = this.streamsMetadataState;
        this.referenceContainer.time = this.time;
        hashMap.put("__reference.container.instance__", this.referenceContainer);
        hashMap.put("internal.task.assignor.class", this.taskAssignor.getName());
        return hashMap;
    }

    private MockInternalTopicManager configureDefault() {
        createDefaultMockTaskManager();
        return configureDefaultPartitionAssignor();
    }

    private MockInternalTopicManager configureDefaultPartitionAssignor() {
        return configurePartitionAssignorWith(Collections.emptyMap());
    }

    private MockInternalTopicManager configurePartitionAssignorWith(Map<String, Object> map) {
        Map<String, Object> configProps = configProps();
        configProps.putAll(map);
        this.partitionAssignor.configure(configProps);
        EasyMock.replay(new Object[]{this.taskManager, this.adminClient});
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(configProps()));
        return overwriteInternalTopicManagerWithMock(false);
    }

    private void createDefaultMockTaskManager() {
        createMockTaskManager(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);
    }

    private void createMockTaskManager(Set<TaskId> set, Set<TaskId> set2) {
        this.taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect(this.taskManager.topologyMetadata()).andStubReturn(this.topologyMetadata);
        EasyMock.expect(this.taskManager.getTaskOffsetSums()).andStubReturn(getTaskOffsetSums(set, set2));
        EasyMock.expect(this.taskManager.processId()).andStubReturn(AssignmentTestUtils.UUID_1);
        this.builder.setApplicationId(APPLICATION_ID);
        this.topologyMetadata.buildAndRewriteTopology();
    }

    private MockInternalTopicManager overwriteInternalTopicManagerWithMock(boolean z) {
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.time, new StreamsConfig(configProps()), this.mockClientSupplier.restoreConsumer, z);
        this.partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        return mockInternalTopicManager;
    }

    @Parameterized.Parameters(name = "task assignor = {0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{HighAvailabilityTaskAssignor.class}, new Object[]{StickyTaskAssignor.class}, new Object[]{FallbackPriorTaskAssignor.class});
    }

    public StreamsPartitionAssignorTest(Class<? extends TaskAssignor> cls) {
        this.taskAssignor = cls;
    }

    @Test
    public void shouldUseEagerRebalancingProtocol() {
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "2.3"));
        Assert.assertEquals(1L, this.partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(this.partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Assert.assertFalse(this.partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    @Test
    public void shouldUseCooperativeRebalancingProtocol() {
        configureDefault();
        Assert.assertEquals(2L, this.partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(this.partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() {
        List asList = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)), Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)), Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2))});
        ClientState clientState = new ClientState();
        SortedSet mkSortedSet = Utils.mkSortedSet(new String[]{CONSUMER_1, CONSUMER_2, CONSUMER_3});
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        clientState.initializePrevTasks(Collections.emptyMap());
        clientState.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(asList));
        assertEquivalentAssignment(mkMap, StreamsPartitionAssignor.assignTasksToThreads(asList, Collections.emptySet(), mkSortedSet, clientState));
    }

    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() {
        ArrayList arrayList = new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3));
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(CONSUMER_1, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3))), Utils.mkEntry(CONSUMER_2, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0))), Utils.mkEntry(CONSUMER_3, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)))});
        ClientState clientState = new ClientState();
        SortedSet mkSortedSet = Utils.mkSortedSet(new String[]{CONSUMER_1, CONSUMER_2, CONSUMER_3});
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        clientState.initializePrevTasks(Collections.emptyMap());
        clientState.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(arrayList));
        TaskId taskId = AssignmentTestUtils.TASK_2_0;
        arrayList.add(taskId);
        clientState.assignActiveTasks(arrayList);
        Map assignTasksToThreads = StreamsPartitionAssignor.assignTasksToThreads(arrayList, Collections.emptySet(), mkSortedSet, clientState);
        ((List) mkMap.get(CONSUMER_2)).add(taskId);
        assertEquivalentAssignment(mkMap, assignTasksToThreads);
    }

    @Test
    public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() {
        List asList = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)), Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)), Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2))});
        ClientState clientState = new ClientState();
        SortedSet mkSortedSet = Utils.mkSortedSet(new String[]{CONSUMER_1, CONSUMER_2, CONSUMER_3});
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        clientState.initializePrevTasks(Collections.emptyMap());
        clientState.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(asList));
        mkSortedSet.remove(CONSUMER_3);
        Map assignTasksToThreads = StreamsPartitionAssignor.assignTasksToThreads(asList, Collections.emptySet(), mkSortedSet, clientState);
        Assert.assertTrue(((List) assignTasksToThreads.get(CONSUMER_1)).containsAll((Collection) mkMap.get(CONSUMER_1)));
        Assert.assertTrue(((List) assignTasksToThreads.get(CONSUMER_2)).containsAll((Collection) mkMap.get(CONSUMER_2)));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_1)).size()), CoreMatchers.equalTo(4));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_2)).size()), CoreMatchers.equalTo(4));
    }

    @Test
    public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() {
        List asList = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)), Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)), Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2))});
        ClientState clientState = new ClientState();
        SortedSet mkSortedSet = Utils.mkSortedSet(new String[]{CONSUMER_1, CONSUMER_2, CONSUMER_3});
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        mkSortedSet.add(CONSUMER_4);
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_4, getTaskOffsetSums(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS));
        clientState.initializePrevTasks(Collections.emptyMap());
        clientState.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(asList));
        Map assignTasksToThreads = StreamsPartitionAssignor.assignTasksToThreads(asList, Collections.emptySet(), mkSortedSet, clientState);
        Assert.assertTrue(((List) mkMap.get(CONSUMER_1)).containsAll((Collection) assignTasksToThreads.get(CONSUMER_1)));
        Assert.assertTrue(((List) mkMap.get(CONSUMER_3)).containsAll((Collection) assignTasksToThreads.get(CONSUMER_3)));
        Assert.assertTrue(((List) assignTasksToThreads.get(CONSUMER_2)).containsAll((Collection) mkMap.get(CONSUMER_2)));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_1)).size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_2)).size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_3)).size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(Integer.valueOf(((List) assignTasksToThreads.get(CONSUMER_4)).size()), CoreMatchers.equalTo(2));
    }

    @Test
    public void shouldInterleaveTasksByGroupIdDuringNewAssignment() {
        List asList = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(CONSUMER_1, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_2))), Utils.mkEntry(CONSUMER_2, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_2_0))), Utils.mkEntry(CONSUMER_3, new ArrayList(Arrays.asList(AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_2_1)))});
        ClientState clientState = new ClientState();
        SortedSet mkSortedSet = Utils.mkSortedSet(new String[]{CONSUMER_1, CONSUMER_2, CONSUMER_3});
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_1, Collections.emptyMap());
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_2, Collections.emptyMap());
        clientState.addPreviousTasksAndOffsetSums(CONSUMER_3, Collections.emptyMap());
        Collections.shuffle(asList);
        MatcherAssert.assertThat(StreamsPartitionAssignor.assignTasksToThreads(asList, Collections.emptySet(), mkSortedSet, clientState), CoreMatchers.equalTo(mkMap));
    }

    @Test
    public void testEagerSubscription() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        Set<TaskId> mkSet = Utils.mkSet(new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)});
        createMockTaskManager(mkSet, mkSet2);
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "2.3"));
        MatcherAssert.assertThat(this.partitionAssignor.rebalanceProtocol(), CoreMatchers.equalTo(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Set mkSet3 = Utils.mkSet(new String[]{"topic1", "topic2"});
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList(mkSet3), this.partitionAssignor.subscriptionUserData(mkSet3));
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());
        Assert.assertEquals(AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet, mkSet2, (byte) 1), SubscriptionInfo.decode(subscription.userData()));
    }

    @Test
    public void testCooperativeSubscription() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        Set<TaskId> mkSet = Utils.mkSet(new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)});
        createMockTaskManager(mkSet, mkSet2);
        configureDefaultPartitionAssignor();
        Set mkSet3 = Utils.mkSet(new String[]{"topic1", "topic2"});
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList(mkSet3), this.partitionAssignor.subscriptionUserData(mkSet3));
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());
        Assert.assertEquals(AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet, mkSet2, (byte) 1), SubscriptionInfo.decode(subscription.userData()));
    }

    @Test
    public void testAssignBasic() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store", false), new String[]{"processor"});
        List asList = Arrays.asList("topic1", "topic2");
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set mkSet3 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set mkSet4 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set<TaskId> set = AssignmentTestUtils.EMPTY_TASKS;
        Set mkSet5 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set mkSet6 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        createMockTaskManager(mkSet2, set);
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store-changelog"), Collections.singletonList(3)));
        configureDefaultPartitionAssignor();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet2, set).encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet3, mkSet5).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, mkSet4, mkSet6).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(Utils.mkSet(new Set[]{Utils.mkSet(new TopicPartition[]{this.t1p0, this.t2p0}), Utils.mkSet(new TopicPartition[]{this.t1p1, this.t2p1})}), Utils.mkSet(new HashSet[]{new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).partitions()), new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).partitions())}));
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{this.t1p2, this.t2p2}), new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).partitions()));
        HashSet hashSet = new HashSet(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).activeTasks());
        hashSet.addAll(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).activeTasks());
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1}), hashSet);
        hashSet.addAll(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).activeTasks());
        Assert.assertEquals(3L, hashSet.size());
        Assert.assertEquals(mkSet, new HashSet(hashSet));
        Assert.assertEquals(3L, hashSet.size());
        Assert.assertEquals(mkSet, hashSet);
    }

    @Test
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addProcessor("processorII", new MockApiProcessorSupplier(), new String[]{"source2"});
        Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
        List asList = Arrays.asList("topic1", "topic2");
        configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        Map groupAssignment = this.partitionAssignor.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(Utils.mkSet(new Set[]{Utils.mkSet(new TopicPartition[]{this.t2p2, this.t1p0, this.t1p2, this.t2p0}), Utils.mkSet(new TopicPartition[]{this.t1p1, this.t2p1, this.t1p3, this.t2p3})}), Utils.mkSet(new HashSet[]{new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).partitions()), new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).partitions())}));
        Assert.assertEquals(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_2), AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).userData()).activeTasks());
        Assert.assertEquals(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).userData()).activeTasks());
    }

    @Test
    public void testAssignEmptyMetadata() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        List asList = Arrays.asList("topic1", "topic2");
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set<TaskId> mkSet3 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        createMockTaskManager(mkSet2, mkSet3);
        configureDefaultPartitionAssignor();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet2, mkSet3).encode()));
        Map groupAssignment = this.partitionAssignor.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(Collections.emptySet(), new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).partitions()));
        HashSet hashSet = new HashSet(checkAssignment(Collections.emptySet(), (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).activeTasks());
        Assert.assertEquals(0L, hashSet.size());
        Map groupAssignment2 = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(Utils.mkSet(new Set[]{Utils.mkSet(new TopicPartition[]{this.t1p0, this.t2p0, this.t1p0, this.t2p0, this.t1p1, this.t2p1, this.t1p2, this.t2p2})}), Utils.mkSet(new HashSet[]{new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment2.get("consumer10")).partitions())}));
        hashSet.addAll(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment2.get("consumer10")).activeTasks());
        Assert.assertEquals(3L, hashSet.size());
        Assert.assertEquals(mkSet, new HashSet(hashSet));
        Assert.assertEquals(3L, hashSet.size());
        Assert.assertEquals(mkSet, hashSet);
    }

    @Test
    public void testAssignWithNewTasks() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2", "source3"});
        List asList = Arrays.asList("topic1", "topic2", "topic3");
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set mkSet3 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set mkSet4 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2});
        createMockTaskManager(mkSet2, AssignmentTestUtils.EMPTY_TASKS);
        configureDefaultPartitionAssignor();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet2, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet3, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, mkSet4, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        HashSet hashSet = new HashSet(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).userData()).activeTasks());
        HashSet hashSet2 = new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).partitions());
        hashSet.addAll(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).userData()).activeTasks());
        hashSet2.addAll(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).partitions());
        hashSet.addAll(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).userData()).activeTasks());
        hashSet2.addAll(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).partitions());
        Assert.assertEquals(mkSet, hashSet);
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2, this.t2p0, this.t2p1, this.t2p2, this.t3p0, this.t3p1, this.t3p2, this.t3p3}), hashSet2);
    }

    @Test
    public void testAssignWithStates() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), new String[]{"processor-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), new String[]{"processor-2"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store3", false), new String[]{"processor-2"});
        List asList = Arrays.asList("topic1", "topic2");
        List asList2 = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Arrays.asList("stream-partition-assignor-test-store1-changelog", "stream-partition-assignor-test-store2-changelog", "stream-partition-assignor-test-store3-changelog"), Arrays.asList(3, 3, 3)));
        configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(2L, ((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).partitions().size());
        Assert.assertEquals(2L, ((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).partitions().size());
        Assert.assertEquals(2L, ((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).partitions().size());
        AssignmentInfo decode = AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).userData());
        AssignmentInfo decode2 = AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11")).userData());
        AssignmentInfo decode3 = AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).userData());
        Assert.assertEquals(2L, decode.activeTasks().size());
        Assert.assertEquals(2L, decode2.activeTasks().size());
        Assert.assertEquals(2L, decode3.activeTasks().size());
        HashSet hashSet = new HashSet();
        hashSet.addAll(decode.activeTasks());
        hashSet.addAll(decode2.activeTasks());
        hashSet.addAll(decode3.activeTasks());
        Assert.assertEquals(new HashSet(asList2), hashSet);
        Map map = this.builder.topicGroups();
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2}), tasksForState("store1", asList2, map));
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2}), tasksForState("store2", asList2, map));
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2}), tasksForState("store3", asList2, map));
    }

    private static Set<TaskId> tasksForState(String str, List<TaskId> list, Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> map) {
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, str, (String) null);
        HashSet hashSet = new HashSet();
        for (Map.Entry<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> entry : map.entrySet()) {
            if (entry.getValue().stateChangelogTopics.keySet().contains(storeChangelogTopic)) {
                for (TaskId taskId : list) {
                    if (taskId.subtopology() == entry.getKey().nodeGroupId) {
                        hashSet.add(taskId);
                    }
                }
            }
        }
        return hashSet;
    }

    @Test
    public void testAssignWithStandbyReplicasAndStatelessTasks() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        List asList = Arrays.asList("topic1", "topic2");
        createMockTaskManager(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertTrue(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).standbyTasks().isEmpty());
        Assert.assertTrue(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).standbyTasks().isEmpty());
    }

    @Test
    public void testAssignWithStandbyReplicasAndLoggingDisabled() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false).withLoggingDisabled(), new String[]{"processor"});
        List asList = Arrays.asList("topic1", "topic2");
        createMockTaskManager(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertTrue(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10")).standbyTasks().isEmpty());
        Assert.assertTrue(checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20")).standbyTasks().isEmpty());
    }

    @Test
    public void testAssignWithStandbyReplicas() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), new String[]{"processor"});
        List asList = Arrays.asList("topic1", "topic2");
        Set set = (Set) asList.stream().map(str -> {
            return Arrays.asList(new TopicPartition(str, 0), new TopicPartition(str, 1), new TopicPartition(str, 2));
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet());
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set<TaskId> mkSet2 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set mkSet3 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set mkSet4 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set mkSet5 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set<TaskId> mkSet6 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set mkSet7 = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2});
        createMockTaskManager(mkSet2, mkSet6);
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet2, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, (Set<TaskId>) mkSet3, (Set<TaskId>) mkSet7, USER_END_POINT).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(asList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, (Set<TaskId>) mkSet4, (Set<TaskId>) mkSet5, OTHER_END_POINT).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        AssignmentInfo checkAssignment = checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer10"));
        HashSet hashSet = new HashSet(checkAssignment.activeTasks());
        HashSet hashSet2 = new HashSet(checkAssignment.standbyTasks().keySet());
        AssignmentInfo checkAssignment2 = checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer11"));
        hashSet.addAll(checkAssignment2.activeTasks());
        hashSet2.addAll(checkAssignment2.standbyTasks().keySet());
        Assert.assertNotEquals("same processId has same set of standby tasks", checkAssignment2.standbyTasks().keySet(), checkAssignment.standbyTasks().keySet());
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1}), new HashSet(hashSet));
        Assert.assertEquals(Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_2}), new HashSet(hashSet2));
        AssignmentInfo checkAssignment3 = checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment) groupAssignment.get("consumer20"));
        hashSet.addAll(checkAssignment3.activeTasks());
        hashSet2.addAll(checkAssignment3.standbyTasks().keySet());
        Assert.assertEquals(3L, hashSet.size());
        Assert.assertEquals(mkSet, hashSet);
        Assert.assertEquals(3L, hashSet2.size());
        Assert.assertEquals(mkSet, hashSet2);
        Map partitionsByHost = checkAssignment.partitionsByHost();
        Assert.assertEquals(2L, partitionsByHost.size());
        Assert.assertEquals(set, partitionsByHost.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet()));
        Map standbyPartitionByHost = checkAssignment.standbyPartitionByHost();
        Assert.assertEquals(2L, standbyPartitionByHost.size());
        Assert.assertEquals(set, standbyPartitionByHost.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet()));
        for (HostInfo hostInfo : partitionsByHost.keySet()) {
            Assert.assertTrue(Collections.disjoint((Collection) partitionsByHost.get(hostInfo), (Collection) standbyPartitionByHost.get(hostInfo)));
        }
        Assert.assertEquals(partitionsByHost, checkAssignment2.partitionsByHost());
        Assert.assertEquals(partitionsByHost, checkAssignment3.partitionsByHost());
        Assert.assertEquals(standbyPartitionByHost, checkAssignment2.standbyPartitionByHost());
        Assert.assertEquals(standbyPartitionByHost, checkAssignment3.standbyPartitionByHost());
    }

    @Test
    public void testOnAssignment() {
        this.taskManager = (TaskManager) EasyMock.createStrictMock(TaskManager.class);
        Map singletonMap = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(new TopicPartition[]{this.t3p0, this.t3p3}));
        HashMap hashMap = new HashMap();
        hashMap.put(AssignmentTestUtils.TASK_0_0, Utils.mkSet(new TopicPartition[]{this.t3p0}));
        hashMap.put(AssignmentTestUtils.TASK_0_3, Utils.mkSet(new TopicPartition[]{this.t3p3}));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(AssignmentTestUtils.TASK_0_1, Utils.mkSet(new TopicPartition[]{this.t3p1}));
        hashMap2.put(AssignmentTestUtils.TASK_0_2, Utils.mkSet(new TopicPartition[]{this.t3p2}));
        this.taskManager.handleAssignment(hashMap, hashMap2);
        EasyMock.expectLastCall();
        this.streamsMetadataState = (StreamsMetadataState) EasyMock.createStrictMock(StreamsMetadataState.class);
        Capture newCapture = EasyMock.newCapture();
        this.streamsMetadataState.onChange((Map) EasyMock.eq(singletonMap), (Map) EasyMock.anyObject(), (Cluster) EasyMock.capture(newCapture));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.streamsMetadataState});
        configureDefaultPartitionAssignor();
        this.partitionAssignor.onAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(this.t3p0, this.t3p3), new AssignmentInfo(10, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3), hashMap2, singletonMap, Collections.emptyMap(), 0).encode()), (ConsumerGroupMetadata) null);
        EasyMock.verify(new Object[]{this.streamsMetadataState});
        EasyMock.verify(new Object[]{this.taskManager});
        Assert.assertEquals(Collections.singleton(this.t3p0.topic()), ((Cluster) newCapture.getValue()).topics());
        Assert.assertEquals(2L, ((Cluster) newCapture.getValue()).partitionsForTopic(this.t3p0.topic()).size());
    }

    @Test
    public void testAssignWithInternalTopics() {
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topicX"});
        this.builder.addProcessor("processor2", new MockApiProcessorSupplier(), new String[]{"source2"});
        List asList = Arrays.asList("topic1", "stream-partition-assignor-test-topicX");
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        MockInternalTopicManager configureDefault = configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        Assert.assertEquals(1L, configureDefault.readyTopics.size());
        Assert.assertEquals(mkSet.size(), configureDefault.readyTopics.get("stream-partition-assignor-test-topicX").intValue());
    }

    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topicX"});
        this.builder.addInternalTopic("topicZ", InternalTopicProperties.empty());
        this.builder.addProcessor("processor2", new MockApiProcessorSupplier(), new String[]{"source2"});
        this.builder.addSink("sink2", "topicZ", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topicZ"});
        List asList = Arrays.asList("topic1", "stream-partition-assignor-test-topicX", "stream-partition-assignor-test-topicZ");
        Set mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        MockInternalTopicManager configureDefault = configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        Assert.assertEquals(2L, configureDefault.readyTopics.size());
        Assert.assertEquals(mkSet.size(), configureDefault.readyTopics.get("stream-partition-assignor-test-topicZ").intValue());
    }

    @Test
    public void shouldGenerateTasksForAllCreatedPartitions() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").map(KeyValue::new).join(streamsBuilder.table("topic3").groupBy(KeyValue::new).count(), (obj, obj2) -> {
            return null;
        });
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(configProps()));
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Arrays.asList("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", "stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog"), Arrays.asList(4, 4)));
        MockInternalTopicManager configureDefault = configureDefault();
        this.subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Arrays.asList("topic1", "topic3"), this.defaultSubscriptionInfo.encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        HashMap hashMap = new HashMap();
        hashMap.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
        hashMap.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
        hashMap.put("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", 4);
        hashMap.put("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 4);
        MatcherAssert.assertThat(configureDefault.readyTopics, CoreMatchers.equalTo(hashMap));
        MatcherAssert.assertThat(new HashSet(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("client1")).partitions()), CoreMatchers.equalTo(new HashSet(Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2), new TopicPartition("topic3", 0), new TopicPartition("topic3", 1), new TopicPartition("topic3", 2), new TopicPartition("topic3", 3), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 0), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 1), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 2), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 3)))));
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").repartition();
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        createDefaultMockTaskManager();
        EasyMock.replay(new Object[]{this.taskManager});
        this.partitionAssignor.configure(configProps());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.time, new StreamsConfig(configProps()), this.mockClientSupplier.restoreConsumer, false) { // from class: org.apache.kafka.streams.processor.internals.StreamsPartitionAssignorTest.1
            @Override // org.apache.kafka.test.MockInternalTopicManager
            public Set<String> makeReady(Map<String, InternalTopicConfig> map) {
                throw new TimeoutException("KABOOM!");
            }
        });
        this.subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        Assert.assertThrows(TimeoutException.class, () -> {
            this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut() {
        StreamsConfig streamsConfig = new StreamsConfig(configProps());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic1", Materialized.as("store"));
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.topologyMetadata = new TopologyMetadata(this.builder, streamsConfig);
        createDefaultMockTaskManager();
        EasyMock.replay(new Object[]{this.taskManager});
        this.partitionAssignor.configure(configProps());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.time, streamsConfig, this.mockClientSupplier.restoreConsumer, false) { // from class: org.apache.kafka.streams.processor.internals.StreamsPartitionAssignorTest.2
            @Override // org.apache.kafka.test.MockInternalTopicManager
            public Set<String> makeReady(Map<String, InternalTopicConfig> map) {
                if (map.isEmpty()) {
                    return Collections.emptySet();
                }
                throw new TimeoutException("KABOOM!");
            }
        });
        this.subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        Assert.assertThrows(TimeoutException.class, () -> {
            this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        });
    }

    @Test
    public void shouldAddUserDefinedEndPointToSubscription() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"input"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor"});
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT));
        Set mkSet = Utils.mkSet(new String[]{"input"});
        Assert.assertEquals(USER_END_POINT, SubscriptionInfo.decode(new ConsumerPartitionAssignor.Subscription(new ArrayList(mkSet), this.partitionAssignor.subscriptionUserData(mkSet)).userData()).userEndPoint());
    }

    @Test
    public void shouldMapUserEndPointToTopicPartitions() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor"});
        List singletonList = Collections.singletonList("topic1");
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT));
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(singletonList, AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)}), (Set) AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment().get(CONSUMER_1)).userData()).partitionsByHost().get(new HostInfo("localhost", 8080)));
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
        createDefaultMockTaskManager();
        try {
            configurePartitionAssignorWith(Collections.singletonMap("application.server", "localhost"));
            Assert.fail("expected to an exception due to invalid config");
        } catch (ConfigException e) {
        }
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
        createDefaultMockTaskManager();
        Assert.assertThrows(ConfigException.class, () -> {
            configurePartitionAssignorWith(Collections.singletonMap("application.server", "localhost:j87yhk"));
        });
    }

    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("unknownTopic").selectKey((obj, obj2) -> {
            return null;
        }).join(streamsBuilder.stream("topic1").selectKey((obj3, obj4) -> {
            return null;
        }).groupByKey().count(Materialized.as("count")).toStream().map((obj5, l) -> {
            return null;
        }), (obj6, obj7) -> {
            return null;
        }, JoinWindows.of(Duration.ofMillis(0L)));
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        MockInternalTopicManager configureDefault = configureDefault();
        this.subscriptions.put("client1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), this.defaultSubscriptionInfo.encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(Boolean.valueOf(configureDefault.readyTopics.isEmpty()), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(Boolean.valueOf(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("client1")).partitions().isEmpty()), CoreMatchers.equalTo(true));
    }

    @Test
    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1})), Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(new TopicPartition[]{this.t2p0, this.t2p1}))});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1})), Utils.mkEntry(new HostInfo("newotherhost", 9090), Utils.mkSet(new TopicPartition[]{this.t2p0, this.t2p1}))});
        this.streamsMetadataState = (StreamsMetadataState) EasyMock.createStrictMock(StreamsMetadataState.class);
        this.streamsMetadataState.onChange((Map) EasyMock.eq(mkMap), (Map) EasyMock.anyObject(), (Cluster) EasyMock.anyObject());
        this.streamsMetadataState.onChange((Map) EasyMock.eq(mkMap2), (Map) EasyMock.anyObject(), (Cluster) EasyMock.anyObject());
        EasyMock.replay(new Object[]{this.streamsMetadataState});
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor();
        this.partitionAssignor.onAssignment(createAssignment(mkMap), (ConsumerGroupMetadata) null);
        this.partitionAssignor.onAssignment(createAssignment(mkMap2), (ConsumerGroupMetadata) null);
        EasyMock.verify(new Object[]{this.taskManager, this.streamsMetadataState});
    }

    @Test
    public void shouldTriggerImmediateRebalanceOnHostInfoChange() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1})), Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(new TopicPartition[]{this.t2p0, this.t2p1}))});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new HostInfo("newhost", 9090), Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1})), Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(new TopicPartition[]{this.t2p0, this.t2p1}))});
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("application.server", "newhost:9090"));
        this.partitionAssignor.onAssignment(createAssignment(mkMap), (ConsumerGroupMetadata) null);
        MatcherAssert.assertThat(Long.valueOf(this.referenceContainer.nextScheduledRebalanceMs.get()), Matchers.is(0L));
        this.partitionAssignor.onAssignment(createAssignment(mkMap2), (ConsumerGroupMetadata) null);
        MatcherAssert.assertThat(Long.valueOf(this.referenceContainer.nextScheduledRebalanceMs.get()), Matchers.is(Long.MAX_VALUE));
    }

    @Test
    public void shouldTriggerImmediateRebalanceOnTasksRevoked() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Set<TaskId> mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        List asList = Arrays.asList(this.t1p0, this.t1p1, this.t1p2);
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet, AssignmentTestUtils.EMPTY_TASKS).encode(), asList));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, mkSet).encode(), Collections.emptyList()));
        createMockTaskManager(mkSet, mkSet);
        configurePartitionAssignorWith(Collections.singletonMap("acceptable.recovery.lag", 0L));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_1)).partitions(), CoreMatchers.not(asList));
        MatcherAssert.assertThat(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).partitions(), CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).userData()).activeTasks(), CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).userData()).standbyTasks(), CoreMatchers.equalTo(Collections.emptyMap()));
        this.partitionAssignor.onAssignment((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2), (ConsumerGroupMetadata) null);
        MatcherAssert.assertThat(Long.valueOf(this.referenceContainer.nextScheduledRebalanceMs.get()), Matchers.is(0L));
    }

    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
        Map<String, Object> configProps = configProps();
        configProps.put("num.standby.replicas", 1);
        configProps.put("application.server", USER_END_POINT);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").groupByKey().count();
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(configProps));
        createDefaultMockTaskManager();
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"), Collections.singletonList(3)));
        configurePartitionAssignorWith(configProps);
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, OTHER_END_POINT).encode()));
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2});
        AssignmentInfo decode = AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment().get(CONSUMER_1)).userData());
        Set set = (Set) decode.partitionsByHost().get(new HostInfo("localhost", 8080));
        Set set2 = (Set) decode.partitionsByHost().get(new HostInfo("other", 9090));
        Set set3 = (Set) decode.standbyPartitionByHost().get(new HostInfo("localhost", 8080));
        Set set4 = (Set) decode.standbyPartitionByHost().get(new HostInfo("other", 9090));
        HashSet hashSet = new HashSet(set);
        hashSet.addAll(set2);
        MatcherAssert.assertThat(set, CoreMatchers.not(mkSet));
        MatcherAssert.assertThat(set2, CoreMatchers.not(mkSet));
        MatcherAssert.assertThat(set, CoreMatchers.equalTo(set4));
        MatcherAssert.assertThat(set2, CoreMatchers.equalTo(set3));
        MatcherAssert.assertThat(hashSet, CoreMatchers.equalTo(mkSet));
    }

    @Test
    public void shouldThrowKafkaExceptionIfReferenceContainerNotConfigured() {
        Map<String, Object> configProps = configProps();
        configProps.remove("__reference.container.instance__");
        MatcherAssert.assertThat(Assert.assertThrows(KafkaException.class, () -> {
            this.partitionAssignor.configure(configProps);
        }).getMessage(), CoreMatchers.equalTo("ReferenceContainer is not specified"));
    }

    @Test
    public void shouldThrowKafkaExceptionIfReferenceContainerConfigIsNotTaskManagerInstance() {
        Map<String, Object> configProps = configProps();
        configProps.put("__reference.container.instance__", "i am not a reference container");
        MatcherAssert.assertThat(Assert.assertThrows(KafkaException.class, () -> {
            this.partitionAssignor.configure(configProps);
        }).getMessage(), CoreMatchers.equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer"));
    }

    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2);
    }

    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3);
    }

    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3);
    }

    private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(int i, int i2) {
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), getInfoForOlderVersion(i, AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), getInfoForOlderVersion(i2, AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        configureDefault();
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(Integer.valueOf(groupAssignment.size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_1)).userData()).version()), CoreMatchers.equalTo(Integer.valueOf(i)));
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).userData()).version()), CoreMatchers.equalTo(Integer.valueOf(i)));
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion1() {
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "0.10.0"));
        Set mkSet = Utils.mkSet(new String[]{"topic1"});
        MatcherAssert.assertThat(Integer.valueOf(SubscriptionInfo.decode(new ConsumerPartitionAssignor.Subscription(new ArrayList(mkSet), this.partitionAssignor.subscriptionUserData(mkSet)).userData()).version()), CoreMatchers.equalTo(1));
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion2For0101() {
        shouldDownGradeSubscriptionToVersion2("0.10.1");
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion2For0102() {
        shouldDownGradeSubscriptionToVersion2("0.10.2");
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion2For0110() {
        shouldDownGradeSubscriptionToVersion2("0.11.0");
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion2For10() {
        shouldDownGradeSubscriptionToVersion2("1.0");
    }

    @Test
    public void shouldDownGradeSubscriptionToVersion2For11() {
        shouldDownGradeSubscriptionToVersion2("1.1");
    }

    private void shouldDownGradeSubscriptionToVersion2(Object obj) {
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", obj));
        Set mkSet = Utils.mkSet(new String[]{"topic1"});
        MatcherAssert.assertThat(Integer.valueOf(SubscriptionInfo.decode(new ConsumerPartitionAssignor.Subscription(new ArrayList(mkSet), this.partitionAssignor.subscriptionUserData(mkSet)).userData()).version()), CoreMatchers.equalTo(2));
    }

    @Test
    public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Set<TaskId> mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_1, mkSet, AssignmentTestUtils.EMPTY_TASKS).encode(), Arrays.asList(this.t1p0, this.t1p1, this.t1p2)));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), AssignmentTestUtils.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList()));
        createMockTaskManager(mkSet, mkSet);
        configureDefaultPartitionAssignor();
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(Integer.valueOf(groupAssignment.size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).partitions(), CoreMatchers.equalTo(Collections.emptyList()));
        AssignmentInfo decode = AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).userData());
        MatcherAssert.assertThat(Integer.valueOf(decode.version()), Matchers.is(10));
        MatcherAssert.assertThat(decode.activeTasks(), Matchers.empty());
        MatcherAssert.assertThat(decode.partitionsByHost(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(decode.standbyPartitionByHost(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(Integer.valueOf(decode.errCode()), Matchers.is(0));
    }

    @Test
    public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Set<TaskId> mkSet = Utils.mkSet(new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), encodeFutureSubscription(), Collections.emptyList()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), encodeFutureSubscription(), Collections.emptyList()));
        createMockTaskManager(mkSet, mkSet);
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(Integer.valueOf(groupAssignment.size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_1)).partitions(), CoreMatchers.equalTo(Arrays.asList(this.t1p0, this.t1p2)));
        MatcherAssert.assertThat(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_1)).userData()), CoreMatchers.equalTo(new AssignmentInfo(10, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)));
        MatcherAssert.assertThat(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).partitions(), CoreMatchers.equalTo(Collections.singletonList(this.t1p1)));
        MatcherAssert.assertThat(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_2)).userData()), CoreMatchers.equalTo(new AssignmentInfo(10, Collections.singletonList(AssignmentTestUtils.TASK_0_1), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)));
    }

    @Test
    public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed() {
        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
    }

    @Test
    public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed() {
        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
    }

    @Test
    public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "KSTREAM-SOURCE-0000000000", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"input-stream"});
        this.builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockApiProcessorSupplier(), new String[]{"KSTREAM-SOURCE-0000000000"});
        this.builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockApiProcessorSupplier(), new String[]{"KSTREAM-FLATMAPVALUES-0000000001"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        this.builder.addProcessor("KSTREAM-MAP-0000000005", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000003"});
        this.builder.addInternalTopic("odd_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000005"});
        this.builder.addSink("odd_store-repartition-sink", "odd_store-repartition", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"odd_store-repartition-filter"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "odd_store-repartition-source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"odd_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockApiProcessorSupplier(), new String[]{"odd_store-repartition-source"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000006"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000011", new MockApiProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000010"});
        this.builder.addProcessor("KSTREAM-MAP-0000000012", new MockApiProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000011"});
        this.builder.addInternalTopic("odd_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store_2-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000012"});
        this.builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"odd_store_2-repartition-filter"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "odd_store_2-repartition-source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"odd_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockApiProcessorSupplier(), new String[]{"odd_store_2-repartition-source"});
        this.builder.addProcessor("KSTREAM-MAP-0000000017", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000004"});
        this.builder.addInternalTopic("even_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000017"});
        this.builder.addSink("even_store-repartition-sink", "even_store-repartition", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"even_store-repartition-filter"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "even_store-repartition-source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"even_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockApiProcessorSupplier(), new String[]{"even_store-repartition-source"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000018"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000023", new MockApiProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000022"});
        this.builder.addProcessor("KSTREAM-MAP-0000000024", new MockApiProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000023"});
        this.builder.addInternalTopic("even_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store_2-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000024"});
        this.builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"even_store_2-repartition-filter"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "even_store_2-repartition-source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"even_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockApiProcessorSupplier(), new String[]{"even_store_2-repartition-source"});
        this.builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000013"});
        this.builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000025"});
        this.builder.addProcessor("KTABLE-MERGE-0000000029", new MockApiProcessorSupplier(), new String[]{"KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockApiProcessorSupplier(), new String[]{"KTABLE-MERGE-0000000029"});
        List asList = Arrays.asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition");
        configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(asList, this.defaultSubscriptionInfo.encode()));
        this.partitionAssignor.assign(new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet()), new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
    }

    @Test
    public void shouldGetAssignmentConfigs() {
        createDefaultMockTaskManager();
        Map<String, Object> configProps = configProps();
        configProps.put("acceptable.recovery.lag", 11);
        configProps.put("max.warmup.replicas", 33);
        configProps.put("num.standby.replicas", 44);
        configProps.put("probing.rebalance.interval.ms", 3300000L);
        this.partitionAssignor.configure(configProps);
        MatcherAssert.assertThat(Long.valueOf(this.partitionAssignor.acceptableRecoveryLag()), CoreMatchers.equalTo(11L));
        MatcherAssert.assertThat(Integer.valueOf(this.partitionAssignor.maxWarmupReplicas()), CoreMatchers.equalTo(33));
        MatcherAssert.assertThat(Integer.valueOf(this.partitionAssignor.numStandbyReplicas()), CoreMatchers.equalTo(44));
        MatcherAssert.assertThat(Long.valueOf(this.partitionAssignor.probingRebalanceIntervalMs()), CoreMatchers.equalTo(3300000L));
    }

    @Test
    public void shouldGetTime() {
        this.time.setCurrentTimeMs(Long.MAX_VALUE);
        createDefaultMockTaskManager();
        MatcherAssert.assertThat(Long.valueOf(new AssignorConfiguration(configProps()).referenceContainer().time.milliseconds()), CoreMatchers.equalTo(Long.MAX_VALUE));
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogEndOffsets() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(2)));
        configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        });
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOffsets() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), new String[]{"processor1"});
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        });
    }

    @Test
    public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
        this.adminClient = (Admin) EasyMock.createMock(AdminClient.class);
        ListOffsetsResult listOffsetsResult = (ListOffsetsResult) EasyMock.createNiceMock(ListOffsetsResult.class);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(Collections.emptyMap());
        EasyMock.expect(this.adminClient.listOffsets(Collections.emptyMap())).andStubReturn(listOffsetsResult);
        EasyMock.expect(listOffsetsResult.all()).andReturn(kafkaFutureImpl);
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        EasyMock.replay(new Object[]{listOffsetsResult});
        configureDefault();
        overwriteInternalTopicManagerWithMock(true);
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        EasyMock.verify(new Object[]{this.adminClient});
    }

    @Test
    public void shouldRequestEndOffsetsForPreexistingChangelogs() {
        Set mkSet = Utils.mkSet(new TopicPartition[]{new TopicPartition("stream-partition-assignor-test-store-changelog", 0), new TopicPartition("stream-partition-assignor-test-store-changelog", 1), new TopicPartition("stream-partition-assignor-test-store-changelog", 2)});
        this.adminClient = (Admin) EasyMock.createMock(AdminClient.class);
        ListOffsetsResult listOffsetsResult = (ListOffsetsResult) EasyMock.createNiceMock(ListOffsetsResult.class);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(mkSet.stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            ListOffsetsResult.ListOffsetsResultInfo listOffsetsResultInfo = (ListOffsetsResult.ListOffsetsResultInfo) EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
            EasyMock.expect(Long.valueOf(listOffsetsResultInfo.offset())).andStubReturn(Long.MAX_VALUE);
            EasyMock.replay(new Object[]{listOffsetsResultInfo});
            return listOffsetsResultInfo;
        })));
        Capture newCapture = EasyMock.newCapture();
        EasyMock.expect(this.adminClient.listOffsets((Map) EasyMock.capture(newCapture))).andReturn(listOffsetsResult).once();
        EasyMock.expect(listOffsetsResult.all()).andReturn(kafkaFutureImpl);
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        EasyMock.replay(new Object[]{listOffsetsResult});
        configureDefault();
        overwriteInternalTopicManagerWithMock(false);
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        EasyMock.verify(new Object[]{this.adminClient});
        MatcherAssert.assertThat(((Map) newCapture.getValue()).keySet(), CoreMatchers.equalTo(mkSet));
    }

    @Test
    public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
        Set mkSet = Utils.mkSet(new TopicPartition[]{new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)});
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic1", Materialized.as("store"));
        Properties properties = new Properties();
        properties.putAll(configProps());
        properties.put("topology.optimization", "all");
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(properties));
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(properties));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("topology.optimization", "all"));
        overwriteInternalTopicManagerWithMock(false);
        Consumer consumer = this.referenceContainer.mainConsumer;
        EasyMock.expect(consumer.committed((Set) EasyMock.eq(mkSet))).andReturn(mkSet.stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return new OffsetAndMetadata(Long.MAX_VALUE);
        }))).once();
        EasyMock.replay(new Object[]{consumer});
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        EasyMock.verify(new Object[]{consumer});
    }

    @Test
    public void shouldEncodeMissingSourceTopicError() {
        Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptyList(), Collections.emptySet(), Collections.emptySet());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        configureDefault();
        this.subscriptions.put("consumer", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"), this.defaultSubscriptionInfo.encode()));
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) this.partitionAssignor.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment().get("consumer")).userData()).errCode()), CoreMatchers.equalTo(Integer.valueOf(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())));
    }

    @Test
    public void testUniqueField() {
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor();
        Set mkSet = Utils.mkSet(new String[]{"input"});
        Assert.assertEquals(0L, this.partitionAssignor.uniqueField());
        this.partitionAssignor.subscriptionUserData(mkSet);
        Assert.assertEquals(1L, this.partitionAssignor.uniqueField());
        this.partitionAssignor.subscriptionUserData(mkSet);
        Assert.assertEquals(2L, this.partitionAssignor.uniqueField());
    }

    @Test
    public void testUniqueFieldOverflow() {
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor();
        Set mkSet = Utils.mkSet(new String[]{"input"});
        for (int i = 0; i < 127; i++) {
            this.partitionAssignor.subscriptionUserData(mkSet);
        }
        Assert.assertEquals(127L, this.partitionAssignor.uniqueField());
        this.partitionAssignor.subscriptionUserData(mkSet);
        Assert.assertEquals(-128L, this.partitionAssignor.uniqueField());
    }

    @Test
    public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
        this.builder = new CorruptedInternalTopologyBuilder();
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(configProps()));
        InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(this.builder);
        KStream stream = internalStreamsBuilder.stream(Collections.singleton("topic1"), new ConsumedInternal());
        stream.groupBy((str, str2) -> {
            return str;
        }, Grouped.with("GroupName", Serdes.String(), Serdes.String())).windowedBy(TimeWindows.of(Duration.ofMinutes(10L))).aggregate(() -> {
            return "";
        }, (str3, str4, str5) -> {
            return str5 + str3;
        }).leftJoin(internalStreamsBuilder.table("topic2", new ConsumedInternal(), new MaterializedInternal(Materialized.as("store"))), str6 -> {
            return str6;
        }, (str7, str8) -> {
            return str7 + str8;
        });
        internalStreamsBuilder.buildAndOptimizeTopology();
        configureDefault();
        this.subscriptions.put("consumer", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"), this.defaultSubscriptionInfo.encode()));
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment().get("consumer")).userData()).errCode()), CoreMatchers.equalTo(Integer.valueOf(AssignorError.ASSIGNMENT_ERROR.code())));
    }

    private static ByteBuffer encodeFutureSubscription() {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putInt(11);
        allocate.putInt(11);
        return allocate;
    }

    private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(int i) {
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), getInfoForOlderVersion(i, AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put("future-consumer", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), encodeFutureSubscription()));
        configureDefault();
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get(CONSUMER_1)).userData()).errCode()), CoreMatchers.equalTo(Integer.valueOf(AssignorError.ASSIGNMENT_ERROR.code())));
        MatcherAssert.assertThat(Integer.valueOf(AssignmentInfo.decode(((ConsumerPartitionAssignor.Assignment) groupAssignment.get("future-consumer")).userData()).errCode()), CoreMatchers.equalTo(Integer.valueOf(AssignorError.ASSIGNMENT_ERROR.code())));
    }

    private static ConsumerPartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> map) {
        return new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), new AssignmentInfo(10, Collections.emptyList(), Collections.emptyMap(), map, Collections.emptyMap(), 0).encode());
    }

    private static AssignmentInfo checkAssignment(Set<String> set, ConsumerPartitionAssignor.Assignment assignment) {
        AssignmentInfo decode = AssignmentInfo.decode(assignment.userData());
        Assert.assertEquals(assignment.partitions().size(), decode.activeTasks().size());
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (TopicPartition topicPartition : assignment.partitions()) {
            arrayList.add(new TaskId(0, topicPartition.partition()));
            hashSet.add(topicPartition.topic());
        }
        Assert.assertEquals(arrayList, decode.activeTasks());
        Assert.assertEquals(set, hashSet);
        HashSet hashSet2 = new HashSet();
        for (Map.Entry entry : decode.standbyTasks().entrySet()) {
            TaskId taskId = (TaskId) entry.getKey();
            for (TopicPartition topicPartition2 : (Set) entry.getValue()) {
                Assert.assertEquals(taskId.partition(), topicPartition2.partition());
                hashSet2.add(topicPartition2.topic());
            }
        }
        if (!decode.standbyTasks().isEmpty()) {
            Assert.assertEquals(set, hashSet2);
        }
        return decode;
    }

    private static void assertEquivalentAssignment(Map<String, List<TaskId>> map, Map<String, List<TaskId>> map2) {
        Assert.assertEquals(map.size(), map2.size());
        for (Map.Entry<String, List<TaskId>> entry : map.entrySet()) {
            String key = entry.getKey();
            Assert.assertTrue(map2.containsKey(key));
            List<TaskId> value = entry.getValue();
            Collections.sort(value);
            List<TaskId> list = map2.get(key);
            Collections.sort(list);
            MatcherAssert.assertThat(value, CoreMatchers.equalTo(list));
        }
    }

    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> list, List<Integer> list2) {
        if (list.size() != list2.size()) {
            throw new IllegalStateException("Passed in " + list.size() + " changelog topic names, but " + list2.size() + " different numPartitions for the topics");
        }
        HashMap hashMap = new HashMap();
        for (int i = 0; i < list.size(); i++) {
            String str = list.get(i);
            int intValue = list2.get(i).intValue();
            for (int i2 = 0; i2 < intValue; i2++) {
                hashMap.put(new TopicPartition(str, i2), Long.MAX_VALUE);
            }
        }
        return hashMap;
    }

    private static SubscriptionInfo getInfoForOlderVersion(int i, UUID uuid, Set<TaskId> set, Set<TaskId> set2) {
        return new SubscriptionInfo(i, 10, uuid, (String) null, getTaskOffsetSums(set, set2), (byte) 0, 0);
    }

    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> collection, Collection<TaskId> collection2) {
        Map<TaskId, Long> map = (Map) collection.stream().collect(Collectors.toMap(taskId -> {
            return taskId;
        }, taskId2 -> {
            return -2L;
        }));
        map.putAll((Map) collection2.stream().collect(Collectors.toMap(taskId3 -> {
            return taskId3;
        }, taskId4 -> {
            return 0L;
        })));
        return map;
    }

    private static Map<TaskId, Long> getTaskEndOffsetSums(Collection<TaskId> collection) {
        return (Map) collection.stream().collect(Collectors.toMap(taskId -> {
            return taskId;
        }, taskId2 -> {
            return Long.MAX_VALUE;
        }));
    }
}
