package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.class */
public class NamedTopologyIntegrationTest {
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String INPUT_STREAM_3 = "input-stream-3";
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String OUTPUT_STREAM_3 = "output-stream-3";
    private static final String SUM_OUTPUT = "sum";
    private static final String COUNT_OUTPUT = "count";
    private static final String DELAYED_INPUT_STREAM_1 = "delayed-input-stream-1";
    private static final String DELAYED_INPUT_STREAM_2 = "delayed-input-stream-2";
    private static Properties producerConfig;
    private static Properties consumerConfig;
    private String appId;
    private String changelog1;
    private String changelog2;
    private String changelog3;
    private Properties props;
    private Properties props2;
    private KafkaStreamsNamedTopologyWrapper streams;
    private KafkaStreamsNamedTopologyWrapper streams2;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = Materialized.as(Stores.inMemoryKeyValueStore("store"));
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> ROCKSDB_STORE = Materialized.as(Stores.persistentKeyValueStore("store"));
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L), KeyValue.pair("C", -50L));
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 2L));
    private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 200L), KeyValue.pair("A", 400L), KeyValue.pair("C", 350L));

    @Rule
    public final TestName testName = new TestName();
    private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    private final NamedTopologyStreamsBuilder topology1Builder = new NamedTopologyStreamsBuilder("topology-1");
    private final NamedTopologyStreamsBuilder topology2Builder = new NamedTopologyStreamsBuilder("topology-2");
    private final NamedTopologyStreamsBuilder topology3Builder = new NamedTopologyStreamsBuilder("topology-3");
    private final NamedTopologyStreamsBuilder topology1Builder2 = new NamedTopologyStreamsBuilder("topology-1");
    private final NamedTopologyStreamsBuilder topology2Builder2 = new NamedTopologyStreamsBuilder("topology-2");
    private final NamedTopologyStreamsBuilder topology3Builder2 = new NamedTopologyStreamsBuilder("topology-3");

    @BeforeClass
    public static void initializeClusterAndStandardTopics() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_3, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    private Properties configProps() {
        Properties properties = new Properties();
        properties.put("application.id", this.appId);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        properties.put("num.stream.threads", 2);
        properties.put("commit.interval.ms", 1000L);
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", 10000);
        return properties;
    }

    @Before
    public void setup() throws Exception {
        this.appId = IntegrationTestUtils.safeUniqueTestName(NamedTopologyIntegrationTest.class, this.testName);
        this.changelog1 = this.appId + "-topology-1-store-changelog";
        this.changelog2 = this.appId + "-topology-2-store-changelog";
        this.changelog3 = this.appId + "-topology-3-store-changelog";
        this.props = configProps();
        this.props2 = configProps();
        CLUSTER.createTopic(OUTPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_3, 2, 1);
    }

    @After
    public void shutdown() throws Exception {
        if (this.streams != null) {
            this.streams.close(Duration.ofSeconds(30L));
        }
        if (this.streams2 != null) {
            this.streams2.close(Duration.ofSeconds(30L));
        }
        CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
    }

    @Test
    public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exception {
        NamedTopologyStreamsBuilder namedTopologyStreamsBuilder = new NamedTopologyStreamsBuilder("count-topology");
        namedTopologyStreamsBuilder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count();
        NamedTopologyStreamsBuilder namedTopologyStreamsBuilder2 = new NamedTopologyStreamsBuilder("FKJ-topology");
        UniqueTopicSerdeScope uniqueTopicSerdeScope = new UniqueTopicSerdeScope();
        namedTopologyStreamsBuilder2.table(INPUT_STREAM_2, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, true), uniqueTopicSerdeScope.decorateSerde(Serdes.Long(), this.props, false))).join(namedTopologyStreamsBuilder2.table(INPUT_STREAM_3, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, true), uniqueTopicSerdeScope.decorateSerde(Serdes.Long(), this.props, false))), (v0) -> {
            return v0.toString();
        }, (l, l2) -> {
            return String.valueOf(l.longValue() + l2.longValue());
        }, Materialized.with((Serde) null, uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, false)));
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(namedTopologyStreamsBuilder2, namedTopologyStreamsBuilder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        String str = this.appId + "-count-topology";
        String str2 = this.appId + "-FKJ-topology";
        MatcherAssert.assertThat((Set) CLUSTER.getAllTopicsInCluster().stream().filter(str3 -> {
            return str3.contains(this.appId);
        }).filter(str4 -> {
            return str4.endsWith("-repartition") || str4.endsWith("-changelog") || str4.endsWith("-topic");
        }).collect(Collectors.toSet()), CoreMatchers.is(Utils.mkSet(new String[]{str + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition", str + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog", str2 + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic", str2 + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic", str2 + "-KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010-changelog", str2 + "-" + INPUT_STREAM_2 + "-STATE-STORE-0000000000-changelog", str2 + "-" + INPUT_STREAM_3 + "-STATE-STORE-0000000003-changelog"})));
    }

    @Test
    public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).selectKey((obj, obj2) -> {
            return obj;
        }).groupByKey().count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        Set<String> allTopicsInCluster = CLUSTER.getAllTopicsInCluster();
        MatcherAssert.assertThat(Boolean.valueOf(allTopicsInCluster.contains(this.appId + "-topology-1-store-changelog")), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(allTopicsInCluster.contains(this.appId + "-topology-1-store-repartition")), CoreMatchers.is(true));
    }

    @Test
    public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersistentStateStores() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(Boolean.valueOf(CLUSTER.getAllTopicsInCluster().containsAll(Arrays.asList(this.changelog1, this.changelog2, this.changelog3))), CoreMatchers.is(true));
    }

    @Test
    public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.streams.addNamedTopology(this.topology1Builder.buildNamedTopology(this.props));
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.streams.start();
        this.streams.addNamedTopology(this.topology1Builder.buildNamedTopology(this.props));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology2Builder.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder, this.topology2Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology3Builder.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_2).groupBy((obj7, obj8) -> {
            return obj7;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        this.streams2 = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder2.buildNamedTopology(this.props2), this.props2, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(this.streams, this.streams2), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology2Builder.buildNamedTopology(this.props));
        this.streams2.addNamedTopology(this.topology2Builder2.buildNamedTopology(this.props2));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    @Ignore
    public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception {
        this.topology1Builder.stream(DELAYED_INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((obj3, obj4) -> {
            throw new IllegalStateException("Should not process any records for removed topology-2");
        });
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder, this.topology2Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.removeNamedTopology("topology-2");
        produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    @Ignore
    public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        KStream stream = this.topology1Builder.stream(INPUT_STREAM_1);
        stream.groupByKey().count().toStream().to(COUNT_OUTPUT);
        stream.groupByKey().reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).toStream().to(SUM_OUTPUT);
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        this.streams.removeNamedTopology("topology-1");
        this.streams.cleanUpNamedTopology("topology-1");
        KStream stream2 = this.topology1Builder2.stream(DELAYED_INPUT_STREAM_1);
        stream2.groupByKey().reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).toStream().to(SUM_OUTPUT);
        stream2.groupByKey().count().toStream().to(COUNT_OUTPUT);
        produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.streams.addNamedTopology(this.topology1Builder2.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
    }

    @Test
    public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(Pattern.compile(INPUT_STREAM_1)).groupBy((obj, obj2) -> {
            return obj;
        }).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((obj3, obj4) -> {
            return obj3;
        }).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((obj5, obj6) -> {
            return obj5;
        }).count().toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((obj3, obj4) -> {
            return obj3;
        }).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((obj5, obj6) -> {
            return obj5;
        }).count().toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    private static void produceToInputTopics(String str, Collection<KeyValue<String, Long>> collection) {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, collection, producerConfig, CLUSTER.time);
    }

    private List<NamedTopology> buildNamedTopologies(NamedTopologyStreamsBuilder... namedTopologyStreamsBuilderArr) {
        ArrayList arrayList = new ArrayList();
        for (NamedTopologyStreamsBuilder namedTopologyStreamsBuilder : namedTopologyStreamsBuilderArr) {
            arrayList.add(namedTopologyStreamsBuilder.buildNamedTopology(this.props));
        }
        return arrayList;
    }
}
