package org.apache.kafka.streams.kstream;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/KStreamBuilderTest.class */
public class KStreamBuilderTest {
    private static final String APP_ID = "app-id";
    private final KStreamBuilder builder = new KStreamBuilder();
    private KStreamTestDriver driver = null;

    @Before
    public void setUp() {
        this.builder.setApplicationId(APP_ID);
    }

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Test(expected = TopologyBuilderException.class)
    public void testFrom() {
        this.builder.stream(new String[]{"topic-1", "topic-2"});
        this.builder.addSource("KSTREAM-SOURCE-0000000000", new String[]{"topic-3"});
    }

    @Test
    public void testNewName() {
        Assert.assertEquals("X-0000000000", this.builder.newName("X-"));
        Assert.assertEquals("Y-0000000001", this.builder.newName("Y-"));
        Assert.assertEquals("Z-0000000002", this.builder.newName("Z-"));
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        Assert.assertEquals("X-0000000000", kStreamBuilder.newName("X-"));
        Assert.assertEquals("Y-0000000001", kStreamBuilder.newName("Y-"));
        Assert.assertEquals("Z-0000000002", kStreamBuilder.newName("Z-"));
    }

    @Test
    public void shouldNotTryProcessingFromSinkTopic() {
        KStream stream = this.builder.stream(new String[]{"topic-source"});
        stream.to("topic-sink");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        stream.process(mockProcessorSupplier, new String[0]);
        this.driver = new KStreamTestDriver(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-source", "A", "aa");
        Assert.assertEquals(Utils.mkList(new String[]{"A:aa"}), mockProcessorSupplier.processed);
    }

    @Test
    public void shouldTryProcessingFromThoughTopic() {
        KStream stream = this.builder.stream(new String[]{"topic-source"});
        KStream through = stream.through("topic-sink");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        stream.process(mockProcessorSupplier, new String[0]);
        through.process(mockProcessorSupplier2, new String[0]);
        this.driver = new KStreamTestDriver(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-source", "A", "aa");
        Assert.assertEquals(Utils.mkList(new String[]{"A:aa"}), mockProcessorSupplier.processed);
        Assert.assertEquals(Utils.mkList(new String[]{"A:aa"}), mockProcessorSupplier2.processed);
    }

    @Test
    public void testNewStoreName() {
        Assert.assertEquals("X-STATE-STORE-0000000000", this.builder.newStoreName("X-"));
        Assert.assertEquals("Y-STATE-STORE-0000000001", this.builder.newStoreName("Y-"));
        Assert.assertEquals("Z-STATE-STORE-0000000002", this.builder.newStoreName("Z-"));
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        Assert.assertEquals("X-STATE-STORE-0000000000", kStreamBuilder.newStoreName("X-"));
        Assert.assertEquals("Y-STATE-STORE-0000000001", kStreamBuilder.newStoreName("Y-"));
        Assert.assertEquals("Z-STATE-STORE-0000000002", kStreamBuilder.newStoreName("Z-"));
    }

    @Test
    public void testMerge() {
        KStream merge = this.builder.merge(new KStream[]{this.builder.stream(new String[]{"topic-1"}), this.builder.stream(new String[]{"topic-2"})});
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        merge.process(mockProcessorSupplier, new String[0]);
        this.driver = new KStreamTestDriver(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-1", "A", "aa");
        this.driver.process("topic-2", "B", "bb");
        this.driver.process("topic-2", "C", "cc");
        this.driver.process("topic-1", "D", "dd");
        Assert.assertEquals(Utils.mkList(new String[]{"A:aa", "B:bb", "C:cc", "D:dd"}), mockProcessorSupplier.processed);
    }

    @Test
    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
        KStream stream = this.builder.stream(new String[]{"topic-1"});
        KStream stream2 = this.builder.stream(new String[]{"topic-2"});
        KStream stream3 = this.builder.stream(new String[]{"topic-3"});
        this.builder.merge(new KStream[]{stream.mapValues(new ValueMapper<String, String>() { // from class: org.apache.kafka.streams.kstream.KStreamBuilderTest.2
            public String apply(String str) {
                return str;
            }
        }).filter(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.KStreamBuilderTest.1
            public boolean test(String str, String str2) {
                return true;
            }
        }), stream2.filter(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.KStreamBuilderTest.3
            public boolean test(String str, String str2) {
                return true;
            }
        }), stream3}).groupByKey().count("my-table");
        Assert.assertEquals(Utils.mkList(new String[]{"topic-1", "topic-2", "topic-3"}), this.builder.stateStoreNameToSourceTopics().get("my-table"));
    }

    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
        this.builder.stream(new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
        this.builder.stream(Serdes.String(), Serdes.String(), new String[]{null, null});
    }

