/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Assert;
import org.junit.Test;

public class StreamPartitionAssignorTest {
    // Partition handles for the test topics: topic1 and topic2 use partitions 0-2, topic3 uses 0-3.
    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private TopicPartition t3p3 = new TopicPartition("topic3", 3);

    // Topics that checkAssignment() expects every active (and non-empty standby) assignment to cover.
    // Built without an Object[] cast so the element type is inferred as String.
    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");

    // One PartitionInfo per partition declared above; leader is Node.noNode() with empty replica arrays.
    private List<PartitionInfo> infos = Arrays.asList(
            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));

    // Cluster metadata handed to StreamPartitionAssignor.assign() by the tests.
    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());

    // Task ids (0, n); checkAssignment() matches the second component against partition numbers.
    private final TaskId task0 = new TaskId(0, 0);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);

    // User end point placed into SubscriptionInfo by tests that do not configure "application.server".
    private String userEndPoint = null;

    /**
     * Builds the baseline Streams configuration used by the tests in this class.
     *
     * <p>A plain {@link Properties} instance is returned (rather than an anonymous subclass created via
     * double-brace initialisation) so the result does not capture a reference to the test instance.
     * Callers are free to add further settings to the returned object.
     *
     * @return a fresh, mutable {@link Properties} with application id, bootstrap servers,
     *         buffered-records limit and timestamp extractor set
     */
    private Properties configProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "stream-partition-assignor-test");
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
        return props;
    }

    /**
     * Verifies that {@code subscription()} returns the requested topics and, as user data, an encoded
     * {@link SubscriptionInfo} made of the process id, the thread's previous tasks, and the cached tasks
     * that are not previous tasks (reported as standby tasks), with no user end point.
     */
    @Test
    public void testSubscription() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final Set<TaskId> prevTasks = Utils.mkSet(
                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        final Set<TaskId> cachedTasks = Utils.mkSet(
                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

        String clientId = "client-id";
        UUID processId = UUID.randomUUID();

        // Stub the thread's task bookkeeping so the assignor sees the fixed sets defined above.
        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
            @Override
            public Set<TaskId> prevTasks() {
                return prevTasks;
            }

            @Override
            public Set<TaskId> cachedTasks() {
                return cachedTasks;
            }
        };

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));

        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));

        // Topic order is not asserted by itself: sort before comparing with the expected list.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());

        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
        standbyTasks.removeAll(prevTasks);

        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
        Assert.assertEquals(info.encode(), subscription.userData());
    }

    /**
     * Assigns topic1/topic2 to three consumers, two of which share a process id.
     *
     * <p>Expects the two consumers of the first process to hold partitions 0 and 1 of both topics
     * between them (in either order), the consumer of the second process to hold partition 2, and the
     * active tasks of all three to add up to task0..task2.
     */
    @Test
    public void testAssignBasic() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        // Which of consumer10/consumer11 gets which partition pair is not fixed, so compare as a set of sets.
        Set<TopicPartition> partitions10 = new HashSet<>(assignments.get("consumer10").partitions());
        Set<TopicPartition> partitions11 = new HashSet<>(assignments.get("consumer11").partitions());
        Assert.assertEquals(
                Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
                Utils.mkSet(partitions10, partitions11));

        Set<TopicPartition> partitions20 = new HashSet<>(assignments.get("consumer20").partitions());
        Assert.assertEquals(Utils.mkSet(t1p2, t2p2), partitions20);

        Set<TaskId> allActiveTasks = new HashSet<>();

        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);

        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);

        Assert.assertEquals(Utils.mkSet(task0, task1), allActiveTasks);

        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Assigns three topics when the consumers only report previous tasks task0..task2.
     *
     * <p>topic3 has a fourth partition, so a task (task3) that no consumer owned before must also be
     * handed out: the union of active tasks must be task0..task3 and the union of assigned partitions
     * must be every partition of topic1, topic2 and topic3.
     */
    @Test
    public void testAssignWithNewTasks() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addSource("source3", "topic3");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");

        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> noStandbyTasks = Collections.emptySet();

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, noStandbyTasks, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, noStandbyTasks, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, noStandbyTasks, userEndPoint).encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        // Collect what every consumer received; only the totals are asserted, not the distribution.
        Set<TaskId> allActiveTasks = new HashSet<>();
        Set<TopicPartition> allPartitions = new HashSet<>();
        for (String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            PartitionAssignor.Assignment assignment = assignments.get(consumer);
            AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
            allActiveTasks.addAll(info.activeTasks);
            allPartitions.addAll(assignment.partitions());
        }

        Assert.assertEquals(allTasks, allActiveTasks);
        Assert.assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
    }

    /**
     * Assigns a topology with two independent source/processor chains, each with its own state stores.
     *
     * <p>Expects every consumer to receive two partitions and two active tasks, and
     * {@code tasksForState} to report tasks (0, 0..2) for store1 and tasks (1, 0..2) for store2 and store3.
     */
    @Test
    public void testAssignWithStates() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());
        String applicationId = "test";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");

        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");

        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");

        List<String> topics = Utils.mkList("topic1", "topic2");

        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        TaskId task02 = new TaskId(0, 2);
        TaskId task10 = new TaskId(1, 0);
        TaskId task11 = new TaskId(1, 1);
        TaskId task12 = new TaskId(1, 2);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));

        Set<TaskId> noTasks = Collections.emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, noTasks, noTasks, userEndPoint).encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        List<String> consumers = Arrays.asList("consumer10", "consumer11", "consumer20");

        // Six tasks over three consumers: two partitions each ...
        for (String consumer : consumers) {
            Assert.assertEquals(2, assignments.get(consumer).partitions().size());
        }
        // ... and two active tasks each.
        for (String consumer : consumers) {
            AssignmentInfo info = AssignmentInfo.decode(assignments.get(consumer).userData());
            Assert.assertEquals(2, info.activeTasks.size());
        }

        Assert.assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
    }

    /**
     * Same setup as the basic assignment test but with {@code num.standby.replicas} set to 1.
     *
     * <p>Expects the two consumers of the first process to hold active tasks task0/task1 and, between
     * them, only task2 as standby (with differing standby sets), and expects both the active and the
     * standby tasks of all three consumers to add up to task0..task2.
     */
    @Test
    public void testAssignWithStandbyReplicas() throws Exception {
        Properties props = configProps();
        props.setProperty("num.standby.replicas", "1");
        StreamsConfig config = new StreamsConfig(props);

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        Set<TaskId> allActiveTasks = new HashSet<>();
        Set<TaskId> allStandbyTasks = new HashSet<>();

        // First the two consumers that share process uuid1.
        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        allStandbyTasks.addAll(info10.standbyTasks.keySet());

        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        allStandbyTasks.addAll(info11.standbyTasks.keySet());

        Assert.assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());

        // Together they are active for task0/task1, so the only standby left for that process is task2.
        Assert.assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
        Assert.assertEquals(Utils.mkSet(task2), allStandbyTasks);

        // Then the single consumer of process uuid2.
        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        allStandbyTasks.addAll(info20.standbyTasks.keySet());

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);

        Assert.assertEquals(3, allStandbyTasks.size());
        Assert.assertEquals(allTasks, allStandbyTasks);
    }

    /**
     * Decodes the {@link AssignmentInfo} carried by an assignment and checks that it is consistent
     * with the assigned partitions.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>there are as many active tasks as assigned partitions;</li>
     *   <li>the active task list equals {@code TaskId(0, p)} for each assigned partition {@code p},
     *       in assignment order;</li>
     *   <li>the assigned partitions cover exactly {@code allTopics};</li>
     *   <li>every partition of a standby task has the task's partition number, and, if there are any
     *       standby tasks, their partitions cover exactly {@code allTopics}.</li>
     * </ul>
     *
     * @param assignment the assignment produced for one consumer
     * @return the decoded assignment info, for further assertions by the caller
     */
    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());

        Assert.assertEquals(assignment.partitions().size(), info.activeTasks.size());

        // Rebuild the expected active tasks from the partitions actually assigned.
        List<TaskId> expectedActiveTasks = new ArrayList<>();
        Set<String> activeTopics = new HashSet<>();
        for (TopicPartition partition : assignment.partitions()) {
            expectedActiveTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assert.assertEquals(expectedActiveTasks, info.activeTasks);
        Assert.assertEquals(allTopics, activeTopics);

        // Typed entries: iterating a raw Set would yield Object, not TopicPartition.
        Set<String> standbyTopics = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
            TaskId id = entry.getKey();
            for (TopicPartition partition : entry.getValue()) {
                Assert.assertEquals(id.partition, partition.partition());
                standbyTopics.add(partition.topic());
            }
        }

        // The topic coverage check only makes sense when standby tasks were assigned at all.
        if (!info.standbyTasks.isEmpty()) {
            Assert.assertEquals(allTopics, standbyTopics);
        }
        return info;
    }

    /**
     * Feeds a hand-built assignment into {@code onAssignment} and checks what the assignor exposes
     * afterwards: each assigned partition maps to the active task at the same list position, and the
     * standby task map is returned unchanged.
     */
    @Test
    public void testOnAssignment() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());

        TopicPartition t2p3 = new TopicPartition("topic2", 3);

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        UUID uuid = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));

        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));

        // No host-state information is needed for this test, hence the empty map.
        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());

        partitionAssignor.onAssignment(assignment);

        Assert.assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
        Assert.assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
        Assert.assertEquals(standbyTasks, partitionAssignor.standbyTasks());
    }

    /**
     * Assigns a topology that writes to and reads back from the internal topic "topicX".
     *
     * <p>Expects the mock internal topic manager to have recorded exactly one ready topic,
     * "test-topicX" (the application id "test" used as prefix), with a value of 3 — the number of
     * tasks, presumably used as the topic's partition count.
     */
    @Test
    public void testAssignWithInternalTopics() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());
        String applicationId = "test";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addInternalTopic("topicX");
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", "processor1");
        builder.addSource("source2", "topicX");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");

        List<String> topics = Utils.mkList("topic1", "test-topicX");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";

        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));

        // Swap in the mock so the topics prepared during assign() can be inspected.
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        Set<TaskId> emptyTasks = Collections.emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));

        partitionAssignor.assign(metadata, subscriptions);

        Assert.assertEquals(1, internalTopicManager.readyTopics.size());
        Assert.assertEquals(allTasks.size(), internalTopicManager.readyTopics.get("test-topicX").intValue());
    }

    /**
     * Assigns a topology in which internal topic "topicZ" is fed by a processor that itself reads
     * from internal topic "topicX".
     *
     * <p>Expects the mock internal topic manager to have recorded two ready topics and the value
     * recorded for "test-topicZ" to be 3 — the number of tasks, presumably used as the topic's
     * partition count.
     */
    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
        StreamsConfig config = new StreamsConfig(configProps());
        String applicationId = "test";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addInternalTopic("topicX");
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", "processor1");
        builder.addSource("source2", "topicX");
        builder.addInternalTopic("topicZ");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addSink("sink2", "topicZ", "processor2");
        builder.addSource("source3", "topicZ");

        List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";

        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));

        // Swap in the mock so the topics prepared during assign() can be inspected.
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        Set<TaskId> emptyTasks = Collections.emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));

        // Only the side effect on the internal topic manager matters; the returned assignment is not inspected.
        partitionAssignor.assign(metadata, subscriptions);

        Assert.assertEquals(2, internalTopicManager.readyTopics.size());
        Assert.assertEquals(allTasks.size(), internalTopicManager.readyTopics.get("test-topicZ").intValue());
    }

    /**
     * Verifies that the value configured under {@code application.server} is carried as the user end
     * point in the {@link SubscriptionInfo} encoded into the subscription's user data.
     */
    @Test
    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
        String myEndPoint = "localhost:8080";
        Properties properties = configProps();
        properties.put("application.server", myEndPoint);
        StreamsConfig config = new StreamsConfig(properties);

        String applicationId = "application-id";
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addSource("source", "input");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", "processor");

        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";

        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));

        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
        SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());

        Assert.assertEquals(myEndPoint, subscriptionInfo.userEndPoint);
    }

    /**
     * Verifies that the assignment info returned to a consumer maps the host/port of its advertised
     * user end point to the partitions it was assigned — here all three partitions of topic1, since
     * there is a single consumer.
     */
    @Test
    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
        String myEndPoint = "localhost:8080";
        Properties properties = configProps();
        properties.put("application.server", myEndPoint);
        StreamsConfig config = new StreamsConfig(properties);

        String applicationId = "application-id";
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addSource("source", "topic1");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", "processor");

        List<String> topics = Utils.mkList("topic1");

        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";

        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));

        Set<TaskId> emptyTasks = Collections.emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, myEndPoint).encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
        AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());

        // The end point string "localhost:8080" is looked up as its host/port pair.
        Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
        Assert.assertEquals(
                Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)),
                topicPartitions);
    }

    /**
     * Verifies that a {@link ConfigException} is raised while configuring the
     * assignor when {@code application.server} is set to {@code "localhost"},
     * i.e. a value with no {@code :port} part.
     *
     * <p>The test fails if the guarded statement (building the consumer configs
     * and passing them to {@code configure}) completes without throwing.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
        String myEndPoint = "localhost";
        String applicationId = "application-id";
        String client1 = "client1";

        Properties properties = this.configProps();
        properties.put("application.server", myEndPoint);
        StreamsConfig config = new StreamsConfig(properties);
        UUID uuid1 = UUID.randomUUID();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        try {
            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
            Assert.fail("expected to an exception due to invalid config");
        } catch (ConfigException expected) {
            // Intentionally ignored: this exception is the outcome under test.
        }
    }

    /**
     * Verifies that a {@link ConfigException} is raised while configuring the
     * assignor when {@code application.server} is set to
     * {@code "localhost:j87yhk"}, i.e. the text after the colon is not a number.
     *
     * <p>The test fails if the guarded statement (building the consumer configs
     * and passing them to {@code configure}) completes without throwing.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
        String myEndPoint = "localhost:j87yhk";
        String applicationId = "application-id";
        String client1 = "client1";

        Properties properties = this.configProps();
        properties.put("application.server", myEndPoint);
        StreamsConfig config = new StreamsConfig(properties);
        UUID uuid1 = UUID.randomUUID();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));

        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        try {
            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
            Assert.fail("expected to an exception due to invalid config");
        } catch (ConfigException expected) {
            // Intentionally ignored: this exception is the outcome under test.
        }
    }

    /**
     * Checks that the host-to-partitions map encoded in an assignment is what
     * {@code getPartitionsByHostState()} returns after {@code onAssignment}.
     *
     * <p>The assignment carries one partition ({@code topic}-0) and an
     * {@link AssignmentInfo} whose host state maps {@code localhost:80} to
     * that same partition.
     */
    @Test
    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
        List<TopicPartition> assignedPartitions = Arrays.asList(new TopicPartition("topic", 0));

        HostInfo host = new HostInfo("localhost", 80);
        Set<TopicPartition> hostedPartitions = Collections.singleton(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> expectedHostState = Collections.singletonMap(host, hostedPartitions);

        AssignmentInfo info = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), Collections.emptyMap(), expectedHostState);
        ByteBuffer encodedInfo = info.encode();

        StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        assignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, encodedInfo));

        Assert.assertEquals(expectedHostState, assignor.getPartitionsByHostState());
    }

    /**
     * Checks that {@code onAssignment} makes cluster metadata available
     * through {@code clusterMetadata()}.
     *
     * <p>After an assignment containing the single partition {@code topic}-0,
     * the cluster is expected to report exactly one {@link PartitionInfo} for
     * {@code "topic"}, with topic name {@code "topic"} and partition 0.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void shouldSetClusterMetadataOnAssignment() throws Exception {
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));
        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), Collections.emptyMap(), hostState);
        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));

        Cluster cluster = partitionAssignor.clusterMetadata();
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");

        // Assert the size before indexing so that an empty result is reported
        // as an assertion failure rather than an IndexOutOfBoundsException.
        Assert.assertEquals(1, partitionInfos.size());
        PartitionInfo partitionInfo = partitionInfos.get(0);
        Assert.assertEquals("topic", partitionInfo.topic());
        Assert.assertEquals(0, partitionInfo.partition());
    }

    /**
     * Checks that an assignor which has received no assignment still returns a
     * non-null {@link Cluster} from {@code clusterMetadata()}.
     */
    @Test
    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
        StreamPartitionAssignor freshAssignor = new StreamPartitionAssignor();
        Cluster metadataBeforeAssignment = freshAssignor.clusterMetadata();

        Assert.assertNotNull(metadataBeforeAssignment);
    }

    /**
     * {@link InternalTopicManager} subclass used by these tests. Instead of
     * touching a real cluster, {@link #makeReady} records the requested topic
     * and pushes generated partition metadata into a {@link MockConsumer}.
     */
    private class MockInternalTopicManager extends InternalTopicManager {
        /** Topic name to the partition count passed to the latest {@code makeReady} call for it. */
        public Map<String, Integer> readyTopics = new HashMap<>();
        /** Consumer whose partition metadata is updated by {@code makeReady}. */
        public MockConsumer<byte[], byte[]> restoreConsumer;

        /**
         * @param restoreConsumer consumer that receives partition metadata for
         *                        each topic made ready
         */
        public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
            this.restoreConsumer = restoreConsumer;
        }

        /**
         * Records {@code topic} as ready and registers {@code numPartitions}
         * partitions (numbered from 0) for it on the restore consumer. Each
         * generated {@link PartitionInfo} has null leader/replica arguments.
         *
         * @param topic         topic whose name is recorded and published
         * @param numPartitions number of partitions to generate; values below
         *                      one produce an empty partition list
         */
        // NOTE(review): presumably overrides InternalTopicManager.makeReady;
        // consider adding @Override once the superclass signature is confirmed.
        public void makeReady(InternalTopicConfig topic, int numPartitions) {
            readyTopics.put(topic.name(), numPartitions);

            List<PartitionInfo> generated = new ArrayList<>();
            for (int partition = 0; partition < numPartitions; partition++) {
                generated.add(new PartitionInfo(topic.name(), partition, null, null, null));
            }

            restoreConsumer.updatePartitions(topic.name(), generated);
        }
    }
}

