/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class TopologyTest {
    // Nice mock of a state store builder; tests record expectations on it via mockStoreBuilder().
    private final StoreBuilder storeBuilder = (StoreBuilder)EasyMock.createNiceMock(StoreBuilder.class);
    // Nice mock used as the store builder when a test registers a global store.
    private final KeyValueStoreBuilder globalStoreBuilder = (KeyValueStoreBuilder)EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    // Topology under test; initialised together with each TopologyTest instance.
    private final Topology topology = new Topology();
    // Description the tests build up next to the topology and compare against topology.describe().
    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();

    /**
     * Adding a topic-based source under a {@code null} node name is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
        final String missingName = null;
        final String[] topics = {"topic"};
        topology.addSource(missingName, topics);
    }

    /**
     * Adding a pattern-based source under a {@code null} node name is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
        final Pattern matchEverything = Pattern.compile(".*");
        topology.addSource(null, matchEverything);
    }

    /**
     * Adding a source whose topic array is {@code null} is expected to raise a
     * {@link NullPointerException}.
     *
     * <p>Note: "Soure" in the method name is a typo for "Source"; the name is left untouched so the
     * test keeps its identity in reports.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicsWhenAddingSoureWithTopic() {
        final String[] missingTopics = null;
        topology.addSource("source", missingTopics);
    }

    /**
     * Adding a source whose topic pattern is {@code null} is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
        final Pattern missingPattern = null;
        topology.addSource("source", missingPattern);
    }

    /**
     * Adding a source with an empty topic array is expected to raise a {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowZeroTopicsWhenAddingSource() {
        final String[] noTopics = {};
        topology.addSource("source", noTopics);
    }

    /**
     * Adding a processor under a {@code null} node name is expected to raise a
     * {@link NullPointerException}, even though a usable supplier is provided.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        // Supplier that hands out whatever a fresh MockProcessorSupplier produces.
        final ProcessorSupplier delegatingSupplier = new ProcessorSupplier(){

            public Processor get() {
                final MockProcessorSupplier mockSupplier = new MockProcessorSupplier();
                return mockSupplier.get();
            }
        };
        topology.addProcessor(null, delegatingSupplier, new String[0]);
    }

    /**
     * Adding a processor with a {@code null} supplier is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
        final String[] noParents = {};
        topology.addProcessor("name", null, noParents);
    }

    /**
     * Adding a sink under a {@code null} node name is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() {
        final String[] noParents = {};
        topology.addSink(null, "topic", noParents);
    }

    /**
     * Adding a sink whose target topic is {@code null} is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() {
        final String[] noParents = {};
        topology.addSink("name", null, noParents);
    }

    /**
     * Connecting state stores to a {@code null} processor name is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        final String[] storeNames = {"store"};
        topology.connectProcessorAndStateStores(null, storeNames);
    }

    /**
     * Connecting a processor to a {@code null} array of store names is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] missingStoreNames = null;
        topology.connectProcessorAndStateStores("processor", missingStoreNames);
    }

    /**
     * Connecting a processor to an empty array of store names is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] noStoreNames = {};
        topology.connectProcessorAndStateStores("processor", noStoreNames);
    }

    /**
     * Adding a {@code null} state store (with no processors attached) is expected to raise a
     * {@link NullPointerException}.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() {
        final String[] noProcessors = {};
        topology.addStateStore(null, noProcessors);
    }

    /**
     * Registers a source named "source" and then expects a {@link TopologyException} when a second
     * source reuses that name for a different topic.
     */
    @Test
    public void shouldNotAllowToAddSourcesWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        try {
            topology.addSource("source", new String[]{"topic-2"});
        }
        catch (TopologyException expected) {
            // The duplicate name was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate source name");
    }

    /**
     * Registers "topic-1" with one source and then expects a {@link TopologyException} when a
     * differently named source subscribes to the same topic.
     */
    @Test
    public void shouldNotAllowToAddTopicTwice() {
        topology.addSource("source", new String[]{"topic-1"});
        try {
            topology.addSource("source-2", new String[]{"topic-1"});
        }
        catch (TopologyException expected) {
            // The already used topic was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should throw TopologyException for already used topic");
    }

    /**
     * Registers the topic "foo" and then expects a {@link TopologyException} when another source
     * subscribes with the pattern {@code f.*}, which matches that topic.
     */
    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        topology.addSource("source-1", new String[]{"foo"});
        final Pattern overlappingPattern = Pattern.compile("f.*");
        try {
            topology.addSource("source-2", overlappingPattern);
        }
        catch (TopologyException expected) {
            // The overlapping pattern was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
    }

    /**
     * Registers the pattern {@code f.*} and then expects a {@link TopologyException} when another
     * source subscribes to the topic "foo", which that pattern matches.
     */
    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        final Pattern registeredPattern = Pattern.compile("f.*");
        topology.addSource("source-1", registeredPattern);
        try {
            topology.addSource("source-2", new String[]{"foo"});
        }
        catch (TopologyException expected) {
            // The overlapping topic was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
    }

    /**
     * Adds a source and a processor named "processor", then expects a {@link TopologyException}
     * when a second processor is added under the same name.
     */
    @Test
    public void shouldNotAllowToAddProcessorWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        try {
            topology.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        }
        catch (TopologyException expected) {
            // The duplicate name was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate processor name");
    }

    /**
     * Adding a processor whose parent "source" was never registered is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldFailOnUnknownSource() {
        final String[] unknownParents = {"source"};
        topology.addProcessor("processor", new MockProcessorSupplier(), unknownParents);
    }

    /**
     * Adding a processor that lists its own name as parent is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldFailIfNodeIsItsOwnParent() {
        final String nodeName = "processor";
        final String[] selfAsParent = {nodeName};
        topology.addProcessor(nodeName, new MockProcessorSupplier(), selfAsParent);
    }

    /**
     * Adds a source and a sink named "sink", then expects a {@link TopologyException} when a second
     * sink is added under the same name.
     */
    @Test
    public void shouldNotAllowToAddSinkWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addSink("sink", "topic-2", new String[]{"source"});
        try {
            topology.addSink("sink", "topic-3", new String[]{"source"});
        }
        catch (TopologyException expected) {
            // The duplicate name was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate sink name");
    }

    /**
     * Adding a sink whose parent "source" was never registered is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldFailWithUnknownParent() {
        final String[] unknownParents = {"source"};
        topology.addSink("sink", "topic-2", unknownParents);
    }

    /**
     * Adding a sink that lists its own name as parent is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldFailIfSinkIsItsOwnParent() {
        final String sinkName = "sink";
        final String[] selfAsParent = {sinkName};
        topology.addSink(sinkName, "topic-2", selfAsParent);
    }

    /**
     * Adds a source with a sink attached, then expects a {@link TopologyException} when a second
     * sink names the first sink as its parent.
     */
    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addSink("sink-1", "topic-2", new String[]{"source"});
        try {
            topology.addSink("sink-2", "topic-3", new String[]{"sink-1"});
        }
        catch (TopologyException expected) {
            // Using a sink as parent was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should throw TopologyException for using sink as parent");
    }

    /**
     * Attaching the mocked store to a processor name that was never added is expected to raise a
     * {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        final String[] unknownProcessors = {"no-such-processsor"};
        topology.addStateStore(storeBuilder, unknownProcessors);
    }

    /**
     * Adds a source node and then expects a {@link TopologyException} when the mocked store is
     * attached to that source.
     */
    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSource("source-1", new String[]{"topic-1"});
        try {
            topology.addStateStore(storeBuilder, new String[]{"source-1"});
        }
        catch (TopologyException expected) {
            // Attaching a store to a source was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should have thrown TopologyException for adding store to source node");
    }

    /**
     * Adds a sink node (without parents) and then expects a {@link TopologyException} when the
     * mocked store is attached to that sink.
     */
    @Test
    public void shouldNotAllowToAddStateStoreToSink() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSink("sink-1", "topic-1", new String[0]);
        try {
            topology.addStateStore(storeBuilder, new String[]{"sink-1"});
        }
        catch (TopologyException expected) {
            // Attaching a store to a sink was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should have thrown TopologyException for adding store to sink node");
    }

    /**
     * Records the expectations shared by the store tests on the {@code storeBuilder} mock: the name
     * "store" (any number of calls), an empty log config and logging disabled.
     *
     * <p>The mock is left in record state; callers still have to call {@code EasyMock.replay}.
     */
    private void mockStoreBuilder() {
        EasyMock.expect(storeBuilder.name())
            .andReturn("store")
            .anyTimes();
        EasyMock.expect((Object)storeBuilder.logConfig())
            .andReturn(Collections.emptyMap());
        EasyMock.expect(storeBuilder.loggingEnabled())
            .andReturn(false);
    }

    /**
     * Adds the mocked store once and then expects a {@link TopologyException} when the same builder
     * (and therefore the same store name) is added a second time.
     */
    @Test
    public void shouldNotAllowToAddStoreWithSameName() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addStateStore(storeBuilder, new String[0]);
        try {
            topology.addStateStore(storeBuilder, new String[0]);
        }
        catch (TopologyException expected) {
            // The duplicate store name was rejected, which is exactly what this test requires.
            return;
        }
        Assert.fail("Should have thrown TopologyException for duplicate store name");
    }

    /**
     * Builds a topology in which two processors both look up the store "store" during
     * {@code init}, but only one of them ("goodGuy") is connected to it, and expects the test
     * driver construction to fail because of the unconnected processor ("badGuy").
     *
     * <p>The driver is expected to wrap the failure in a {@link StreamsException}; the test unwraps
     * it and rethrows the {@link TopologyBuilderException} cause only when its message is exactly
     * the expected one. Any other {@code StreamsException} is reported as a {@link RuntimeException}
     * so the test fails with the original exception attached.
     *
     * @throws Exception declared by the original test signature
     */
    @Test(expected=TopologyBuilderException.class)
    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
        final String sourceNodeName = "source";
        final String goodNodeName = "goodGuy";
        final String badNodeName = "badGuy";

        final Properties config = new Properties();
        config.put("bootstrap.servers", "host:1");
        config.put("application.id", "appId");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        final StreamsConfig streamsConfig = new StreamsConfig((Map)config);

        mockStoreBuilder();
        EasyMock.expect((Object)storeBuilder.build()).andReturn((Object)new MockStateStore("store", false));
        EasyMock.replay(storeBuilder);

        // Only goodNodeName is connected to the store; badNodeName reads from the same source
        // but gets no store access.
        topology
            .addSource(sourceNodeName, new String[]{"topic"})
            .addProcessor(goodNodeName, (ProcessorSupplier)new LocalMockProcessorSupplier(), new String[]{sourceNodeName})
            .addStateStore(storeBuilder, new String[]{goodNodeName})
            .addProcessor(badNodeName, (ProcessorSupplier)new LocalMockProcessorSupplier(), new String[]{sourceNodeName});

        final String expectedMessage = "Invalid topology building: Processor " + badNodeName
            + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME;
        try {
            new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
        }
        catch (StreamsException e) {
            final Throwable cause = e.getCause();
            // instanceof is already false for null; comparing from the constant side also
            // tolerates a cause without a message.
            if (cause instanceof TopologyBuilderException && expectedMessage.equals(cause.getMessage())) {
                throw (TopologyBuilderException)cause;
            }
            throw new RuntimeException("Did expect different exception. Did catch:", e);
        }
    }

    /**
     * Adding a global store whose source node and processor node share the name "sameName" is
     * expected to raise a {@link TopologyException}.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        EasyMock.expect(globalStoreBuilder.name())
            .andReturn("anyName")
            .anyTimes();
        EasyMock.replay(globalStoreBuilder);
        final String sharedName = "sameName";
        topology.addGlobalStore((StoreBuilder)globalStoreBuilder, sharedName, null, null, "anyTopicName", sharedName, new MockProcessorSupplier());
    }

    /**
     * With nothing added, {@code topology.describe()} must equal the untouched expected
     * description.
     */
    @Test
    public void shouldDescribeEmptyTopology() {
        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A single source on one topic is expected to be described as one subtopology (id 0)
     * containing only that source.
     */
    @Test
    public void singleSourceShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Subtopology onlySubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, Collections.singleton(source));
        expectedDescription.addSubtopology(onlySubtopology);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A single source subscribed to three topics is expected to be described as one subtopology
     * (id 0) containing only that source.
     */
    @Test
    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic1", "topic2", "topic3");
        final TopologyDescription.Subtopology onlySubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, Collections.singleton(source));
        expectedDescription.addSubtopology(onlySubtopology);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A single pattern-based source is expected to be described as one subtopology (id 0)
     * containing only that source.
     */
    @Test
    public void singleSourcePatternShouldHaveSingleSubtopology() {
        final Pattern topicPattern = Pattern.compile("topic[0-9]");
        final TopologyDescription.Source source = addSource("source", topicPattern);
        final TopologyDescription.Subtopology onlySubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, Collections.singleton(source));
        expectedDescription.addSubtopology(onlySubtopology);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Three unconnected sources are expected to be described as three subtopologies with ids 0, 1
     * and 2, each containing exactly one source.
     */
    @Test
    public void multipleSourcesShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
        final TopologyDescription.Subtopology firstSubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, Collections.singleton(firstSource));
        expectedDescription.addSubtopology(firstSubtopology);

        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Subtopology secondSubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(1, Collections.singleton(secondSource));
        expectedDescription.addSubtopology(secondSubtopology);

        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Subtopology thirdSubtopology =
            (TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(2, Collections.singleton(thirdSource));
        expectedDescription.addSubtopology(thirdSubtopology);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A source with one processor attached is expected to be described as a single subtopology
     * (id 0) containing both nodes.
     */
    @Test
    public void sourceAndProcessorShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor processor = addProcessor("processor", source);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source, processor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A source with one processor that owns a newly added store "store" is expected to be
     * described as a single subtopology (id 0) containing both nodes.
     */
    @Test
    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final String[] store = {"store"};
        final TopologyDescription.Processor processor = addProcessorWithNewStore("processor", store, source);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source, processor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A source with one processor that owns two newly added stores is expected to be described as
     * a single subtopology (id 0) containing both nodes.
     */
    @Test
    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final String[] stores = {"store1", "store2"};
        final TopologyDescription.Processor processor = addProcessorWithNewStore("processor", stores, source);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source, processor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * One source feeding two processors is expected to be described as a single subtopology
     * (id 0) containing all three nodes.
     */
    @Test
    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", source);
        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", source);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source, firstProcessor, secondProcessor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * One processor fed by a topic source and a pattern source is expected to be described as a
     * single subtopology (id 0) containing all three nodes.
     */
    @Test
    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
        final TopologyDescription.Source topicSource = addSource("source1", "topic0");
        final TopologyDescription.Source patternSource = addSource("source2", Pattern.compile("topic[1-9]"));
        final TopologyDescription.Processor processor = addProcessor("processor", topicSource, patternSource);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(topicSource, patternSource, processor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Three independent source-to-processor chains are expected to be described as three
     * subtopologies with ids 0, 1 and 2, each containing one source and its processor.
     */
    @Test
    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", firstSource);
        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", secondSource);
        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Processor thirdProcessor = addProcessor("processor3", thirdSource);

        // Each set holds expected nodes only, so the element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> firstChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(firstSource, firstProcessor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, firstChain));

        final HashSet<TopologyDescription.Node> secondChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(secondSource, secondProcessor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(1, secondChain));

        final HashSet<TopologyDescription.Node> thirdChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(thirdSource, thirdProcessor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(2, thirdChain));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Three independent source-to-sink chains are expected to be described as three subtopologies
     * with ids 0, 1 and 2, each containing one source and its sink.
     */
    @Test
    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
        final TopologyDescription.Sink firstSink = addSink("sink1", "sinkTopic1", firstSource);
        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Sink secondSink = addSink("sink2", "sinkTopic2", secondSource);
        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Sink thirdSink = addSink("sink3", "sinkTopic3", thirdSource);

        // Each set holds expected nodes only, so the element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> firstChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(firstSource, firstSink));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, firstChain));

        final HashSet<TopologyDescription.Node> secondChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(secondSource, secondSink));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(1, secondChain));

        final HashSet<TopologyDescription.Node> thirdChain = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(thirdSource, thirdSink));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(2, thirdChain));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Three source-to-processor chains that all write to one shared sink are expected to be
     * described as a single subtopology (id 0) containing all seven nodes.
     */
    @Test
    public void processorsWithSameSinkShouldHaveSameSubtopology() {
        final TopologyDescription.Source firstSource = addSource("source", "topic");
        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", firstSource);
        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", secondSource);
        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Processor thirdProcessor = addProcessor("processor3", thirdSource);
        final TopologyDescription.Sink sharedSink = addSink("sink", "sinkTopic", firstProcessor, secondProcessor, thirdProcessor);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(
                firstSource, firstProcessor,
                secondSource, secondProcessor,
                thirdSource, thirdProcessor,
                sharedSink));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Two processors each own a new store, and a third processor is connected to both existing
     * stores; all three chains are expected to be described as a single subtopology (id 0).
     */
    @Test
    public void processorsWithSharedStateShouldHaveSameSubtopology() {
        final String[] store1 = {"store1"};
        final String[] store2 = {"store2"};
        final String[] bothStores = {store1[0], store2[0]};

        final TopologyDescription.Source firstSource = addSource("source", "topic");
        final TopologyDescription.Processor firstProcessor = addProcessorWithNewStore("processor1", store1, firstSource);
        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Processor secondProcessor = addProcessorWithNewStore("processor2", store2, secondSource);
        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Processor thirdProcessor = addProcessorWithExistingStore("processor3", bothStores, thirdSource);

        // The set holds expected nodes only, so its element type is TopologyDescription.Node
        // rather than the Object the decompiler fell back to.
        final HashSet<TopologyDescription.Node> allNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(
                firstSource, firstProcessor,
                secondSource, secondProcessor,
                thirdSource, thirdProcessor));
        expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.Subtopology(0, allNodes));

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A topology with one global store (expected id 0) must describe itself as the expected
     * description holding that global store.
     */
    @Test
    public void shouldDescribeGlobalStoreTopology() {
        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * A topology with two global stores (expected ids 0 and 1) must describe itself as the
     * expected description holding both global stores.
     */
    @Test
    public void shouldDescribeMultipleGlobalStoreTopology() {
        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0);
        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1);

        final Object actualDescription = topology.describe();
        final Matcher equalsExpected = CoreMatchers.equalTo((Object)expectedDescription);
        MatcherAssert.assertThat(actualDescription, equalsExpected);
    }

    /**
     * Adds a topic-based source to the topology under test and returns the matching expected
     * description node.
     *
     * @param sourceName  name of the source node
     * @param sourceTopic topics the source reads from; the first element is read unconditionally
     * @return expected source node whose topic text is the topics joined with {@code ", "}
     */
    private TopologyDescription.Source addSource(String sourceName, String ... sourceTopic) {
        topology.addSource(null, sourceName, null, null, null, sourceTopic);

        // Build "t1, t2, ..." starting from the first topic.
        String joinedTopics = sourceTopic[0];
        int next = 1;
        while (next < sourceTopic.length) {
            joinedTopics = joinedTopics + ", " + sourceTopic[next];
            next++;
        }
        return new InternalTopologyBuilder.Source(sourceName, joinedTopics);
    }

    /**
     * Adds a pattern-based source to the topology under test and returns the matching expected
     * description node.
     *
     * @param sourceName    name of the source node
     * @param sourcePattern topic pattern the source subscribes to
     * @return expected source node whose topic text is {@code sourcePattern.toString()}
     */
    private TopologyDescription.Source addSource(String sourceName, Pattern sourcePattern) {
        topology.addSource(null, sourceName, null, null, null, sourcePattern);
        final String patternText = sourcePattern.toString();
        return new InternalTopologyBuilder.Source(sourceName, patternText);
    }

    /**
     * Adds a processor without any state stores; delegates to
     * {@code addProcessorWithNewStore} with an empty store-name array.
     *
     * @param processorName name of the processor node
     * @param parents       expected description nodes of the processor's parents
     * @return expected processor node
     */
    private TopologyDescription.Processor addProcessor(String processorName, TopologyDescription.Node ... parents) {
        final String[] noStores = {};
        return addProcessorWithNewStore(processorName, noStores, parents);
    }

    /**
     * Adds a processor and registers a new store for every given store name; delegates to
     * {@code addProcessorWithStore} with {@code newStores == true}.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the stores to create and attach
     * @param parents       expected description nodes of the processor's parents
     * @return expected processor node
     */
    private TopologyDescription.Processor addProcessorWithNewStore(String processorName, String[] storeNames, TopologyDescription.Node ... parents) {
        final boolean createStores = true;
        return addProcessorWithStore(processorName, storeNames, createStores, parents);
    }

    /**
     * Adds a processor and connects it to stores that are already part of the topology; delegates
     * to {@code addProcessorWithStore} with {@code newStores == false}.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the existing stores to connect
     * @param parents       expected description nodes of the processor's parents
     * @return expected processor node
     */
    private TopologyDescription.Processor addProcessorWithExistingStore(String processorName, String[] storeNames, TopologyDescription.Node ... parents) {
        final boolean createStores = false;
        return addProcessorWithStore(processorName, storeNames, createStores, parents);
    }

    /**
     * Adds a processor (backed by a {@code MockProcessorSupplier}) to the topology under test,
     * attaches the given stores, and returns the expected description node wired to its parents.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the stores the processor uses
     * @param newStores     {@code true} to register a freshly mocked store per name, {@code false}
     *                      to connect the processor to stores of those names
     * @param parents       expected description nodes of the processor's parents; each one gets
     *                      the new node added as successor
     * @return expected processor node carrying the store names and the parents as predecessors
     */
    private TopologyDescription.Processor addProcessorWithStore(String processorName, String[] storeNames, boolean newStores, TopologyDescription.Node ... parents) {
        final String[] parentNames = new String[parents.length];
        int slot = 0;
        for (TopologyDescription.Node parent : parents) {
            parentNames[slot++] = parent.name();
        }
        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);

        if (!newStores) {
            topology.connectProcessorAndStateStores(processorName, storeNames);
        } else {
            for (int i = 0; i < storeNames.length; ++i) {
                // One nice mock per store; only its name is stubbed.
                final StoreBuilder mockedBuilder = (StoreBuilder)EasyMock.createNiceMock(StoreBuilder.class);
                EasyMock.expect((Object)mockedBuilder.name()).andReturn((Object)storeNames[i]).anyTimes();
                EasyMock.replay(mockedBuilder);
                topology.addStateStore(mockedBuilder, new String[]{processorName});
            }
        }

        final HashSet<String> expectedStores = new HashSet<String>(Arrays.asList(storeNames));
        final InternalTopologyBuilder.Processor expectedNode = new InternalTopologyBuilder.Processor(processorName, expectedStores);
        // Link the expected node with every parent in both directions.
        for (int i = 0; i < parents.length; ++i) {
            final TopologyDescription.Node parent = parents[i];
            ((InternalTopologyBuilder.AbstractNode)parent).addSuccessor((TopologyDescription.Node)expectedNode);
            ((InternalTopologyBuilder.AbstractNode)expectedNode).addPredecessor(parent);
        }
        return expectedNode;
    }

    /**
     * Adds a sink to the topology under test and returns the expected description node wired to
     * its parents.
     *
     * @param sinkName  name of the sink node
     * @param sinkTopic topic the sink writes to
     * @param parents   expected description nodes of the sink's parents; each one gets the new
     *                  node added as successor
     * @return expected sink node with the parents as predecessors
     */
    private TopologyDescription.Sink addSink(String sinkName, String sinkTopic, TopologyDescription.Node ... parents) {
        final String[] parentNames = new String[parents.length];
        int slot = 0;
        for (TopologyDescription.Node parent : parents) {
            parentNames[slot++] = parent.name();
        }
        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);

        final InternalTopologyBuilder.Sink expectedNode = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
        // Link the expected node with every parent in both directions.
        for (int i = 0; i < parents.length; ++i) {
            final TopologyDescription.Node parent = parents[i];
            ((InternalTopologyBuilder.AbstractNode)parent).addSuccessor((TopologyDescription.Node)expectedNode);
            ((InternalTopologyBuilder.AbstractNode)expectedNode).addPredecessor(parent);
        }
        return expectedNode;
    }

    /**
     * Registers a global store (backed by a freshly mocked builder) with the topology under test
     * and records the corresponding global store in the expected description.
     *
     * @param globalStoreName name the mocked store builder reports
     * @param sourceName      name of the global store's source node
     * @param globalTopicName topic the global store is populated from
     * @param processorName   name of the global store's processor node
     * @param id              id passed to the expected {@code GlobalStore} description
     */
    private void addGlobalStoreToTopologyAndExpectedDescription(String globalStoreName, String sourceName, String globalTopicName, String processorName, int id) {
        final KeyValueStoreBuilder mockedGlobalBuilder = (KeyValueStoreBuilder)EasyMock.createNiceMock(KeyValueStoreBuilder.class);
        EasyMock.expect((Object)mockedGlobalBuilder.name())
            .andReturn((Object)globalStoreName)
            .anyTimes();
        EasyMock.replay(mockedGlobalBuilder);

        topology.addGlobalStore((StoreBuilder)mockedGlobalBuilder, sourceName, null, null, null, globalTopicName, processorName, new MockProcessorSupplier());

        final InternalTopologyBuilder.GlobalStore expectedStore =
            new InternalTopologyBuilder.GlobalStore(sourceName, processorName, globalStoreName, globalTopicName, id);
        expectedDescription.addGlobalStore((TopologyDescription.GlobalStore)expectedStore);
    }

    private static class LocalMockProcessorSupplier
    implements ProcessorSupplier {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        public Processor get() {
            return new Processor(){

                public void init(ProcessorContext context) {
                    context.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Object key, Object value) {
                }

                public void punctuate(long timestamp) {
                }

                public void close() {
                }
            };
        }
    }
}