    @Test
    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
        KTable table = this.builder.table("topic1", "table1");
        KTable table2 = this.builder.table("topic2", (String) null);
        ProcessorTopology build = this.builder.build((Integer) null);
        Assert.assertEquals(2L, build.stateStores().size());
        Assert.assertEquals("table1", ((StateStore) build.stateStores().get(0)).name());
        String name = ((StateStore) build.stateStores().get(1)).name();
        Assert.assertTrue(name.contains("STATE-STORE-"));
        Assert.assertEquals(2L, build.storeToChangelogTopic().size());
        Assert.assertEquals("topic1", build.storeToChangelogTopic().get("table1"));
        Assert.assertEquals("topic2", build.storeToChangelogTopic().get(name));
        Assert.assertEquals(table.queryableStoreName(), "table1");
        Assert.assertNull(table2.queryableStoreName());
    }

    @Test
    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
        this.builder.globalTable("table", "globalTable");
        List globalStateStores = this.builder.buildGlobalStateTopology().globalStateStores();
        Assert.assertEquals(1L, globalStateStores.size());
        Assert.assertEquals("globalTable", ((StateStore) globalStateStores.get(0)).name());
    }

    private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
        ProcessorTopology buildGlobalStateTopology = this.builder.buildGlobalStateTopology();
        List globalStateStores = buildGlobalStateTopology.globalStateStores();
        Assert.assertEquals(Utils.mkSet(new String[]{"table", "table2"}), buildGlobalStateTopology.sourceTopics());
        Assert.assertEquals(2L, globalStateStores.size());
    }

    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
        this.builder.globalTable("table", "globalTable");
        this.builder.globalTable("table2", "globalTable2");
        doBuildGlobalTopologyWithAllGlobalTables();
    }

    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
        this.builder.globalTable("table");
        this.builder.globalTable("table2");
        doBuildGlobalTopologyWithAllGlobalTables();
    }

    @Test
    public void shouldAddGlobalTablesToEachGroup() throws Exception {
        GlobalKTable globalTable = this.builder.globalTable("table", "globalTable");
        GlobalKTable globalTable2 = this.builder.globalTable("table2", "globalTable2");
        this.builder.table("not-global", "not-global");
        KeyValueMapper<String, String, String> keyValueMapper = new KeyValueMapper<String, String, String>() { // from class: org.apache.kafka.streams.kstream.KStreamBuilderTest.4
            public String apply(String str, String str2) {
                return str2;
            }
        };
        this.builder.stream(new String[]{"t1"}).leftJoin(globalTable, keyValueMapper, MockValueJoiner.TOSTRING_JOINER);
        this.builder.stream(new String[]{"t2"}).leftJoin(globalTable2, keyValueMapper, MockValueJoiner.TOSTRING_JOINER);
        Iterator it = this.builder.nodeGroups().keySet().iterator();
        while (it.hasNext()) {
            List globalStateStores = this.builder.build((Integer) it.next()).globalStateStores();
            HashSet hashSet = new HashSet();
            Iterator it2 = globalStateStores.iterator();
            while (it2.hasNext()) {
                hashSet.add(((StateStore) it2.next()).name());
            }
            Assert.assertEquals(2L, globalStateStores.size());
            Assert.assertTrue(hashSet.contains("globalTable"));
            Assert.assertTrue(hashSet.contains("globalTable2"));
        }
    }

    @Test
    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
        KStream stream = this.builder.stream(new String[]{"events"});
        KTable table = this.builder.table("table-topic", "table-store");
        Assert.assertEquals(Collections.singletonList("table-topic"), this.builder.stateStoreNameToSourceTopics().get("table-store"));
        stream.map(MockKeyValueMapper.SelectValueKeyValueMapper()).leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
        Assert.assertEquals(Collections.singletonList("table-topic"), this.builder.stateStoreNameToSourceTopics().get("table-store"));
        Assert.assertEquals(Collections.singletonList("app-id-KSTREAM-MAP-0000000003-repartition"), this.builder.stateStoreNameToSourceTopics().get("count"));
    }

    @Test
    public void shouldAddTopicToEarliestAutoOffsetResetList() {
        this.builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, new String[]{"topic-1"});
        Assert.assertTrue(this.builder.earliestResetTopicsPattern().matcher("topic-1").matches());
        Assert.assertFalse(this.builder.latestResetTopicsPattern().matcher("topic-1").matches());
    }

    @Test
    public void shouldAddTopicToLatestAutoOffsetResetList() {
        this.builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, new String[]{"topic-1"});
        Assert.assertTrue(this.builder.latestResetTopicsPattern().matcher("topic-1").matches());
        Assert.assertFalse(this.builder.earliestResetTopicsPattern().matcher("topic-1").matches());
    }

    @Test
    public void shouldAddTableToEarliestAutoOffsetResetList() {
        this.builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, "topic-1", "test-store");
        Assert.assertTrue(this.builder.earliestResetTopicsPattern().matcher("topic-1").matches());
        Assert.assertFalse(this.builder.latestResetTopicsPattern().matcher("topic-1").matches());
    }

    @Test
    public void shouldAddTableToLatestAutoOffsetResetList() {
        this.builder.table(TopologyBuilder.AutoOffsetReset.LATEST, "topic-1", "test-store");
        Assert.assertTrue(this.builder.latestResetTopicsPattern().matcher("topic-1").matches());
        Assert.assertFalse(this.builder.earliestResetTopicsPattern().matcher("topic-1").matches());
    }

    @Test
    public void shouldNotAddTableToOffsetResetLists() {
        Serde String = Serdes.String();
        this.builder.table(String, String, "topic-1", "test-store");
        Assert.assertFalse(this.builder.latestResetTopicsPattern().matcher("topic-1").matches());
        Assert.assertFalse(this.builder.earliestResetTopicsPattern().matcher("topic-1").matches());
    }

    @Test
    public void shouldNotAddRegexTopicsToOffsetResetLists() {
        this.builder.stream(Pattern.compile("topic-\\d"));
        Assert.assertFalse(this.builder.latestResetTopicsPattern().matcher("topic-5").matches());
        Assert.assertFalse(this.builder.earliestResetTopicsPattern().matcher("topic-5").matches());
    }

    @Test
    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
        this.builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d+"));
        Assert.assertTrue(this.builder.earliestResetTopicsPattern().matcher("topic-500000").matches());
        Assert.assertFalse(this.builder.latestResetTopicsPattern().matcher("topic-500000").matches());
    }

    @Test
    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
        this.builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d+"));
        Assert.assertTrue(this.builder.latestResetTopicsPattern().matcher("topic-1000000").matches());
        Assert.assertFalse(this.builder.earliestResetTopicsPattern().matcher("topic-1000000").matches());
    }

    @Test
    public void kStreamTimestampExtractorShouldBeNull() throws Exception {
        this.builder.stream(new String[]{"topic"});
        Assert.assertNull(this.builder.build((Integer) null).source("topic").getTimestampExtractor());
    }

    @Test
    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
        this.builder.stream(new MockTimestampExtractor(), (Serde) null, (Serde) null, new String[]{"topic"});
        Iterator it = this.builder.build((Integer) null).sources().iterator();
        while (it.hasNext()) {
            Assert.assertThat(((SourceNode) it.next()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
        }
    }

    @Test
    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
        this.builder.stream((TopologyBuilder.AutoOffsetReset) null, new MockTimestampExtractor(), (Serde) null, (Serde) null, new String[]{"topic"});
        Assert.assertThat(this.builder.build((Integer) null).source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    @Test
    public void shouldAddTimestampExtractorToTablePerSource() throws Exception {
        this.builder.table("topic", "store");
        Assert.assertNull(this.builder.build((Integer) null).source("topic").getTimestampExtractor());
    }

    @Test
    public void kTableTimestampExtractorShouldBeNull() throws Exception {
        this.builder.table("topic", "store");
        Assert.assertNull(this.builder.build((Integer) null).source("topic").getTimestampExtractor());
    }

    @Test
    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
        this.builder.table((TopologyBuilder.AutoOffsetReset) null, new MockTimestampExtractor(), (Serde) null, (Serde) null, "topic", "store");
        Assert.assertThat(this.builder.build((Integer) null).source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }
}
