/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.Assert;
import org.junit.Test;

public class TopologyBuilderTest {
    @Test(expected=TopologyBuilderException.class)
    public void testAddSourceWithSameName() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic-1"});
        builder.addSource("source", new String[]{"topic-2"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddSourceWithSameTopic() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic-1"});
        builder.addSource("source-2", new String[]{"topic-1"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddProcessorWithSameName() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic-1"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddProcessorWithWrongParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddProcessorWithSelfParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"processor"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddSinkWithSameName() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic-1"});
        builder.addSink("sink", "topic-2", new String[]{"source"});
        builder.addSink("sink", "topic-3", new String[]{"source"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddSinkWithWrongParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSink("sink", "topic-2", new String[]{"source"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddSinkWithSelfParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSink("sink", "topic-2", new String[]{"sink"});
    }

    @Test
    public void testAddSinkConnectedWithParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"source-topic"});
        builder.addSink("sink", "dest-topic", new String[]{"source"});
        Map nodeGroups = builder.nodeGroups();
        Set nodeGroup = (Set)nodeGroups.get(0);
        Assert.assertTrue((boolean)nodeGroup.contains("sink"));
        Assert.assertTrue((boolean)nodeGroup.contains("source"));
    }

    @Test
    public void testAddSinkConnectedWithMultipleParent() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"source-topic"});
        builder.addSource("sourceII", new String[]{"source-topicII"});
        builder.addSink("sink", "dest-topic", new String[]{"source", "sourceII"});
        Map nodeGroups = builder.nodeGroups();
        Set nodeGroup = (Set)nodeGroups.get(0);
        Assert.assertTrue((boolean)nodeGroup.contains("sink"));
        Assert.assertTrue((boolean)nodeGroup.contains("source"));
        Assert.assertTrue((boolean)nodeGroup.contains("sourceII"));
    }

    @Test
    public void testSourceTopics() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("X");
        builder.addSource("source-1", new String[]{"topic-1"});
        builder.addSource("source-2", new String[]{"topic-2"});
        builder.addSource("source-3", new String[]{"topic-3"});
        builder.addInternalTopic("topic-3");
        Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2");
        Assert.assertEquals((Object)expectedPattern.pattern(), (Object)builder.sourceTopicPattern().pattern());
    }

    @Test
    public void testPatternSourceTopic() {
        TopologyBuilder builder = new TopologyBuilder();
        Pattern expectedPattern = Pattern.compile("topic-\\d");
        builder.addSource("source-1", expectedPattern);
        Assert.assertEquals((Object)expectedPattern.pattern(), (Object)builder.sourceTopicPattern().pattern());
    }

    @Test
    public void testAddMoreThanOnePatternSourceNode() {
        TopologyBuilder builder = new TopologyBuilder();
        Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
        builder.addSource("source-1", Pattern.compile("topics[A-Z]"));
        builder.addSource("source-2", Pattern.compile(".*-\\d"));
        Assert.assertEquals((Object)expectedPattern.pattern(), (Object)builder.sourceTopicPattern().pattern());
    }

    @Test
    public void testSubscribeTopicNameAndPattern() {
        TopologyBuilder builder = new TopologyBuilder();
        Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d");
        builder.addSource("source-1", new String[]{"topic-foo", "topic-bar"});
        builder.addSource("source-2", Pattern.compile(".*-\\d"));
        Assert.assertEquals((Object)expectedPattern.pattern(), (Object)builder.sourceTopicPattern().pattern());
    }

    @Test(expected=TopologyBuilderException.class)
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", new String[]{"foo"});
        builder.addSource("source-2", Pattern.compile("f.*"));
    }

    @Test(expected=TopologyBuilderException.class)
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", Pattern.compile("f.*"));
        builder.addSource("source-2", new String[]{"foo"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddStateStoreWithNonExistingProcessor() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"no-such-processsor"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddStateStoreWithSource() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", new String[]{"topic-1"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"source-1"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddStateStoreWithSink() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSink("sink-1", "topic-1", new String[0]);
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"sink-1"});
    }

    @Test(expected=TopologyBuilderException.class)
    public void testAddStateStoreWithDuplicates() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[0]);
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[0]);
    }

    @Test
    public void testAddStateStore() {
        TopologyBuilder builder = new TopologyBuilder();
        MockStateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
        builder.addStateStore((StateStoreSupplier)supplier, new String[0]);
        builder.setApplicationId("X");
        builder.addSource("source-1", new String[]{"topic-1"});
        builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        Assert.assertEquals((long)0L, (long)builder.build(null).stateStores().size());
        builder.connectProcessorAndStateStores("processor-1", new String[]{"store-1"});
        List suppliers = builder.build(null).stateStores();
        Assert.assertEquals((long)1L, (long)suppliers.size());
        Assert.assertEquals((Object)supplier.name(), (Object)((StateStore)suppliers.get(0)).name());
    }

    @Test
    public void testTopicGroups() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("X");
        builder.addInternalTopic("topic-1x");
        builder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        builder.addSource("source-2", new String[]{"topic-2"});
        builder.addSource("source-3", new String[]{"topic-3"});
        builder.addSource("source-4", new String[]{"topic-4"});
        builder.addSource("source-5", new String[]{"topic-5"});
        builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        builder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2", "processor-1"});
        builder.copartitionSources((Collection)Utils.mkList((Object[])new String[]{"source-1", "source-2"}));
        builder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3", "source-4"});
        Map topicGroups = builder.topicGroups();
        HashMap<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<Integer, TopologyBuilder.TopicsInfo>();
        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-1", "X-topic-1x", "topic-2"}), Collections.emptyMap(), Collections.emptyMap()));
        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.emptyMap()));
        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-5"}), Collections.emptyMap(), Collections.emptyMap()));
        Assert.assertEquals((long)3L, (long)topicGroups.size());
        Assert.assertEquals(expectedTopicGroups, (Object)topicGroups);
        Collection copartitionGroups = builder.copartitionGroups();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new String[]{"topic-1", "X-topic-1x", "topic-2"})}), new HashSet(copartitionGroups));
    }

    @Test
    public void testTopicGroupsByStateStore() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("X");
        builder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        builder.addSource("source-2", new String[]{"topic-2"});
        builder.addSource("source-3", new String[]{"topic-3"});
        builder.addSource("source-4", new String[]{"topic-4"});
        builder.addSource("source-5", new String[]{"topic-5"});
        builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        builder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store-1", false), new String[]{"processor-1", "processor-2"});
        builder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3"});
        builder.addProcessor("processor-4", new MockProcessorSupplier(), new String[]{"source-4"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store-2", false), new String[]{"processor-3", "processor-4"});
        builder.addProcessor("processor-5", new MockProcessorSupplier(), new String[]{"source-5"});
        MockStateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
        builder.addStateStore((StateStoreSupplier)supplier, new String[0]);
        builder.connectProcessorAndStateStores("processor-5", new String[]{"store-3"});
        Map topicGroups = builder.topicGroups();
        HashMap<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<Integer, TopologyBuilder.TopicsInfo>();
        String store1 = ProcessorStateManager.storeChangelogTopic((String)"X", (String)"store-1");
        String store2 = ProcessorStateManager.storeChangelogTopic((String)"X", (String)"store-2");
        String store3 = ProcessorStateManager.storeChangelogTopic((String)"X", (String)"store-3");
        expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-1", "topic-1x", "topic-2"}), Collections.emptyMap(), Collections.singletonMap(store1, new InternalTopicConfig(store1, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.emptyMap()))));
        expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.singletonMap(store2, new InternalTopicConfig(store2, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.emptyMap()))));
        expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet((Object[])new String[]{"topic-5"}), Collections.emptyMap(), Collections.singletonMap(store3, new InternalTopicConfig(store3, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.emptyMap()))));
        Assert.assertEquals((long)3L, (long)topicGroups.size());
        Assert.assertEquals(expectedTopicGroups, (Object)topicGroups);
    }

    @Test
    public void testBuild() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        builder.addSource("source-2", new String[]{"topic-2"});
        builder.addSource("source-3", new String[]{"topic-3"});
        builder.addSource("source-4", new String[]{"topic-4"});
        builder.addSource("source-5", new String[]{"topic-5"});
        builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        builder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2", "processor-1"});
        builder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3", "source-4"});
        builder.setApplicationId("X");
        ProcessorTopology topology0 = builder.build(Integer.valueOf(0));
        ProcessorTopology topology1 = builder.build(Integer.valueOf(1));
        ProcessorTopology topology2 = builder.build(Integer.valueOf(2));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"source-1", "source-2", "processor-1", "processor-2"}), this.nodeNames(topology0.processors()));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"source-3", "source-4", "processor-3"}), this.nodeNames(topology1.processors()));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"source-5"}), this.nodeNames(topology2.processors()));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSink(null, "topic", new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSink("name", null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addProcessor(null, new ProcessorSupplier(){

            public Processor get() {
                return null;
            }
        }, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplier() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addProcessor("name", null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource(null, Pattern.compile(".*"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.connectProcessorAndStateStores(null, new String[]{"store"});
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAddNullInternalTopic() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addInternalTopic(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotSetApplicationIdToNull() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addStateStore(null, new String[0]);
    }

    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
        HashSet<String> nodeNames = new HashSet<String>();
        for (ProcessorNode node : nodes) {
            nodeNames.add(node.name());
        }
        return nodeNames;
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
        Assert.assertEquals((long)1L, (long)stateStoreNameToSourceTopic.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", new String[]{"topic"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
        Assert.assertEquals((long)1L, (long)stateStoreNameToSourceTopic.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
    }

    @Test
    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("appId");
        builder.addInternalTopic("internal-topic");
        builder.addSource("source", new String[]{"internal-topic"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
        Assert.assertEquals((long)1L, (long)stateStoreNameToSourceTopic.size());
        Assert.assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
    }

    @Test
    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("appId");
        builder.addSource("source", new String[]{"topic"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StateStoreSupplier)new RocksDBWindowStoreSupplier("store", 30000L, 3, false, null, null, 10000L, true, Collections.emptyMap(), false), new String[]{"processor"});
        Map topicGroups = builder.topicGroups();
        TopologyBuilder.TopicsInfo topicsInfo = (TopologyBuilder.TopicsInfo)topicGroups.values().iterator().next();
        InternalTopicConfig topicConfig = (InternalTopicConfig)topicsInfo.stateChangelogTopics.get("appId-store-changelog");
        Properties properties = topicConfig.toProperties(0L);
        List<String> policies = Arrays.asList(properties.getProperty("cleanup.policy").split(","));
        Assert.assertEquals((Object)"appId-store-changelog", (Object)topicConfig.name());
        Assert.assertTrue((boolean)policies.contains("compact"));
        Assert.assertTrue((boolean)policies.contains("delete"));
        Assert.assertEquals((long)2L, (long)policies.size());
        Assert.assertEquals((Object)"30000", (Object)properties.getProperty("retention.ms"));
        Assert.assertEquals((long)2L, (long)properties.size());
    }

    @Test
    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("appId");
        builder.addSource("source", new String[]{"topic"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("name", true), new String[]{"processor"});
        Map topicGroups = builder.topicGroups();
        TopologyBuilder.TopicsInfo topicsInfo = (TopologyBuilder.TopicsInfo)topicGroups.values().iterator().next();
        InternalTopicConfig topicConfig = (InternalTopicConfig)topicsInfo.stateChangelogTopics.get("appId-name-changelog");
        Properties properties = topicConfig.toProperties(0L);
        Assert.assertEquals((Object)"appId-name-changelog", (Object)topicConfig.name());
        Assert.assertEquals((Object)"compact", (Object)properties.getProperty("cleanup.policy"));
        Assert.assertEquals((long)1L, (long)properties.size());
    }

    @Test
    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("appId");
        builder.addInternalTopic("foo");
        builder.addSource("source", new String[]{"foo"});
        TopologyBuilder.TopicsInfo topicsInfo = (TopologyBuilder.TopicsInfo)builder.topicGroups().values().iterator().next();
        InternalTopicConfig topicConfig = (InternalTopicConfig)topicsInfo.repartitionSourceTopics.get("appId-foo");
        Properties properties = topicConfig.toProperties(0L);
        Assert.assertEquals((Object)"appId-foo", (Object)topicConfig.name());
        Assert.assertEquals((Object)"delete", (Object)properties.getProperty("cleanup.policy"));
        Assert.assertEquals((long)1L, (long)properties.size());
    }

    @Test(expected=TopologyBuilderException.class)
    public void shouldThroughOnUnassignedStateStoreAccess() {
        String sourceNodeName = "source";
        String goodNodeName = "goodGuy";
        String badNodeName = "badGuy";
        Properties config = new Properties();
        config.put("bootstrap.servers", "host:1");
        config.put("application.id", "appId");
        StreamsConfig streamsConfig = new StreamsConfig((Map)config);
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source", new String[]{"topic"}).addProcessor("goodGuy", (ProcessorSupplier)new LocalMockProcessorSupplier(), new String[]{"source"}).addStateStore(Stores.create((String)"store").withStringKeys().withStringValues().inMemory().build(), new String[]{"goodGuy"}).addProcessor("badGuy", (ProcessorSupplier)new LocalMockProcessorSupplier(), new String[]{"source"});
            ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, "store");
            driver.process("topic", null, null);
        }
        catch (StreamsException e) {
            Throwable cause = e.getCause();
            if (cause != null && cause instanceof TopologyBuilderException && cause.getMessage().equals("Invalid topology building: Processor badGuy has no access to StateStore store")) {
                throw (TopologyBuilderException)cause;
            }
            throw new RuntimeException("Did expect different exception. Did catch:", e);
        }
    }

    @Test
    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", new String[]{"topic-foo"});
        builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
        builder.addSource("source-3", Pattern.compile("topic-\\d"));
        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        Set updatedTopics = (Set)updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add("topic-B");
        updatedTopics.add("topic-3");
        updatedTopics.add("topic-A");
        builder.updateSubscriptions(subscriptionUpdates, null);
        builder.setApplicationId("test-id");
        Map topicGroups = builder.topicGroups();
        Assert.assertTrue((boolean)((TopologyBuilder.TopicsInfo)topicGroups.get((Object)Integer.valueOf((int)0))).sourceTopics.contains("topic-foo"));
        Assert.assertTrue((boolean)((TopologyBuilder.TopicsInfo)topicGroups.get((Object)Integer.valueOf((int)1))).sourceTopics.contains("topic-A"));
        Assert.assertTrue((boolean)((TopologyBuilder.TopicsInfo)topicGroups.get((Object)Integer.valueOf((int)1))).sourceTopics.contains("topic-B"));
        Assert.assertTrue((boolean)((TopologyBuilder.TopicsInfo)topicGroups.get((Object)Integer.valueOf((int)2))).sourceTopics.contains("topic-3"));
    }

    @Test
    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder().addSource("ingest", Pattern.compile("topic-\\d+")).addProcessor("my-processor", new MockProcessorSupplier(), new String[]{"ingest"}).addStateStore((StateStoreSupplier)new MockStateStoreSupplier("testStateStore", false), new String[]{"my-processor"});
        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        Set updatedTopics = (Set)updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add("topic-2");
        updatedTopics.add("topic-3");
        updatedTopics.add("topic-A");
        topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
        topologyBuilder.setApplicationId("test-app");
        Map stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
        List topics = (List)stateStoreAndTopics.get("testStateStore");
        Assert.assertTrue((String)"Expected to contain two topics", (topics.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)topics.contains("topic-2"));
        Assert.assertTrue((boolean)topics.contains("topic-3"));
        Assert.assertFalse((boolean)topics.contains("topic-A"));
    }

    private static class LocalMockProcessorSupplier
    implements ProcessorSupplier {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        public Processor get() {
            return new Processor(){

                public void init(ProcessorContext context) {
                    context.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Object key, Object value) {
                }

                public void punctuate(long timestamp) {
                }

                public void close() {
                }
            };
        }
    }
}

