package org.apache.kafka.streams;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/kafka/streams/StreamsBuilderTest.class */
public class StreamsBuilderTest {
    private static final String STREAM_TOPIC = "stream-topic";
    private static final String STREAM_OPERATION_NAME = "stream-operation";
    private static final String STREAM_TOPIC_TWO = "stream-topic-two";
    private static final String TABLE_TOPIC = "table-topic";

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final StreamsBuilder builder = new StreamsBuilder();
    private Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    @Before
    public void before() {
        this.props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    }

    @Test
    public void shouldAddGlobalStore() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()), AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()), () -> {
            return new Processor<String, String, Void, Void>() { // from class: org.apache.kafka.streams.StreamsBuilderTest.1
                private KeyValueStore store;

                public void init(ProcessorContext<Void, Void> processorContext) {
                    this.store = processorContext.getStateStore("store");
                }

                public void process(Record<String, String> record) {
                    this.store.put(record.key(), record.value());
                }
            };
        });
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new StringSerializer(), new StringSerializer()).pipeInput("hey", "there");
                MatcherAssert.assertThat((String) topologyTestDriver.getKeyValueStore("store").get("hey"), Is.is("there"));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldNotThrowNullPointerIfOptimizationsNotSpecified() {
        new StreamsBuilder().build(new Properties());
    }

    @Test
    public void shouldAllowJoinUnmaterializedFilteredKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC).filter(MockPredicate.allGoodPredicate()), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), CoreMatchers.equalTo(Collections.singleton(((StateStore) buildTopology.stateStores().get(0)).name())));
        Assert.assertTrue(buildTopology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty());
    }

    @Test
    public void shouldAllowJoinMaterializedFilteredKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC).filter(MockPredicate.allGoodPredicate(), Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC).mapValues(MockMapper.noOpValueMapper()), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), CoreMatchers.equalTo(Collections.singleton(((StateStore) buildTopology.stateStores().get(0)).name())));
        Assert.assertTrue(buildTopology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty());
    }

    @Test
    public void shouldAllowJoinMaterializedMapValuedKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC).mapValues(MockMapper.noOpValueMapper(), Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinUnmaterializedJoinedKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table("table-topic1").join(this.builder.table("table-topic2"), MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), CoreMatchers.equalTo(Utils.mkSet(new String[]{((StateStore) buildTopology.stateStores().get(0)).name(), ((StateStore) buildTopology.stateStores().get(1)).name()})));
        Assert.assertTrue(buildTopology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty());
    }

    @Test
    public void shouldAllowJoinMaterializedJoinedKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table("table-topic1").join(this.builder.table("table-topic2"), MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinMaterializedSourceKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        MatcherAssert.assertThat(Integer.valueOf(buildTopology.stateStores().size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), CoreMatchers.equalTo(Collections.singleton(((StateStore) buildTopology.stateStores().get(0)).name())));
        MatcherAssert.assertThat(buildTopology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), CoreMatchers.equalTo(Collections.singleton(((StateStore) buildTopology.stateStores().get(0)).name())));
    }

    @Test
    public void shouldProcessingFromSinkTopic() {
        KStream stream = this.builder.stream("topic-source");
        stream.to("topic-sink");
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        stream.process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("A", "aa");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp("A", "aa", 0L)), mockApiProcessorSupplier.theCapturedProcessor().processed());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    @Deprecated
    public void shouldProcessViaThroughTopic() {
        KStream stream = this.builder.stream("topic-source");
        KStream through = stream.through("topic-sink");
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        stream.process(mockApiProcessorSupplier, new String[0]);
        MockApiProcessorSupplier mockApiProcessorSupplier2 = new MockApiProcessorSupplier();
        through.process(mockApiProcessorSupplier2, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("A", "aa");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp("A", "aa", 0L)), mockApiProcessorSupplier.theCapturedProcessor().processed());
                Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp("A", "aa", 0L)), mockApiProcessorSupplier2.theCapturedProcessor().processed());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldProcessViaRepartitionTopic() {
        KStream stream = this.builder.stream("topic-source");
        KStream repartition = stream.repartition();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        stream.process(mockApiProcessorSupplier, new String[0]);
        MockApiProcessorSupplier mockApiProcessorSupplier2 = new MockApiProcessorSupplier();
        repartition.process(mockApiProcessorSupplier2, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("A", "aa");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp("A", "aa", 0L)), mockApiProcessorSupplier.theCapturedProcessor().processed());
                Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp("A", "aa", 0L)), mockApiProcessorSupplier2.theCapturedProcessor().processed());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMergeStreams() {
        KStream merge = this.builder.stream("topic-1").merge(this.builder.stream("topic-2"));
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        merge.process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic-1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("topic-2", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", "aa");
            createInputTopic2.pipeInput("B", "bb");
            createInputTopic2.pipeInput("C", "cc");
            createInputTopic.pipeInput("D", "dd");
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp("A", "aa", 0L), new KeyValueTimestamp("B", "bb", 0L), new KeyValueTimestamp("C", "cc", 0L), new KeyValueTimestamp("D", "dd", 0L)), mockApiProcessorSupplier.theCapturedProcessor().processed());
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedInMaterialized() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count(Materialized.as("store").withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY)).toStream();
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedInMaterializedOverTopologyOverrides() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count(Materialized.as("store").withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY)).toStream();
        streamsBuilder.build();
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierOverStoreType() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("default.dsl.store", "rocksDB");
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count(Materialized.as("store")).toStream();
        streamsBuilder.build();
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseTopologyOverrideStoreTypeOverConfiguredDslStoreSupplier() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("default.dsl.store", "in_memory");
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count(Materialized.as("store")).toStream();
        streamsBuilder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfig() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count().toStream();
        this.builder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfig() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().count().toStream();
        streamsBuilder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedInMaterializedForWindowedOperation() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1L), Duration.ZERO)).count(Materialized.as("store").withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY)).toStream();
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfigForWindowedOperation() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1L), Duration.ZERO)).count().toStream();
        this.builder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfigForWindowedOperation() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1L), Duration.ZERO)).count().toStream();
        streamsBuilder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedInMaterializedForSessionWindowedOperation() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1L), Duration.ZERO)).count(Materialized.as("store").withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY)).toStream();
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemorySessionStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfigForSessionWindowedOperation() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1L), Duration.ZERO)).count().toStream();
        this.builder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemorySessionStore.class);
    }

    @Test
    public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfigForSessionWindowedOperation() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX).groupByKey().windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1L), Duration.ZERO)).count().toStream();
        streamsBuilder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemorySessionStore.class);
    }

    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
        HashMap hashMap = new HashMap();
        hashMap.getClass();
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String())).toStream().foreach((v1, v2) -> {
            r0.put(v1, v2);
        });
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new LongSerializer(), new StringSerializer());
                createInputTopic.pipeInput(1L, "value1");
                createInputTopic.pipeInput(2L, "value2");
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                MatcherAssert.assertThat(keyValueStore.get(1L), CoreMatchers.equalTo("value1"));
                MatcherAssert.assertThat(keyValueStore.get(2L), CoreMatchers.equalTo("value2"));
                MatcherAssert.assertThat(hashMap.get(1L), CoreMatchers.equalTo("value1"));
                MatcherAssert.assertThat(hashMap.get(2L), CoreMatchers.equalTo("value2"));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
        this.builder.globalTable(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new LongSerializer(), new StringSerializer());
                createInputTopic.pipeInput(1L, "value1");
                createInputTopic.pipeInput(2L, "value2");
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                MatcherAssert.assertThat(keyValueStore.get(1L), CoreMatchers.equalTo("value1"));
                MatcherAssert.assertThat(keyValueStore.get(2L), CoreMatchers.equalTo("value2"));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldThrowOnVersionedStoreSupplierForGlobalTable() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.builder.globalTable(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ZERO)).withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));
        });
    }

    @Test
    public void shouldNotMaterializeStoresIfNotRequired() {
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.with(Serdes.Long(), Serdes.String()));
        MatcherAssert.assertThat(Integer.valueOf(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores().size()), CoreMatchers.equalTo(0));
    }

    @Test
    public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("topology.optimization", "all");
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(this.builder.build(streamsConfig));
        internalTopologyBuilder.rewriteTopology(new StreamsConfig(streamsConfig));
        MatcherAssert.assertThat(internalTopologyBuilder.buildTopology().storeToChangelogTopic(), CoreMatchers.equalTo(Collections.singletonMap("store", AssignmentTestUtils.TOPIC_PREFIX)));
        MatcherAssert.assertThat(internalTopologyBuilder.stateStores().keySet(), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(Boolean.valueOf(((StoreFactory) internalTopologyBuilder.stateStores().get("store")).loggingEnabled()), CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(Boolean.valueOf(((InternalTopologyBuilder.TopicsInfo) internalTopologyBuilder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).nonSourceChangelogTopics().isEmpty()), CoreMatchers.equalTo(true));
    }

    @Test
    public void shouldNotReuseRepartitionTopicAsChangelogs() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX).repartition().toTable(Materialized.as("store"));
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig("appId");
        streamsConfig.put("topology.optimization", "all");
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(this.builder.build(streamsConfig));
        internalTopologyBuilder.rewriteTopology(new StreamsConfig(streamsConfig));
        MatcherAssert.assertThat(internalTopologyBuilder.buildTopology().storeToChangelogTopic(), CoreMatchers.equalTo(Collections.singletonMap("store", "appId-store-changelog")));
        MatcherAssert.assertThat(internalTopologyBuilder.stateStores().keySet(), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(Boolean.valueOf(((StoreFactory) internalTopologyBuilder.stateStores().get("store")).loggingEnabled()), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(((InternalTopologyBuilder.TopicsInfo) internalTopologyBuilder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_1)).stateChangelogTopics.keySet(), CoreMatchers.equalTo(Collections.singleton("appId-store-changelog")));
    }

    @Test
    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(this.builder.build());
        internalTopologyBuilder.setApplicationId("appId");
        MatcherAssert.assertThat(internalTopologyBuilder.buildTopology().storeToChangelogTopic(), CoreMatchers.equalTo(Collections.singletonMap("store", "appId-store-changelog")));
        MatcherAssert.assertThat(internalTopologyBuilder.stateStores().keySet(), CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(Boolean.valueOf(((StoreFactory) internalTopologyBuilder.stateStores().get("store")).loggingEnabled()), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(((InternalTopologyBuilder.TopicsInfo) internalTopologyBuilder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).stateChangelogTopics.keySet(), CoreMatchers.equalTo(Collections.singleton("appId-store-changelog")));
    }

    @Test
    public void shouldThrowExceptionWhenNoTopicPresent() {
        this.builder.stream(Collections.emptyList());
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowExceptionWhenTopicNamesAreNull() {
        this.builder.stream(Arrays.asList(null, null));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(NullPointerException.class, streamsBuilder::build);
    }

    @Test
    public void shouldUseSpecifiedNameForStreamSourceProcessor() {
        this.builder.stream(STREAM_TOPIC, Consumed.as("source-node"));
        this.builder.stream(STREAM_TOPIC_TWO);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "source-node", "KSTREAM-SOURCE-0000000001");
    }

    @Test
    public void shouldUseSpecifiedNameForTableSourceProcessor() {
        this.builder.table(STREAM_TOPIC, Consumed.as("source-node"));
        this.builder.table(STREAM_TOPIC_TWO);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "source-node-source", "source-node", "KSTREAM-SOURCE-0000000004", "KTABLE-SOURCE-0000000005");
    }

    @Test
    public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() {
        this.builder.globalTable(STREAM_TOPIC, Consumed.as("source-processor"));
        this.builder.globalTable(STREAM_TOPIC_TWO);
        this.builder.build();
        assertNamesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().globalStateStores(), "stream-topic-STATE-STORE-0000000000", "stream-topic-two-STATE-STORE-0000000003");
    }

    @Test
    public void shouldUseSpecifiedNameForSinkProcessor() {
        KStream stream = this.builder.stream(STREAM_TOPIC);
        stream.to(STREAM_TOPIC_TWO, Produced.as("sink-processor"));
        stream.to(STREAM_TOPIC_TWO);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "sink-processor", "KSTREAM-SINK-0000000002");
    }

    @Test
    public void shouldUseSpecifiedNameForMapOperation() {
        this.builder.stream(STREAM_TOPIC).map(KeyValue::pair, Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForMapValuesOperation() {
        this.builder.stream(STREAM_TOPIC).mapValues(obj -> {
            return obj;
        }, Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForMapValuesWithKeyOperation() {
        this.builder.stream(STREAM_TOPIC).mapValues((obj, obj2) -> {
            return obj2;
        }, Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForFilterOperation() {
        this.builder.stream(STREAM_TOPIC).filter((obj, obj2) -> {
            return true;
        }, Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForForEachOperation() {
        this.builder.stream(STREAM_TOPIC).foreach((obj, obj2) -> {
        }, Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForTransform() {
        this.builder.stream(STREAM_TOPIC).transform(() -> {
            return null;
        }, Named.as(STREAM_OPERATION_NAME), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForTransformValues() {
        this.builder.stream(STREAM_TOPIC).transformValues(() -> {
            return new NoopValueTransformer();
        }, Named.as(STREAM_OPERATION_NAME), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForTransformValuesWithKey() {
        this.builder.stream(STREAM_TOPIC).transformValues(() -> {
            return new NoopValueTransformerWithKey();
        }, Named.as(STREAM_OPERATION_NAME), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForBranchOperation() {
        this.builder.stream(STREAM_TOPIC).branch(Named.as("branch-processor"), new Predicate[]{(obj, obj2) -> {
            return true;
        }, (obj3, obj4) -> {
            return false;
        }});
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "branch-processor", "branch-processor-predicate-0", "branch-processor-predicate-1");
    }

    @Test
    public void shouldUseSpecifiedNameForSplitOperation() {
        this.builder.stream(STREAM_TOPIC).split(Named.as("branch-processor")).branch((obj, obj2) -> {
            return true;
        }, Branched.as("-1")).branch((obj3, obj4) -> {
            return false;
        }, Branched.as("-2"));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "branch-processor", "branch-processor-1", "branch-processor-2");
    }

    @Test
    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKTable() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.table(TABLE_TOPIC), (str, str2) -> {
            return str;
        }, Joined.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000002", "KTABLE-SOURCE-0000000003", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKTable() {
        this.builder.stream(STREAM_TOPIC).leftJoin(this.builder.table(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, Joined.as(STREAM_OPERATION_NAME));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000002", "KTABLE-SOURCE-0000000003", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabledViaOldApi() {
        this.builder.stream(STREAM_TOPIC).leftJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.of(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        this.builder.build(new Properties());
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "stream-operation-this-join-store", "stream-operation-outer-other-join-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-this-join", "stream-operation-outer-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).leftJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "stream-operation-this-join-store", "stream-operation-outer-other-join-store", "stream-operation-left-shared-join-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-this-join", "stream-operation-outer-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseGeneratedStoreNamesForLeftJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).leftJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "KSTREAM-JOINTHIS-0000000004-store", "KSTREAM-OUTEROTHER-0000000005-store", "KSTREAM-OUTERSHARED-0000000004-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-this-join", "stream-operation-outer-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.of(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "stream-operation-this-join-store", "stream-operation-other-join-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-this-join", "stream-operation-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseGeneratedNameForJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).join(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.of(Duration.ofHours(1L)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "KSTREAM-JOINTHIS-0000000004-store", "KSTREAM-JOINOTHER-0000000005-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-this-join", "stream-operation-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseSpecifiedNameForOuterJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).outerJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "stream-operation-outer-this-join-store", "stream-operation-outer-other-join-store", "stream-operation-outer-shared-join-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-outer-this-join", "stream-operation-outer-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseGeneratedStoreNamesForOuterJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).outerJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "KSTREAM-OUTERTHIS-0000000004-store", "KSTREAM-OUTEROTHER-0000000005-store", "KSTREAM-OUTERSHARED-0000000004-store");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "stream-operation-this-windowed", "stream-operation-other-windowed", "stream-operation-outer-this-join", "stream-operation-outer-other-join", "stream-operation-merge");
    }

    @Test
    public void shouldUseSpecifiedDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).outerJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME).withDslStoreSuppliers(BuiltInDslStoreSuppliers.IN_MEMORY));
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, InMemoryWindowStore.class, InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseConfiguredInStreamsConfigIfNoTopologyOverrideDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
        this.builder.stream(STREAM_TOPIC).outerJoin(this.builder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        this.builder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, InMemoryWindowStore.class, InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseConfiguredTopologyOverrideDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        streamsBuilder.stream(STREAM_TOPIC).outerJoin(streamsBuilder.stream(STREAM_TOPIC_TWO), (str, str2) -> {
            return str;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)), StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        streamsBuilder.build();
        this.props.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, InMemoryWindowStore.class, InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseSpecifiedStoreSupplierForEachOuterJoinOperationBetweenKStreamAndKStreamAndUseSameTypeAsThisSupplierForOuter() {
        KStream stream = this.builder.stream(STREAM_TOPIC);
        KStream stream2 = this.builder.stream(STREAM_TOPIC_TWO);
        JoinWindows ofTimeDifferenceWithNoGrace = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L));
        stream.outerJoin(stream2, (str, str2) -> {
            return str;
        }, ofTimeDifferenceWithNoGrace, StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME).withThisStoreSupplier(Stores.inMemoryWindowStore("thisSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)).withOtherStoreSupplier(Stores.persistentWindowStore("otherSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)));
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, RocksDBWindowStore.class, InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseSpecifiedStoreSuppliersOuterJoinStoreEvenIfThisSupplierIsSupplied() {
        KStream stream = this.builder.stream(STREAM_TOPIC);
        KStream stream2 = this.builder.stream(STREAM_TOPIC_TWO);
        JoinWindows ofTimeDifferenceWithNoGrace = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L));
        stream.outerJoin(stream2, (str, str2) -> {
            return str;
        }, ofTimeDifferenceWithNoGrace, StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME).withDslStoreSuppliers(BuiltInDslStoreSuppliers.ROCKS_DB).withThisStoreSupplier(Stores.inMemoryWindowStore("thisSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)).withOtherStoreSupplier(Stores.persistentWindowStore("otherSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)));
        this.builder.build();
        assertTypesForStateStore(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, RocksDBWindowStore.class, RocksDBStore.class);
    }

    @Test
    public void shouldUseThisStoreSupplierEvenIfDslStoreSuppliersConfiguredInTopologyConfig() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        properties.put("dsl.store.suppliers.class", BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));
        KStream stream = streamsBuilder.stream(STREAM_TOPIC);
        KStream stream2 = streamsBuilder.stream(STREAM_TOPIC_TWO);
        JoinWindows ofTimeDifferenceWithNoGrace = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L));
        stream.outerJoin(stream2, (str, str2) -> {
            return str;
        }, ofTimeDifferenceWithNoGrace, StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME).withThisStoreSupplier(Stores.inMemoryWindowStore("thisSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)).withOtherStoreSupplier(Stores.persistentWindowStore("otherSupplier", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true)));
        streamsBuilder.build();
        assertTypesForStateStore(streamsBuilder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology().stateStores(), InMemoryWindowStore.class, RocksDBWindowStore.class, InMemoryKeyValueStore.class);
    }

    @Test
    public void shouldUseSpecifiedNameForMergeOperation() {
        this.builder.stream("topic-1").merge(this.builder.stream("topic-2"), Named.as("merge-processor"));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "merge-processor");
    }

    @Test
    public void shouldUseSpecifiedNameForProcessOperation() {
        this.builder.stream(STREAM_TOPIC).process(new MockApiProcessorSupplier(), Named.as("test-processor"), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "test-processor");
    }

    @Test
    public void shouldUseSpecifiedNameForPrintOperation() {
        this.builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", "print-processor");
    }

    @Test
    public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
        this.builder.stream(STREAM_TOPIC).flatTransformValues(() -> {
            return new NoopValueTransformer();
        }, Named.as(STREAM_OPERATION_NAME), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
        this.builder.stream(STREAM_TOPIC).flatTransformValues(() -> {
            return new NoopValueTransformerWithKey();
        }, Named.as(STREAM_OPERATION_NAME), new String[0]);
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForToStream() {
        this.builder.table(STREAM_TOPIC).toStream(Named.as("to-stream"));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000001", "KTABLE-SOURCE-0000000002", "to-stream");
    }

    @Test
    public void shouldUseSpecifiedNameForToStreamWithMapper() {
        this.builder.table(STREAM_TOPIC).toStream(KeyValue::pair, Named.as("to-stream"));
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology(), "KSTREAM-SOURCE-0000000001", "KTABLE-SOURCE-0000000002", "to-stream", "KSTREAM-KEY-SELECT-0000000004");
    }

    @Test
    public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
        this.builder.table(STREAM_TOPIC).groupBy(KeyValue::pair, Grouped.as("group-operation")).count(Named.as(STREAM_OPERATION_NAME));
        this.builder.build();
        ProcessorTopology buildTopology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildTopology();
        assertNamesForStateStore(buildTopology.stateStores(), "stream-topic-STATE-STORE-0000000000", "KTABLE-AGGREGATE-STATE-STORE-0000000004");
        assertNamesForOperation(buildTopology, "KSTREAM-SOURCE-0000000001", "KTABLE-SOURCE-0000000002", "group-operation", "stream-operation-sink", "stream-operation-source", STREAM_OPERATION_NAME);
    }

    @Test
    public void shouldUseSpecifiedNameForGlobalStoreProcessor() {
        this.builder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()), AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()).withName("test"), new MockApiProcessorSupplier());
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildGlobalStateTopology(), "test-source", "test");
    }

    @Test
    public void shouldUseDefaultNameForGlobalStoreProcessor() {
        this.builder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()), AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()), new MockApiProcessorSupplier());
        this.builder.build();
        assertNamesForOperation(this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).buildGlobalStateTopology(), "KSTREAM-SOURCE-0000000000", "KTABLE-SOURCE-0000000001");
    }

    @Test
    public void shouldAllowStreamsFromSameTopic() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX);
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX);
        assertBuildDoesNotThrow(this.builder);
    }

    @Test
    public void shouldAllowSubscribingToSamePattern() {
        this.builder.stream(Pattern.compile("some-regex"));
        this.builder.stream(Pattern.compile("some-regex"));
        assertBuildDoesNotThrow(this.builder);
    }

    @Test
    public void shouldAllowReadingFromSameCollectionOfTopics() {
        this.builder.stream(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME));
        this.builder.stream(Arrays.asList(AssignmentTestUtils.TP_2_NAME, AssignmentTestUtils.TP_1_NAME));
        assertBuildDoesNotThrow(this.builder);
    }

    @Test
    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
        this.builder.stream(Collections.singletonList(AssignmentTestUtils.TOPIC_PREFIX));
        this.builder.stream(Arrays.asList(AssignmentTestUtils.TOPIC_PREFIX, "anotherTopic"));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Topology.AutoOffsetReset.LATEST));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX);
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowWhenSubscribedToATopicWithUnsetAndSetResetPolicies() {
        this.builder.stream("another-topic");
        this.builder.stream("another-topic", Consumed.with(Topology.AutoOffsetReset.LATEST));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowWhenSubscribedToAPatternWithDifferentResetPolicies() {
        this.builder.stream(Pattern.compile("some-regex"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        this.builder.stream(Pattern.compile("some-regex"), Consumed.with(Topology.AutoOffsetReset.LATEST));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldThrowWhenSubscribedToAPatternWithSetAndUnsetResetPolicies() {
        this.builder.stream(Pattern.compile("some-regex"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        this.builder.stream(Pattern.compile("some-regex"));
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldNotAllowTablesFromSameTopic() {
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX);
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX);
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    @Test
    public void shouldNowAllowStreamAndTableFromSameTopic() {
        this.builder.stream(AssignmentTestUtils.TOPIC_PREFIX);
        this.builder.table(AssignmentTestUtils.TOPIC_PREFIX);
        StreamsBuilder streamsBuilder = this.builder;
        streamsBuilder.getClass();
        Assert.assertThrows(TopologyException.class, streamsBuilder::build);
    }

    private static void assertBuildDoesNotThrow(StreamsBuilder streamsBuilder) {
        try {
            streamsBuilder.build();
        } catch (TopologyException e) {
            Assert.fail("TopologyException not expected");
        }
    }

    private static void assertNamesForOperation(ProcessorTopology processorTopology, String... strArr) {
        List processors = processorTopology.processors();
        Assert.assertEquals("Invalid number of expected processors", strArr.length, processors.size());
        for (int i = 0; i < strArr.length; i++) {
            Assert.assertEquals(strArr[i], ((ProcessorNode) processors.get(i)).name());
        }
    }

    private static void assertNamesForStateStore(List<StateStore> list, String... strArr) {
        Assert.assertEquals("Invalid number of expected state stores", strArr.length, list.size());
        for (int i = 0; i < strArr.length; i++) {
            Assert.assertEquals(strArr[i], list.get(i).name());
        }
    }

    private static void assertTypesForStateStore(List<StateStore> list, Class<?>... clsArr) {
        StateStore stateStore;
        Assert.assertEquals("Invalid number of expected state stores", clsArr.length, list.size());
        for (int i = 0; i < clsArr.length; i++) {
            StateStore stateStore2 = list.get(i);
            while (true) {
                stateStore = stateStore2;
                if ((stateStore instanceof WrappedStateStore) && !clsArr[i].isInstance(stateStore)) {
                    stateStore2 = ((WrappedStateStore) stateStore).wrapped();
                }
            }
            MatcherAssert.assertThat(stateStore, CoreMatchers.instanceOf(clsArr[i]));
        }
    }
}
