package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.class */
public class InternalTopologyBuilderTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("testStore", false);

    @Test
    public void shouldAddSourceWithOffsetReset() {
        this.builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"earliestTopic"});
        this.builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"latestTopic"});
        this.builder.initializeSubscription();
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("earliestTopic"), CoreMatchers.equalTo(OffsetResetStrategy.EARLIEST));
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("latestTopic"), CoreMatchers.equalTo(OffsetResetStrategy.LATEST));
    }

    @Test
    public void shouldAddSourcePatternWithOffsetReset() {
        this.builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("earliest.*Topic"));
        this.builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("latest.*Topic"));
        this.builder.initializeSubscription();
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("earliestTestTopic"), CoreMatchers.equalTo(OffsetResetStrategy.EARLIEST));
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("latestTestTopic"), CoreMatchers.equalTo(OffsetResetStrategy.LATEST));
    }

    @Test
    public void shouldAddSourceWithoutOffsetReset() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, this.stringSerde.deserializer(), this.stringSerde.deserializer(), new String[]{"test-topic"});
        this.builder.initializeSubscription();
        Assert.assertEquals(Collections.singletonList("test-topic"), this.builder.fullSourceTopicNames());
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("test-topic"), CoreMatchers.equalTo(OffsetResetStrategy.NONE));
    }

    @Test
    public void shouldAddPatternSourceWithoutOffsetReset() {
        Pattern compile = Pattern.compile("test-.*");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, this.stringSerde.deserializer(), this.stringSerde.deserializer(), Pattern.compile("test-.*"));
        this.builder.initializeSubscription();
        MatcherAssert.assertThat(compile.pattern(), this.builder.sourceTopicPatternString(), CoreMatchers.equalTo("test-.*"));
        MatcherAssert.assertThat(this.builder.offsetResetStrategy("test-topic"), CoreMatchers.equalTo(OffsetResetStrategy.NONE));
    }

    @Test
    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, this.stringSerde.deserializer(), this.stringSerde.deserializer(), new String[0]);
        });
    }

    @Test
    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
        this.builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", (TimestampExtractor) null, this.stringSerde.deserializer(), this.stringSerde.deserializer(), new String[]{"topic-1"});
        try {
            this.builder.addSource(Topology.AutoOffsetReset.LATEST, "source", (TimestampExtractor) null, this.stringSerde.deserializer(), this.stringSerde.deserializer(), new String[]{"topic-2"});
            Assert.fail("Should throw TopologyException for duplicate source name");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddSourceWithSameName() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        try {
            this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
            Assert.fail("Should throw TopologyException with source name conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddSourceWithSameTopic() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        try {
            this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
            Assert.fail("Should throw TopologyException with topic conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddProcessorWithSameName() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        try {
            this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
            Assert.fail("Should throw TopologyException with processor name conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddProcessorWithWrongParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        });
    }

    @Test
    public void testAddProcessorWithSelfParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"processor"});
        });
    }

    @Test
    public void testAddProcessorWithEmptyParents() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[0]);
        });
    }

    @Test
    public void testAddProcessorWithNullParents() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{(String) null});
        });
    }

    @Test
    public void testAddProcessorWithBadSupplier() {
        MockApiProcessor mockApiProcessor = new MockApiProcessor();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.builder.addProcessor("processor", () -> {
                return mockApiProcessor;
            }, new String[]{(String) null});
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void testAddGlobalStoreWithBadSupplier() {
        Processor m232get = new MockApiProcessorSupplier().m232get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "globalTopic", "global-processor", () -> {
                return m232get;
            });
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void testAddSinkWithSameName() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSink("sink", "topic-2", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        try {
            this.builder.addSink("sink", "topic-3", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
            Assert.fail("Should throw TopologyException with sink name conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddSinkWithWrongParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addSink("sink", "topic-2", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        });
    }

    @Test
    public void testAddSinkWithSelfParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addSink("sink", "topic-2", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"sink"});
        });
    }

    @Test
    public void testAddSinkWithEmptyParents() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addSink("sink", "topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        });
    }

    @Test
    public void testAddSinkWithNullParents() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addSink("sink", "topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{(String) null});
        });
    }

    @Test
    public void testAddSinkConnectedWithParent() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"source-topic"});
        this.builder.addSink("sink", "dest-topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        Set set = (Set) this.builder.nodeGroups().get(0);
        Assert.assertTrue(set.contains("sink"));
        Assert.assertTrue(set.contains("source"));
    }

    @Test
    public void testAddSinkConnectedWithMultipleParent() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"source-topic"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "sourceII", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"source-topicII"});
        this.builder.addSink("sink", "dest-topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source", "sourceII"});
        Set set = (Set) this.builder.nodeGroups().get(0);
        Assert.assertTrue(set.contains("sink"));
        Assert.assertTrue(set.contains("source"));
        Assert.assertTrue(set.contains("sourceII"));
    }

    @Test
    public void testOnlyTopicNameSourceTopics() {
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-3"});
        this.builder.addInternalTopic("topic-3", InternalTopicProperties.empty());
        this.builder.initializeSubscription();
        Assert.assertFalse(this.builder.usesPatternSubscription());
        Assert.assertEquals(Arrays.asList("X-topic-3", "topic-1", "topic-2"), this.builder.fullSourceTopicNames());
    }

    @Test
    public void testPatternAndNameSourceTopics() {
        Pattern compile = Pattern.compile("topic-4|topic-5");
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-3"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-4", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, compile);
        this.builder.addInternalTopic("topic-3", InternalTopicProperties.empty());
        this.builder.initializeSubscription();
        Assert.assertEquals(Pattern.compile("X-topic-3|topic-1|topic-2|topic-4|topic-5").pattern(), Pattern.compile(this.builder.sourceTopicPatternString()).pattern());
    }

    @Test
    public void testPatternSourceTopicsWithGlobalTopics() {
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topic-1"));
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topic-2"));
        this.builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "globalTopic", "global-processor", new MockApiProcessorSupplier());
        this.builder.initializeSubscription();
        Assert.assertEquals(Pattern.compile("topic-1|topic-2").pattern(), Pattern.compile(this.builder.sourceTopicPatternString()).pattern());
    }

    @Test
    public void testNameSourceTopicsWithGlobalTopics() {
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "globalTopic", "global-processor", new MockApiProcessorSupplier());
        this.builder.initializeSubscription();
        MatcherAssert.assertThat(this.builder.fullSourceTopicNames(), CoreMatchers.equalTo(Arrays.asList("topic-1", "topic-2")));
    }

    @Test
    public void testPatternSourceTopic() {
        Pattern compile = Pattern.compile("topic-\\d");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, compile);
        this.builder.initializeSubscription();
        Assert.assertEquals(compile.pattern(), Pattern.compile(this.builder.sourceTopicPatternString()).pattern());
    }

    @Test
    public void testAddMoreThanOnePatternSourceNode() {
        Pattern compile = Pattern.compile("topics[A-Z]|.*-\\d");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topics[A-Z]"));
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile(".*-\\d"));
        this.builder.initializeSubscription();
        Assert.assertEquals(compile.pattern(), Pattern.compile(this.builder.sourceTopicPatternString()).pattern());
    }

    @Test
    public void testSubscribeTopicNameAndPattern() {
        Pattern compile = Pattern.compile("topic-bar|topic-foo|.*-\\d");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-foo", "topic-bar"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile(".*-\\d"));
        this.builder.initializeSubscription();
        Assert.assertEquals(compile.pattern(), Pattern.compile(this.builder.sourceTopicPatternString()).pattern());
    }

    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"foo"});
        try {
            this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("f.*"));
            Assert.fail("Should throw TopologyException with topic name/pattern conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("f.*"));
        try {
            this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"foo"});
            Assert.fail("Should throw TopologyException with topic name/pattern conflict");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddStateStoreWithNonExistingProcessor() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addStateStore(this.storeBuilder, new String[]{"no-such-processor"});
        });
    }

    @Test
    public void testAddStateStoreWithSource() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        try {
            this.builder.addStateStore(this.storeBuilder, new String[]{"source-1"});
            Assert.fail("Should throw TopologyException with store cannot be added to source");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testAddStateStoreWithSink() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSink("sink-1", "topic-1", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source-1"});
        try {
            this.builder.addStateStore(this.storeBuilder, new String[]{"sink-1"});
            Assert.fail("Should throw TopologyException with store cannot be added to sink");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddStoresWithSameName() {
        MockKeyValueStoreBuilder mockKeyValueStoreBuilder = new MockKeyValueStoreBuilder("testStore", false);
        this.builder.addStateStore(this.storeBuilder, new String[0]);
        MatcherAssert.assertThat(Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addStateStore(mockKeyValueStoreBuilder, new String[0]);
        }).getMessage(), CoreMatchers.equalTo("Invalid topology: A different StateStore has already been added with the name testStore"));
    }

    @Test
    public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
        this.builder.addGlobalStore(new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(), "global-store", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "global-topic", "global-processor", new MockApiProcessorSupplier());
        MatcherAssert.assertThat(Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addStateStore(this.storeBuilder, new String[0]);
        }).getMessage(), CoreMatchers.equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore"));
    }

    @Test
    public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
        StoreBuilder withLoggingDisabled = new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
        this.builder.addStateStore(this.storeBuilder, new String[0]);
        MatcherAssert.assertThat(Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addGlobalStore(withLoggingDisabled, "global-store", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "global-topic", "global-processor", new MockApiProcessorSupplier());
        }).getMessage(), CoreMatchers.equalTo("Invalid topology: A different StateStore has already been added with the name testStore"));
    }

    @Test
    public void shouldNotAllowToAddGlobalStoresWithSameName() {
        StoreBuilder withLoggingDisabled = new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
        StoreBuilder withLoggingDisabled2 = new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
        this.builder.addGlobalStore(withLoggingDisabled, "global-store", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "global-topic", "global-processor", new MockApiProcessorSupplier());
        MatcherAssert.assertThat(Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addGlobalStore(withLoggingDisabled2, "global-store-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "global-topic", "global-processor-2", new MockApiProcessorSupplier());
        }).getMessage(), CoreMatchers.equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore"));
    }

    @Test
    public void testAddStateStore() {
        this.builder.addStateStore(this.storeBuilder, new String[0]);
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        Assert.assertEquals(0L, this.builder.buildTopology().stateStores().size());
        this.builder.connectProcessorAndStateStores("processor-1", new String[]{this.storeBuilder.name()});
        List stateStores = this.builder.buildTopology().stateStores();
        Assert.assertEquals(1L, stateStores.size());
        Assert.assertEquals(this.storeBuilder.name(), ((StateStore) stateStores.get(0)).name());
    }

    @Test
    public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addStateStore(this.storeBuilder, new String[0]);
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.connectProcessorAndStateStores("processor-1", new String[]{this.storeBuilder.name()});
        this.builder.addStateStore(this.storeBuilder, new String[0]);
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.connectProcessorAndStateStores("processor-2", new String[]{this.storeBuilder.name()});
        Assert.assertEquals(1L, this.builder.buildTopology().stateStores().size());
    }

    @Test
    public void testTopicGroups() {
        this.builder.setApplicationId("X");
        this.builder.addInternalTopic("topic-1x", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1", "topic-1x"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-3"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-4", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-4"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-5", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-5"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source-2", "processor-1"});
        this.builder.copartitionSources(Arrays.asList("source-1", "source-2"));
        this.builder.addProcessor("processor-3", new MockApiProcessorSupplier(), new String[]{"source-3", "source-4"});
        Map subtopologyToTopicsInfo = this.builder.subtopologyToTopicsInfo();
        HashMap hashMap = new HashMap();
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-1", "X-topic-1x", "topic-2"}), Collections.emptyMap(), Collections.emptyMap()));
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.emptyMap()));
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-5"}), Collections.emptyMap(), Collections.emptyMap()));
        Assert.assertEquals(3L, subtopologyToTopicsInfo.size());
        Assert.assertEquals(hashMap, subtopologyToTopicsInfo);
        Assert.assertEquals(Utils.mkSet(new Set[]{Utils.mkSet(new String[]{"topic-1", "X-topic-1x", "topic-2"})}), new HashSet(this.builder.copartitionGroups()));
    }

    @Test
    public void testTopicGroupsByStateStore() {
        this.builder.setApplicationId("X");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1", "topic-1x"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-3"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-4", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-4"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-5", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-5"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source-2"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), new String[]{"processor-1", "processor-2"});
        this.builder.addProcessor("processor-3", new MockApiProcessorSupplier(), new String[]{"source-3"});
        this.builder.addProcessor("processor-4", new MockApiProcessorSupplier(), new String[]{"source-4"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false), new String[]{"processor-3", "processor-4"});
        this.builder.addProcessor("processor-5", new MockApiProcessorSupplier(), new String[]{"source-5"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store-3", false), new String[0]);
        this.builder.connectProcessorAndStateStores("processor-5", new String[]{"store-3"});
        this.builder.buildTopology();
        Map subtopologyToTopicsInfo = this.builder.subtopologyToTopicsInfo();
        HashMap hashMap = new HashMap();
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic("X", "store-1", this.builder.topologyName());
        String storeChangelogTopic2 = ProcessorStateManager.storeChangelogTopic("X", "store-2", this.builder.topologyName());
        String storeChangelogTopic3 = ProcessorStateManager.storeChangelogTopic("X", "store-3", this.builder.topologyName());
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-1", "topic-1x", "topic-2"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic, new UnwindowedChangelogTopicConfig(storeChangelogTopic, Collections.emptyMap()))));
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-3", "topic-4"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic2, new UnwindowedChangelogTopicConfig(storeChangelogTopic2, Collections.emptyMap()))));
        hashMap.put(AssignmentTestUtils.SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet(new String[]{"topic-5"}), Collections.emptyMap(), Collections.singletonMap(storeChangelogTopic3, new UnwindowedChangelogTopicConfig(storeChangelogTopic3, Collections.emptyMap()))));
        Assert.assertEquals(3L, subtopologyToTopicsInfo.size());
        Assert.assertEquals(hashMap, subtopologyToTopicsInfo);
    }

    @Test
    public void testBuild() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1", "topic-1x"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-3"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-4", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-4"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-5", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-5"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source-2", "processor-1"});
        this.builder.addProcessor("processor-3", new MockApiProcessorSupplier(), new String[]{"source-3", "source-4"});
        this.builder.setApplicationId("X");
        ProcessorTopology buildSubtopology = this.builder.buildSubtopology(0);
        ProcessorTopology buildSubtopology2 = this.builder.buildSubtopology(1);
        ProcessorTopology buildSubtopology3 = this.builder.buildSubtopology(2);
        Assert.assertEquals(Utils.mkSet(new String[]{"source-1", "source-2", "processor-1", "processor-2"}), nodeNames(buildSubtopology.processors()));
        Assert.assertEquals(Utils.mkSet(new String[]{"source-3", "source-4", "processor-3"}), nodeNames(buildSubtopology2.processors()));
        Assert.assertEquals(Utils.mkSet(new String[]{"source-5"}), nodeNames(buildSubtopology3.processors()));
    }

    @Test
    public void shouldAllowIncrementalBuilds() {
        Map nodeGroups = this.builder.nodeGroups();
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-2"});
        Map nodeGroups2 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups, nodeGroups2);
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile(""));
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-4", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile(""));
        Map nodeGroups3 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups2, nodeGroups3);
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source-2"});
        this.builder.addProcessor("processor-3", new MockApiProcessorSupplier(), new String[]{"source-3"});
        Map nodeGroups4 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups3, nodeGroups4);
        this.builder.addSink("sink-1", "sink-topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor-1"});
        Map nodeGroups5 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups4, nodeGroups5);
        this.builder.addSink("sink-2", (obj, obj2, recordContext) -> {
            return "sink-topic";
        }, (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor-2"});
        Map nodeGroups6 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups5, nodeGroups6);
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store-1", false), new String[]{"processor-1", "processor-2"});
        Map nodeGroups7 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups6, nodeGroups7);
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store-2", false), new String[0]);
        this.builder.connectProcessorAndStateStores("processor-2", new String[]{"store-2"});
        this.builder.connectProcessorAndStateStores("processor-3", new String[]{"store-2"});
        Map nodeGroups8 = this.builder.nodeGroups();
        Assert.assertNotEquals(nodeGroups7, nodeGroups8);
        this.builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "globalTopic", "global-processor", new MockApiProcessorSupplier());
        Assert.assertNotEquals(nodeGroups8, this.builder.nodeGroups());
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addSink((String) null, "topic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullTopicWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addSink("name", (String) null, (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addSink("name", (TopicNameExtractor) null, (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addProcessor((String) null, () -> {
                return null;
            }, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullProcessorSupplier() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addProcessor("name", (ProcessorSupplier) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingSource() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addSource((Topology.AutoOffsetReset) null, (String) null, (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile(".*"));
        });
    }

    @Test
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.connectProcessorAndStateStores((String) null, new String[]{"store"});
        });
    }

    @Test
    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.connectProcessorAndStateStores("processor", new String[]{null});
        });
    }

    @Test
    public void shouldNotAddNullInternalTopic() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addInternalTopic((String) null, InternalTopicProperties.empty());
        });
    }

    @Test
    public void shouldNotAddNullInternalTopicProperties() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addInternalTopic("topic", (InternalTopicProperties) null);
        });
    }

    @Test
    public void shouldNotSetApplicationIdToNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.setApplicationId((String) null);
        });
    }

    @Test
    public void shouldNotSetStreamsConfigToNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.setStreamsConfig((StreamsConfig) null);
        });
    }

    @Test
    public void shouldNotAddNullStateStoreSupplier() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.builder.addStateStore((StoreBuilder) null, new String[0]);
        });
    }

    private Set<String> nodeNames(Collection<ProcessorNode<?, ?, ?, ?>> collection) {
        HashSet hashSet = new HashSet();
        Iterator<ProcessorNode<?, ?, ?, ?>> it = collection.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().name());
        }
        return hashSet;
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addStateStore(this.storeBuilder, new String[]{"processor"});
        Map stateStoreNameToFullSourceTopicNames = this.builder.stateStoreNameToFullSourceTopicNames();
        Assert.assertEquals(1L, stateStoreNameToFullSourceTopicNames.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToFullSourceTopicNames.get("testStore"));
    }

    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addStateStore(this.storeBuilder, new String[]{"processor"});
        Map stateStoreNameToFullSourceTopicNames = this.builder.stateStoreNameToFullSourceTopicNames();
        Assert.assertEquals(1L, stateStoreNameToFullSourceTopicNames.size());
        Assert.assertEquals(Collections.singletonList("topic"), stateStoreNameToFullSourceTopicNames.get("testStore"));
    }

    @Test
    public void shouldCorrectlyMapStateStoreToInternalTopics() {
        this.builder.setApplicationId("appId");
        this.builder.addInternalTopic("internal-topic", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"internal-topic"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addStateStore(this.storeBuilder, new String[]{"processor"});
        Map stateStoreNameToFullSourceTopicNames = this.builder.stateStoreNameToFullSourceTopicNames();
        Assert.assertEquals(1L, stateStoreNameToFullSourceTopicNames.size());
        Assert.assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToFullSourceTopicNames.get("testStore"));
    }

    @Test
    public void shouldAddInternalTopicConfigForWindowStores() {
        this.builder.setApplicationId("appId");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store1", Duration.ofSeconds(30L), Duration.ofSeconds(10L), false), Serdes.String(), Serdes.String()), new String[]{"processor"});
        this.builder.addStateStore(Stores.sessionStoreBuilder(Stores.persistentSessionStore("store2", Duration.ofSeconds(30L)), Serdes.String(), Serdes.String()), new String[]{"processor"});
        this.builder.buildTopology();
        InternalTopologyBuilder.TopicsInfo topicsInfo = (InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().values().iterator().next();
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
        Map properties = internalTopicConfig.getProperties(Collections.emptyMap(), 10000L);
        Assert.assertEquals(3L, properties.size());
        Assert.assertEquals("compact,delete", properties.get("cleanup.policy"));
        Assert.assertEquals("40000", properties.get("retention.ms"));
        Assert.assertEquals("appId-store1-changelog", internalTopicConfig.name());
        Assert.assertTrue(internalTopicConfig instanceof WindowedChangelogTopicConfig);
        InternalTopicConfig internalTopicConfig2 = (InternalTopicConfig) topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
        Map properties2 = internalTopicConfig2.getProperties(Collections.emptyMap(), 10000L);
        Assert.assertEquals(3L, properties2.size());
        Assert.assertEquals("compact,delete", properties2.get("cleanup.policy"));
        Assert.assertEquals("40000", properties2.get("retention.ms"));
        Assert.assertEquals("appId-store2-changelog", internalTopicConfig2.name());
        Assert.assertTrue(internalTopicConfig2 instanceof WindowedChangelogTopicConfig);
    }

    @Test
    public void shouldAddInternalTopicConfigForNonWindowStores() {
        this.builder.setApplicationId("appId");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addStateStore(this.storeBuilder, new String[]{"processor"});
        this.builder.buildTopology();
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) ((InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().values().iterator().next()).stateChangelogTopics.get("appId-testStore-changelog");
        Map properties = internalTopicConfig.getProperties(Collections.emptyMap(), 10000L);
        Assert.assertEquals(2L, properties.size());
        Assert.assertEquals("compact", properties.get("cleanup.policy"));
        Assert.assertEquals("appId-testStore-changelog", internalTopicConfig.name());
        Assert.assertTrue(internalTopicConfig instanceof UnwindowedChangelogTopicConfig);
    }

    @Test
    public void shouldAddInternalTopicConfigForRepartitionTopics() {
        this.builder.setApplicationId("appId");
        this.builder.addInternalTopic("foo", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"foo"});
        this.builder.buildTopology();
        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) ((InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().values().iterator().next()).repartitionSourceTopics.get("appId-foo");
        Map properties = internalTopicConfig.getProperties(Collections.emptyMap(), 10000L);
        Assert.assertEquals(4L, properties.size());
        Assert.assertEquals(String.valueOf(-1), properties.get("retention.ms"));
        Assert.assertEquals("delete", properties.get("cleanup.policy"));
        Assert.assertEquals("appId-foo", internalTopicConfig.name());
        Assert.assertTrue(internalTopicConfig instanceof RepartitionTopicConfig);
    }

    @Test
    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-foo"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topic-[A-C]"));
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-3", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topic-\\d"));
        HashSet hashSet = new HashSet();
        hashSet.add("topic-B");
        hashSet.add("topic-3");
        hashSet.add("topic-A");
        this.builder.addSubscribedTopicsFromMetadata(hashSet, (String) null);
        this.builder.setApplicationId("test-id");
        Map subtopologyToTopicsInfo = this.builder.subtopologyToTopicsInfo();
        Assert.assertTrue(((InternalTopologyBuilder.TopicsInfo) subtopologyToTopicsInfo.get(AssignmentTestUtils.SUBTOPOLOGY_0)).sourceTopics.contains("topic-foo"));
        Assert.assertTrue(((InternalTopologyBuilder.TopicsInfo) subtopologyToTopicsInfo.get(AssignmentTestUtils.SUBTOPOLOGY_1)).sourceTopics.contains("topic-A"));
        Assert.assertTrue(((InternalTopologyBuilder.TopicsInfo) subtopologyToTopicsInfo.get(AssignmentTestUtils.SUBTOPOLOGY_1)).sourceTopics.contains("topic-B"));
        Assert.assertTrue(((InternalTopologyBuilder.TopicsInfo) subtopologyToTopicsInfo.get(AssignmentTestUtils.SUBTOPOLOGY_2)).sourceTopics.contains("topic-3"));
    }

    @Test
    public void shouldSetTopologyConfigOnRewriteTopology() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("max.task.idle.ms", 100L);
        StreamsConfig streamsConfig2 = new StreamsConfig(streamsConfig);
        InternalTopologyBuilder rewriteTopology = this.builder.rewriteTopology(streamsConfig2);
        MatcherAssert.assertThat(rewriteTopology.topologyConfigs(), CoreMatchers.equalTo(new TopologyConfig((String) null, streamsConfig2, new Properties())));
        MatcherAssert.assertThat(Long.valueOf(rewriteTopology.topologyConfigs().getTaskConfig().maxTaskIdleMs), CoreMatchers.equalTo(100L));
    }

    @Test
    public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("statestore.cache.max.bytes", 200L);
        streamsConfig.put("cache.max.bytes.buffering", 100L);
        StreamsConfig streamsConfig2 = new StreamsConfig(streamsConfig);
        InternalTopologyBuilder rewriteTopology = this.builder.rewriteTopology(streamsConfig2);
        MatcherAssert.assertThat(rewriteTopology.topologyConfigs(), CoreMatchers.equalTo(new TopologyConfig((String) null, streamsConfig2, new Properties())));
        MatcherAssert.assertThat(Long.valueOf(rewriteTopology.topologyConfigs().cacheSize), CoreMatchers.equalTo(200L));
    }

    @Test
    public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", 12345L);
        properties.put("max.task.idle.ms", 500L);
        properties.put("task.timeout.ms", 1000L);
        properties.put("buffered.records.per.partition", 15);
        properties.put("default.timestamp.extractor", MockTimestampExtractor.class);
        properties.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        properties.put("default.dsl.store", "in_memory");
        InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(new TopologyConfig("my-topology", new StreamsConfig(StreamsTestUtils.getStreamsConfig()), properties));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().cacheSize), CoreMatchers.is(12345L));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs), CoreMatchers.equalTo(500L));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().taskTimeoutMs), CoreMatchers.equalTo(1000L));
        MatcherAssert.assertThat(Integer.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().maxBufferedSize), CoreMatchers.equalTo(15));
        MatcherAssert.assertThat(internalTopologyBuilder.topologyConfigs().getTaskConfig().timestampExtractor.getClass(), CoreMatchers.equalTo(MockTimestampExtractor.class));
        MatcherAssert.assertThat(internalTopologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), CoreMatchers.equalTo(LogAndContinueExceptionHandler.class));
        MatcherAssert.assertThat(internalTopologyBuilder.topologyConfigs().parseStoreType(), CoreMatchers.equalTo(Materialized.StoreType.IN_MEMORY));
    }

    @Test
    public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("statestore.cache.max.bytes", 12345L);
        streamsConfig.put("max.task.idle.ms", 500L);
        streamsConfig.put("task.timeout.ms", 1000L);
        streamsConfig.put("buffered.records.per.partition", 15);
        streamsConfig.put("default.timestamp.extractor", MockTimestampExtractor.class);
        streamsConfig.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(new TopologyConfig("my-topology", new StreamsConfig(streamsConfig), new Properties()));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().cacheSize), CoreMatchers.is(12345L));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs), CoreMatchers.is(500L));
        MatcherAssert.assertThat(Long.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().taskTimeoutMs), CoreMatchers.is(1000L));
        MatcherAssert.assertThat(Integer.valueOf(internalTopologyBuilder.topologyConfigs().getTaskConfig().maxBufferedSize), CoreMatchers.is(15));
        MatcherAssert.assertThat(internalTopologyBuilder.topologyConfigs().getTaskConfig().timestampExtractor.getClass(), CoreMatchers.is(MockTimestampExtractor.class));
        MatcherAssert.assertThat(internalTopologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), CoreMatchers.is(LogAndContinueExceptionHandler.class));
    }

    @Test
    public void shouldAddTimestampExtractorPerSource() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", new MockTimestampExtractor(), (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        MatcherAssert.assertThat(this.builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).buildTopology().source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorWithPatternPerSource() {
        Pattern compile = Pattern.compile("t.*");
        this.builder.addSource((Topology.AutoOffsetReset) null, "source", new MockTimestampExtractor(), (Deserializer) null, (Deserializer) null, compile);
        MatcherAssert.assertThat(this.builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).buildTopology().source(compile.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldSortProcessorNodesCorrectly() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.builder.addSource((Topology.AutoOffsetReset) null, "source2", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic2"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addProcessor("processor2", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addProcessor("processor3", new MockApiProcessorSupplier(), new String[]{"processor2"});
        this.builder.addSink("sink1", "topic2", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"processor1", "processor3"});
        Assert.assertEquals(1L, this.builder.describe().subtopologies().size());
        Iterator nodesInOrder = ((InternalTopologyBuilder.SubtopologyDescription) this.builder.describe().subtopologies().iterator().next()).nodesInOrder();
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("source1", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(6L, r0.size);
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("source2", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(4L, r0.size);
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("processor2", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(3L, r0.size);
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("processor1", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(2L, r0.size);
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("processor3", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(2L, r0.size);
        Assert.assertTrue(nodesInOrder.hasNext());
        Assert.assertEquals("sink1", ((InternalTopologyBuilder.AbstractNode) nodesInOrder.next()).name);
        Assert.assertEquals(1L, r0.size);
    }

    @Test
    public void shouldConnectRegexMatchedTopicsToStateStore() {
        this.builder.addSource((Topology.AutoOffsetReset) null, "ingest", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, Pattern.compile("topic-\\d+"));
        this.builder.addProcessor("my-processor", new MockApiProcessorSupplier(), new String[]{"ingest"});
        this.builder.addStateStore(this.storeBuilder, new String[]{"my-processor"});
        HashSet hashSet = new HashSet();
        hashSet.add("topic-2");
        hashSet.add("topic-3");
        hashSet.add("topic-A");
        this.builder.addSubscribedTopicsFromMetadata(hashSet, "test-thread");
        this.builder.setApplicationId("test-app");
        List list = (List) this.builder.stateStoreNameToFullSourceTopicNames().get(this.storeBuilder.name());
        Assert.assertEquals("Expected to contain two topics", 2L, list.size());
        Assert.assertTrue(list.contains("topic-2"));
        Assert.assertTrue(list.contains("topic-3"));
        Assert.assertFalse(list.contains("topic-A"));
    }

    @Test
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.addGlobalStore(this.storeBuilder, "sameName", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "anyTopicName", "sameName", new MockApiProcessorSupplier());
        });
    }

    @Test
    public void shouldThrowIfNameIsNull() {
        Assert.assertEquals("name cannot be null", ((Exception) Assert.assertThrows(NullPointerException.class, () -> {
            new InternalTopologyBuilder.Source((String) null, Collections.emptySet(), (Pattern) null);
        })).getMessage());
    }

    @Test
    public void shouldThrowIfTopicAndPatternAreNull() {
        Assert.assertEquals("Either topics or pattern must be not-null, but both are null.", ((Exception) Assert.assertThrows(IllegalArgumentException.class, () -> {
            new InternalTopologyBuilder.Source("name", (Set) null, (Pattern) null);
        })).getMessage());
    }

    @Test
    public void shouldThrowIfBothTopicAndPatternAreNotNull() {
        Assert.assertEquals("Either topics or pattern must be null, but both are not null.", ((Exception) Assert.assertThrows(IllegalArgumentException.class, () -> {
            new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile(""));
        })).getMessage());
    }

    @Test
    public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() {
        MatcherAssert.assertThat(new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), (Pattern) null), CoreMatchers.equalTo(new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), (Pattern) null)));
    }

    @Test
    public void sourceShouldBeEqualIfNameAndPatternAreTheSame() {
        MatcherAssert.assertThat(new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("topic")), CoreMatchers.equalTo(new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("topic"))));
    }

    @Test
    public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() {
        MatcherAssert.assertThat(new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), (Pattern) null), Matchers.not(CoreMatchers.equalTo(new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), (Pattern) null))));
    }

    @Test
    public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() {
        MatcherAssert.assertThat(new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("topic")), Matchers.not(CoreMatchers.equalTo(new InternalTopologyBuilder.Source("name2", (Set) null, Pattern.compile("topic")))));
    }

    @Test
    public void sourceShouldNotBeEqualForDifferentTopicList() {
        InternalTopologyBuilder.Source source = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), (Pattern) null);
        InternalTopologyBuilder.Source source2 = new InternalTopologyBuilder.Source("name", Collections.emptySet(), (Pattern) null);
        InternalTopologyBuilder.Source source3 = new InternalTopologyBuilder.Source("name", Collections.singleton("topic2"), (Pattern) null);
        MatcherAssert.assertThat(source, Matchers.not(CoreMatchers.equalTo(source2)));
        MatcherAssert.assertThat(source, Matchers.not(CoreMatchers.equalTo(source3)));
    }

    @Test
    public void sourceShouldNotBeEqualForDifferentPattern() {
        InternalTopologyBuilder.Source source = new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("topic"));
        InternalTopologyBuilder.Source source2 = new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("topic2"));
        InternalTopologyBuilder.Source source3 = new InternalTopologyBuilder.Source("name", (Set) null, Pattern.compile("top*"));
        MatcherAssert.assertThat(source, Matchers.not(CoreMatchers.equalTo(source2)));
        MatcherAssert.assertThat(source, Matchers.not(CoreMatchers.equalTo(source3)));
    }

    @Test
    public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesArePresent() {
        this.builder.setApplicationId("Z");
        this.builder.addInternalTopic("topic-1z", new InternalTopicProperties(10));
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1z"});
        Assert.assertEquals(((InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).repartitionSourceTopics.get("Z-topic-1z"), new RepartitionTopicConfig("Z-topic-1z", Collections.emptyMap(), 10, true));
    }

    @Test
    public void shouldHandleWhenTopicPropertiesNumberOfPartitionsIsNull() {
        this.builder.setApplicationId("T");
        this.builder.addInternalTopic("topic-1t", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1t"});
        Assert.assertEquals(((InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).repartitionSourceTopics.get("T-topic-1t"), new RepartitionTopicConfig("T-topic-1t", Collections.emptyMap()));
    }

    @Test
    public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesAreNotPresent() {
        this.builder.setApplicationId("Y");
        this.builder.addInternalTopic("topic-1y", InternalTopicProperties.empty());
        this.builder.addSource((Topology.AutoOffsetReset) null, "source-1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic-1y"});
        Assert.assertEquals(((InternalTopologyBuilder.TopicsInfo) this.builder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).repartitionSourceTopics.get("Y-topic-1y"), new RepartitionTopicConfig("Y-topic-1y", Collections.emptyMap()));
    }

    @Test
    public void shouldConnectGlobalStateStoreToInputTopic() {
        this.builder.setApplicationId("X");
        this.builder.addGlobalStore(new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(), "globalSource", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, "global-topic", "global-processor", new MockApiProcessorSupplier());
        this.builder.initializeSubscription();
        this.builder.rewriteTopology(new StreamsConfig(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "asdf"), Utils.mkEntry("bootstrap.servers", "asdf")}))));
        MatcherAssert.assertThat(this.builder.buildGlobalStateTopology().storeToChangelogTopic().get("global-store"), CoreMatchers.is("global-topic"));
    }
}
