package org.apache.kafka.streams.processor;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/TopologyBuilderTest.class */
public class TopologyBuilderTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/TopologyBuilderTest$LocalMockProcessorSupplier.class */
    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        public Processor get() {
            return new Processor() { // from class: org.apache.kafka.streams.processor.TopologyBuilderTest.LocalMockProcessorSupplier.1
                public void init(ProcessorContext processorContext) {
                    processorContext.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Object obj, Object obj2) {
                }

                public void punctuate(long j) {
                }

                public void close() {
                }
            };
        }
    }

    @Test
    public void shouldAddSourceWithOffsetReset() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", new String[]{"earliestTopic"});
        topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", new String[]{"latestTopic"});
        Assert.assertTrue(topologyBuilder.earliestResetTopicsPattern().matcher("earliestTopic").matches());
        Assert.assertTrue(topologyBuilder.latestResetTopicsPattern().matcher("latestTopic").matches());
    }

    @Test
    public void shouldAddSourcePatternWithOffsetReset() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", Pattern.compile("earliest.*Topic"));
        topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", Pattern.compile("latest.*Topic"));
        Assert.assertTrue(topologyBuilder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
        Assert.assertTrue(topologyBuilder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
    }

    @Test
    public void shouldAddSourceWithoutOffsetReset() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Serde String = Serdes.String();
        Pattern compile = Pattern.compile("test-topic");
        topologyBuilder.addSource("source", String.deserializer(), String.deserializer(), new String[]{"test-topic"});
        Assert.assertEquals(compile.pattern(), topologyBuilder.sourceTopicPattern().pattern());
        Assert.assertEquals(topologyBuilder.earliestResetTopicsPattern().pattern(), "");
        Assert.assertEquals(topologyBuilder.latestResetTopicsPattern().pattern(), "");
    }

    @Test
    public void shouldAddPatternSourceWithoutOffsetReset() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Serde String = Serdes.String();
        Pattern compile = Pattern.compile("test-.*");
        topologyBuilder.addSource("source", String.deserializer(), String.deserializer(), Pattern.compile("test-.*"));
        Assert.assertEquals(compile.pattern(), topologyBuilder.sourceTopicPattern().pattern());
        Assert.assertEquals(topologyBuilder.earliestResetTopicsPattern().pattern(), "");
        Assert.assertEquals(topologyBuilder.latestResetTopicsPattern().pattern(), "");
    }

    @Test
    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Serde String = Serdes.String();
        try {
            topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, String.deserializer(), String.deserializer(), new String[0]);
            Assert.fail("Should throw TopologyBuilderException with no topics");
        } catch (TopologyBuilderException e) {
        }
    }

    @Test
    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Serde String = Serdes.String();
        topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, String.deserializer(), String.deserializer(), new String[]{"topic-1"});
        try {
            topologyBuilder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", (TimestampExtractor) null, String.deserializer(), String.deserializer(), new String[]{"topic-2"});
            Assert.fail("Should throw TopologyBuilderException for duplicate source name");
        } catch (TopologyBuilderException e) {
        }
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddSourceWithSameName() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic-1"});
        topologyBuilder.addSource("source", new String[]{"topic-2"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddSourceWithSameTopic() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic-1"});
        topologyBuilder.addSource("source-2", new String[]{"topic-1"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithSameName() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic-1"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithWrongParent() {
        new TopologyBuilder().addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithSelfParent() {
        new TopologyBuilder().addProcessor("processor", new MockProcessorSupplier(), new String[]{"processor"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithSameName() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic-1"});
        topologyBuilder.addSink("sink", "topic-2", new String[]{"source"});
        topologyBuilder.addSink("sink", "topic-3", new String[]{"source"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithWrongParent() {
        new TopologyBuilder().addSink("sink", "topic-2", new String[]{"source"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithSelfParent() {
        new TopologyBuilder().addSink("sink", "topic-2", new String[]{"sink"});
    }

    @Test
    public void testAddSinkConnectedWithParent() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"source-topic"});
        topologyBuilder.addSink("sink", "dest-topic", new String[]{"source"});
        Set set = (Set) topologyBuilder.nodeGroups().get(0);
        Assert.assertTrue(set.contains("sink"));
        Assert.assertTrue(set.contains("source"));
    }

    @Test
    public void testAddSinkConnectedWithMultipleParent() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"source-topic"});
        topologyBuilder.addSource("sourceII", new String[]{"source-topicII"});
        topologyBuilder.addSink("sink", "dest-topic", new String[]{"source", "sourceII"});
        Set set = (Set) topologyBuilder.nodeGroups().get(0);
        Assert.assertTrue(set.contains("sink"));
        Assert.assertTrue(set.contains("source"));
        Assert.assertTrue(set.contains("sourceII"));
    }

    @Test
    public void testSourceTopics() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("X");
        topologyBuilder.addSource("source-1", new String[]{"topic-1"});
        topologyBuilder.addSource("source-2", new String[]{"topic-2"});
        topologyBuilder.addSource("source-3", new String[]{"topic-3"});
        topologyBuilder.addInternalTopic("topic-3");
        Assert.assertEquals(Pattern.compile("X-topic-3|topic-1|topic-2").pattern(), topologyBuilder.sourceTopicPattern().pattern());
    }

    @Test
    public void testPatternSourceTopic() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("topic-\\d");
        topologyBuilder.addSource("source-1", compile);
        Assert.assertEquals(compile.pattern(), topologyBuilder.sourceTopicPattern().pattern());
    }

    @Test
    public void testAddMoreThanOnePatternSourceNode() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("topics[A-Z]|.*-\\d");
        topologyBuilder.addSource("source-1", Pattern.compile("topics[A-Z]"));
        topologyBuilder.addSource("source-2", Pattern.compile(".*-\\d"));
        Assert.assertEquals(compile.pattern(), topologyBuilder.sourceTopicPattern().pattern());
    }

    @Test
    public void testSubscribeTopicNameAndPattern() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("topic-bar|topic-foo|.*-\\d");
        topologyBuilder.addSource("source-1", new String[]{"topic-foo", "topic-bar"});
        topologyBuilder.addSource("source-2", Pattern.compile(".*-\\d"));
        Assert.assertEquals(compile.pattern(), topologyBuilder.sourceTopicPattern().pattern());
    }

    @Test(expected = TopologyBuilderException.class)
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", new String[]{"foo"});
        topologyBuilder.addSource("source-2", Pattern.compile("f.*"));
    }

    @Test(expected = TopologyBuilderException.class)
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", Pattern.compile("f.*"));
        topologyBuilder.addSource("source-2", new String[]{"foo"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithNonExistingProcessor() {
        new TopologyBuilder().addStateStore(new MockStateStoreSupplier("store", false), new String[]{"no-such-processsor"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", new String[]{"topic-1"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[]{"source-1"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithSink() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSink("sink-1", "topic-1", new String[0]);
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[]{"sink-1"});
    }

    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithDuplicates() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[0]);
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[0]);
    }

    @Test
    public void testAddStateStore() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        MockStateStoreSupplier mockStateStoreSupplier = new MockStateStoreSupplier("store-1", false);
        topologyBuilder.addStateStore(mockStateStoreSupplier, new String[0]);
        topologyBuilder.setApplicationId("X");
        topologyBuilder.addSource("source-1", new String[]{"topic-1"});
        topologyBuilder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        Assert.assertEquals(0L, topologyBuilder.build((Integer) null).stateStores().size());
        topologyBuilder.connectProcessorAndStateStores("processor-1", new String[]{"store-1"});
        List stateStores = topologyBuilder.build((Integer) null).stateStores();
        Assert.assertEquals(1L, stateStores.size());
        Assert.assertEquals(mockStateStoreSupplier.name(), ((StateStore) stateStores.get(0)).name());
    }

    @Test
    public void testTopicGroups() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("X");
        topologyBuilder.addInternalTopic("topic-1x");
        topologyBuilder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        topologyBuilder.addSource("source-2", new String[]{"topic-2"});
        topologyBuilder.addSource("source-3", new String[]{"topic-3"});
        topologyBuilder.addSource("source-4", new String[]{"topic-4"});
        topologyBuilder.addSource("source-5", new String[]{"topic-5"});
        topologyBuilder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        topologyBuilder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2", "processor-1"});
        topologyBuilder.copartitionSources(Utils.mkList(new String[]{"source-1", "source-2"}));
        topologyBuilder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3", "source-4"});
        Map map = topologyBuilder.topicGroups();
        HashMap hashMap = new HashMap();
        hashMap.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-1", "X-topic-1x", "topic-2"}), Collections.emptyMap(), Collections.emptyMap()));
        hashMap.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.emptyMap()));
        hashMap.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-5"}), Collections.emptyMap(), Collections.emptyMap()));
        Assert.assertEquals(3L, map.size());
        Assert.assertEquals(hashMap, map);
        Assert.assertEquals(Utils.mkSet(new Set[]{Utils.mkSet(new String[]{"topic-1", "X-topic-1x", "topic-2"})}), new HashSet(topologyBuilder.copartitionGroups()));
    }

    @Test
    public void testTopicGroupsByStateStore() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("X");
        topologyBuilder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        topologyBuilder.addSource("source-2", new String[]{"topic-2"});
        topologyBuilder.addSource("source-3", new String[]{"topic-3"});
        topologyBuilder.addSource("source-4", new String[]{"topic-4"});
        topologyBuilder.addSource("source-5", new String[]{"topic-5"});
        topologyBuilder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        topologyBuilder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store-1", false), new String[]{"processor-1", "processor-2"});
        topologyBuilder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3"});
        topologyBuilder.addProcessor("processor-4", new MockProcessorSupplier(), new String[]{"source-4"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store-2", false), new String[]{"processor-3", "processor-4"});
        topologyBuilder.addProcessor("processor-5", new MockProcessorSupplier(), new String[]{"source-5"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store-3", false), new String[0]);
        topologyBuilder.connectProcessorAndStateStores("processor-5", new String[]{"store-3"});
        Map map = topologyBuilder.topicGroups();
        HashMap hashMap = new HashMap();
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic("X", "store-1");
        String storeChangelogTopic2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
        String storeChangelogTopic3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
        hashMap.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-1", "topic-1x", "topic-2"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic, new UnwindowedChangelogTopicConfig(storeChangelogTopic, Collections.emptyMap()))));
        hashMap.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic2, new UnwindowedChangelogTopicConfig(storeChangelogTopic2, Collections.emptyMap()))));
        hashMap.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-5"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic3, new UnwindowedChangelogTopicConfig(storeChangelogTopic3, Collections.emptyMap()))));
        Assert.assertEquals(3L, map.size());
        Assert.assertEquals(hashMap, map);
    }

    @Test
    public void testBuild() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", new String[]{"topic-1", "topic-1x"});
        topologyBuilder.addSource("source-2", new String[]{"topic-2"});
        topologyBuilder.addSource("source-3", new String[]{"topic-3"});
        topologyBuilder.addSource("source-4", new String[]{"topic-4"});
        topologyBuilder.addSource("source-5", new String[]{"topic-5"});
        topologyBuilder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        topologyBuilder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-2", "processor-1"});
        topologyBuilder.addProcessor("processor-3", new MockProcessorSupplier(), new String[]{"source-3", "source-4"});
        topologyBuilder.setApplicationId("X");
        ProcessorTopology build = topologyBuilder.build(0);
        ProcessorTopology build2 = topologyBuilder.build(1);
        ProcessorTopology build3 = topologyBuilder.build(2);
        Assert.assertEquals(Utils.mkSet(new String[]{"source-1", "source-2", "processor-1", "processor-2"}), nodeNames(build.processors()));
        Assert.assertEquals(Utils.mkSet(new String[]{"source-3", "source-4", "processor-3"}), nodeNames(build2.processors()));
        Assert.assertEquals(Utils.mkSet(new String[]{"source-5"}), nodeNames(build3.processors()));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() {
        new TopologyBuilder().addSink((String) null, "topic", new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() {
        new TopologyBuilder().addSink("name", (String) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        new TopologyBuilder().addProcessor((String) null, new ProcessorSupplier() { // from class: org.apache.kafka.streams.processor.TopologyBuilderTest.1
            public Processor get() {
                return null;
            }
        }, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplier() {
        new TopologyBuilder().addProcessor("name", (ProcessorSupplier) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSource() {
        new TopologyBuilder().addSource((String) null, Pattern.compile(".*"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        new TopologyBuilder().connectProcessorAndStateStores((String) null, new String[]{"store"});
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAddNullInternalTopic() {
        new TopologyBuilder().addInternalTopic((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotSetApplicationIdToNull() {
        new TopologyBuilder().setApplicationId((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() {
        new TopologyBuilder().addStateStore((StateStoreSupplier) null, new String[0]);
    }

    private Set<String> nodeNames(Collection<ProcessorNode> collection) {
        HashSet hashSet = new HashSet();
        Iterator<ProcessorNode> it = collection.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().name());
        }
        return hashSet;
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopics = topologyBuilder.stateStoreNameToSourceTopics();
        Assert.assertEquals(1L, stateStoreNameToSourceTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopics.get("store"));
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", new String[]{"topic"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopics = topologyBuilder.stateStoreNameToSourceTopics();
        Assert.assertEquals(1L, stateStoreNameToSourceTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopics.get("store"));
    }

    @Test
    public void shouldCorrectlyMapStateStoreToInternalTopics() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("appId");
        topologyBuilder.addInternalTopic("internal-topic");
        topologyBuilder.addSource("source", new String[]{"internal-topic"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", false), new String[]{"processor"});
        Map stateStoreNameToSourceTopics = topologyBuilder.stateStoreNameToSourceTopics();
        Assert.assertEquals(1L, stateStoreNameToSourceTopics.size());
        Assert.assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopics.get("store"));
    }

    @Test
    public void shouldAddInternalTopicConfigForWindowStores() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("appId");
        topologyBuilder.addSource("source", new String[]{"topic"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addStateStore(new RocksDBWindowStoreSupplier("store", IntegrationTestUtils.DEFAULT_TIMEOUT, 3, false, (Serde) null, (Serde) null, 10000L, true, Collections.emptyMap(), false), new String[]{"processor"});
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) ((TopologyBuilder.TopicsInfo) topologyBuilder.topicGroups().values().iterator().next()).stateChangelogTopics.get("appId-store-changelog");
        Map properties = internalTopicConfig.getProperties(Collections.emptyMap(), 10000L);
        Assert.assertEquals(2L, properties.size());
        Assert.assertEquals("40000", properties.get("retention.ms"));
        Assert.assertEquals("appId-store-changelog", internalTopicConfig.name());
    }

    @Test
    public void shouldAddInternalTopicConfigForNonWindowStores() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("appId");
        topologyBuilder.addSource("source", new String[]{"topic"});
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        topologyBuilder.addStateStore(new MockStateStoreSupplier("store", true), new String[]{"processor"});
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) ((TopologyBuilder.TopicsInfo) topologyBuilder.topicGroups().values().iterator().next()).stateChangelogTopics.get("appId-store-changelog");
        Assert.assertEquals(1L, internalTopicConfig.getProperties(Collections.emptyMap(), 10000L).size());
        Assert.assertEquals("appId-store-changelog", internalTopicConfig.name());
    }

    @Test
    public void shouldAddInternalTopicConfigForRepartitionTopics() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setApplicationId("appId");
        topologyBuilder.addInternalTopic("foo");
        topologyBuilder.addSource("source", new String[]{"foo"});
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) ((TopologyBuilder.TopicsInfo) topologyBuilder.topicGroups().values().iterator().next()).repartitionSourceTopics.get("appId-foo");
        Assert.assertEquals(4L, internalTopicConfig.getProperties(Collections.emptyMap(), 10000L).size());
        Assert.assertEquals("appId-foo", internalTopicConfig.name());
    }

    @Test(expected = TopologyBuilderException.class)
    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "host:1");
        properties.put("application.id", "appId");
        properties.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        try {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.addSource("source", new String[]{"topic"}).addProcessor("goodGuy", new LocalMockProcessorSupplier(), new String[]{"source"}).addStateStore(Stores.create("store").withStringKeys().withStringValues().inMemory().build(), new String[]{"goodGuy"}).addProcessor("badGuy", new LocalMockProcessorSupplier(), new String[]{"source"});
            new ProcessorTopologyTestDriver(streamsConfig, topologyBuilder.internalTopologyBuilder).process("topic", null, null);
        } catch (StreamsException e) {
            TopologyBuilderException cause = e.getCause();
            if (cause == null || !(cause instanceof TopologyBuilderException) || !cause.getMessage().equals("Invalid topology building: Processor badGuy has no access to StateStore store")) {
                throw new RuntimeException("Did expect different exception. Did catch:", e);
            }
            throw cause;
        }
    }

    @Test
    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", new String[]{"topic-foo"});
        topologyBuilder.addSource("source-2", Pattern.compile("topic-[A-C]"));
        topologyBuilder.addSource("source-3", Pattern.compile("topic-\\d"));
        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field declaredField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        declaredField.setAccessible(true);
        Set set = (Set) declaredField.get(subscriptionUpdates);
        set.add("topic-B");
        set.add("topic-3");
        set.add("topic-A");
        topologyBuilder.updateSubscriptions(subscriptionUpdates, (String) null);
        topologyBuilder.setApplicationId("test-id");
        Map map = topologyBuilder.topicGroups();
        Assert.assertTrue(((TopologyBuilder.TopicsInfo) map.get(0)).sourceTopics.contains("topic-foo"));
        Assert.assertTrue(((TopologyBuilder.TopicsInfo) map.get(1)).sourceTopics.contains("topic-A"));
        Assert.assertTrue(((TopologyBuilder.TopicsInfo) map.get(1)).sourceTopics.contains("topic-B"));
        Assert.assertTrue(((TopologyBuilder.TopicsInfo) map.get(2)).sourceTopics.contains("topic-3"));
    }

    @Test
    public void shouldAddTimestampExtractorPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource(new MockTimestampExtractor(), "source", new String[]{"topic"});
        Assert.assertThat(topologyBuilder.build((Integer) null).source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource((TopologyBuilder.AutoOffsetReset) null, new MockTimestampExtractor(), "source", new String[]{"topic"});
        Assert.assertThat(topologyBuilder.build((Integer) null).source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithPatternPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("t.*");
        topologyBuilder.addSource(new MockTimestampExtractor(), "source", compile);
        Assert.assertThat(topologyBuilder.build((Integer) null).source(compile.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("t.*");
        topologyBuilder.addSource((TopologyBuilder.AutoOffsetReset) null, new MockTimestampExtractor(), "source", compile);
        Assert.assertThat(topologyBuilder.build((Integer) null).source(compile.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource((TopologyBuilder.AutoOffsetReset) null, "source", new MockTimestampExtractor(), (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        Assert.assertThat(topologyBuilder.build((Integer) null).source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Pattern compile = Pattern.compile("t.*");
        topologyBuilder.addSource((TopologyBuilder.AutoOffsetReset) null, "source", new MockTimestampExtractor(), (Deserializer) null, (Deserializer) null, compile);
        Assert.assertThat(topologyBuilder.build((Integer) null).source(compile.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
        TopologyBuilder addStateStore = new TopologyBuilder().addSource("ingest", Pattern.compile("topic-\\d+")).addProcessor("my-processor", new MockProcessorSupplier(), new String[]{"ingest"}).addStateStore(new MockStateStoreSupplier("testStateStore", false), new String[]{"my-processor"});
        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field declaredField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        declaredField.setAccessible(true);
        Set set = (Set) declaredField.get(subscriptionUpdates);
        set.add("topic-2");
        set.add("topic-3");
        set.add("topic-A");
        addStateStore.updateSubscriptions(subscriptionUpdates, "test-thread");
        addStateStore.setApplicationId("test-app");
        List list = (List) addStateStore.stateStoreNameToSourceTopics().get("testStateStore");
        Assert.assertTrue("Expected to contain two topics", list.size() == 2);
        Assert.assertTrue(list.contains("topic-2"));
        Assert.assertTrue(list.contains("topic-3"));
        Assert.assertFalse(list.contains("topic-A"));
    }

    @Test(expected = TopologyBuilderException.class)
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        new TopologyBuilder().addGlobalStore(new MockStateStoreSupplier("anyName", false, false), "sameName", (Deserializer) null, (Deserializer) null, "anyTopicName", "sameName", new MockProcessorSupplier());
    }
}
