/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Unit tests for {@link KStreamBuilder}: generated node and store names, stream/table sources,
 * merged streams, global tables, auto-offset-reset topic patterns and per-source timestamp
 * extractors.
 *
 * <p>Every test works on a fresh {@link KStreamBuilder} whose application id is set to
 * {@code "app-id"} in {@link #setUp()}.
 */
public class KStreamBuilderTest {
    private static final String APP_ID = "app-id";

    private final KStreamBuilder builder = new KStreamBuilder();

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /** Assigns the application id used by the builder under test. */
    @Before
    public void setUp() {
        builder.setApplicationId(APP_ID);
    }

    /**
     * Adding a source named {@code KSTREAM-SOURCE-0000000000} after a stream has already been
     * created must fail with a {@link TopologyBuilderException}.
     * NOTE(review): presumably because that name collides with the auto-generated source name.
     */
    @Test(expected = TopologyBuilderException.class)
    public void testFrom() {
        builder.stream("topic-1", "topic-2");
        builder.addSource("KSTREAM-SOURCE-0000000000", "topic-3");
    }

    /** Generated names carry a zero-padded counter that is tracked per builder instance. */
    @Test
    public void testNewName() {
        Assert.assertEquals("X-0000000000", builder.newName("X-"));
        Assert.assertEquals("Y-0000000001", builder.newName("Y-"));
        Assert.assertEquals("Z-0000000002", builder.newName("Z-"));

        // A second builder starts counting from zero again.
        final KStreamBuilder newBuilder = new KStreamBuilder();
        Assert.assertEquals("X-0000000000", newBuilder.newName("X-"));
        Assert.assertEquals("Y-0000000001", newBuilder.newName("Y-"));
        Assert.assertEquals("Z-0000000002", newBuilder.newName("Z-"));
    }

    /** A processor attached to the source still sees records when the source also writes to a sink topic. */
    @Test
    public void shouldProcessFromSinkTopic() {
        final KStream<String, String> source = builder.stream("topic-source");
        source.to("topic-sink");

        final MockProcessorSupplier<String, String> processorSupplier =
            new MockProcessorSupplier<String, String>();
        source.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process("topic-source", "A", "aa");

        Assert.assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
    }

    /** Both the original stream and the stream returned by {@code through} receive the record. */
    @Test
    public void shouldProcessViaThroughTopic() {
        final KStream<String, String> source = builder.stream("topic-source");
        final KStream<String, String> through = source.through("topic-sink");

        final MockProcessorSupplier<String, String> sourceProcessorSupplier =
            new MockProcessorSupplier<String, String>();
        final MockProcessorSupplier<String, String> throughProcessorSupplier =
            new MockProcessorSupplier<String, String>();
        source.process(sourceProcessorSupplier);
        through.process(throughProcessorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process("topic-source", "A", "aa");

        Assert.assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
        Assert.assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
    }

    /** Generated store names embed {@code STATE-STORE-} plus a per-builder zero-padded counter. */
    @Test
    public void testNewStoreName() {
        Assert.assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
        Assert.assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
        Assert.assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));

        // A second builder starts counting from zero again.
        final KStreamBuilder newBuilder = new KStreamBuilder();
        Assert.assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
        Assert.assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
        Assert.assertEquals("Z-STATE-STORE-0000000002", newBuilder.newStoreName("Z-"));
    }

    /** Records from two merged source topics reach the downstream processor in processing order. */
    @Test
    public void testMerge() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";

        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> merged = builder.merge(source1, source2);

        final MockProcessorSupplier<String, String> processorSupplier =
            new MockProcessorSupplier<String, String>();
        merged.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process(topic1, "A", "aa");
        driver.process(topic2, "B", "bb");
        driver.process(topic2, "C", "cc");
        driver.process(topic1, "D", "dd");

        Assert.assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
    }

    /**
     * A store built by counting a stream merged from three (partly transformed) sources must be
     * mapped to all three underlying source topics.
     */
    @Test
    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final String topic3 = "topic-3";

        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> source3 = builder.stream(topic3);

        // Identity transformations: they only add intermediate nodes between source and merge.
        final KStream<String, String> processedSource1 = source1
            .mapValues(new ValueMapper<String, String>() {
                @Override
                public String apply(final String value) {
                    return value;
                }
            })
            .filter(new Predicate<String, String>() {
                @Override
                public boolean test(final String key, final String value) {
                    return true;
                }
            });
        final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
            @Override
            public boolean test(final String key, final String value) {
                return true;
            }
        });

        final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
        merged.groupByKey().count("my-table");

        final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics();
        Assert.assertEquals(Utils.mkList(topic1, topic2, topic3), actual.get("my-table"));
    }

    /** Creating a stream from an empty topic array must fail. */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() {
        builder.stream(new String[0]);
    }

    /** Creating a stream whose topic names are {@code null} must fail. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() {
        builder.stream(Serdes.String(), Serdes.String(), new String[]{null, null});
    }

    /**
     * A table created with a {@code null} store name is still backed by a state store with a
     * generated name, but reports no queryable store name.
     */
    @Test
    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() {
        final KTable<String, String> table1 = builder.table("topic1", "table1");
        // The cast selects the String overload; a bare null would be ambiguous.
        final KTable<String, String> table2 = builder.table("topic2", (String) null);

        final ProcessorTopology topology = builder.build(null);

        Assert.assertEquals(2, topology.stateStores().size());
        Assert.assertEquals("table1", topology.stateStores().get(0).name());

        final String internalStoreName = topology.stateStores().get(1).name();
        Assert.assertTrue(internalStoreName.contains("STATE-STORE-"));

        Assert.assertEquals(2, topology.storeToChangelogTopic().size());
        Assert.assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
        Assert.assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));

        Assert.assertEquals("table1", table1.queryableStoreName());
        Assert.assertNull(table2.queryableStoreName());
    }

    /** A single global table yields a global-state topology with exactly that store. */
    @Test
    public void shouldBuildSimpleGlobalTableTopology() {
        builder.globalTable("table", "globalTable");

        final ProcessorTopology topology = builder.buildGlobalStateTopology();
        final List<StateStore> stateStores = topology.globalStateStores();

        Assert.assertEquals(1, stateStores.size());
        Assert.assertEquals("globalTable", stateStores.get(0).name());
    }

    /**
     * Shared verification: the global-state topology reads from topics {@code table} and
     * {@code table2} and holds two global stores.
     */
    private void doBuildGlobalTopologyWithAllGlobalTables() {
        final ProcessorTopology topology = builder.buildGlobalStateTopology();
        final List<StateStore> stateStores = topology.globalStateStores();
        final Set<String> sourceTopics = topology.sourceTopics();

        Assert.assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
        Assert.assertEquals(2, stateStores.size());
    }

    /** Two global tables with explicit store names both appear in the global-state topology. */
    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
        builder.globalTable("table", "globalTable");
        builder.globalTable("table2", "globalTable2");

        doBuildGlobalTopologyWithAllGlobalTables();
    }

    /** Two global tables without explicit store names both appear in the global-state topology. */
    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() {
        builder.globalTable("table");
        builder.globalTable("table2");

        doBuildGlobalTopologyWithAllGlobalTables();
    }

    /** Every node group's topology exposes both global stores, regardless of which one it joins. */
    @Test
    public void shouldAddGlobalTablesToEachGroup() {
        final String one = "globalTable";
        final String two = "globalTable2";

        final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", two);
        builder.table("not-global", "not-global");

        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(final String key, final String value) {
                return value;
            }
        };

        final KStream<String, String> stream = builder.stream("t1");
        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
        final KStream<String, String> stream2 = builder.stream("t2");
        stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);

        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
        for (final Integer groupId : nodeGroups.keySet()) {
            final ProcessorTopology topology = builder.build(groupId);
            final List<StateStore> stateStores = topology.globalStateStores();

            final Set<String> names = new HashSet<String>();
            for (final StateStore stateStore : stateStores) {
                names.add(stateStore.name());
            }

            Assert.assertEquals(2, stateStores.size());
            Assert.assertTrue(names.contains(one));
            Assert.assertTrue(names.contains(two));
        }
    }

    /**
     * The table store stays mapped to its own topic, while the count store created after a
     * key-changing {@code map} is mapped to the generated repartition topic.
     */
    @Test
    public void shouldMapStateStoresToCorrectSourceTopics() {
        final KStream<String, String> playEvents = builder.stream("events");
        final KTable<String, String> table = builder.table("table-topic", "table-store");
        Assert.assertEquals(
            Collections.singletonList("table-topic"),
            builder.stateStoreNameToSourceTopics().get("table-store"));

        final KStream<String, String> mapped =
            playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");

        Assert.assertEquals(
            Collections.singletonList("table-topic"),
            builder.stateStoreNameToSourceTopics().get("table-store"));
        Assert.assertEquals(
            Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"),
            builder.stateStoreNameToSourceTopics().get("count"));
    }

    /** A stream created with EARLIEST reset matches only the earliest-reset pattern. */
    @Test
    public void shouldAddTopicToEarliestAutoOffsetResetList() {
        final String topicName = "topic-1";

        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);

        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
        Assert.assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
    }

    /** A stream created with LATEST reset matches only the latest-reset pattern. */
    @Test
    public void shouldAddTopicToLatestAutoOffsetResetList() {
        final String topicName = "topic-1";

        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicName);

        Assert.assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
        Assert.assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
    }

    /** A table created with EARLIEST reset matches only the earliest-reset pattern. */
    @Test
    public void shouldAddTableToEarliestAutoOffsetResetList() {
        final String topicName = "topic-1";
        final String storeName = "test-store";

        builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName, storeName);

        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
        Assert.assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
    }

    /** A table created with LATEST reset matches only the latest-reset pattern. */
    @Test
    public void shouldAddTableToLatestAutoOffsetResetList() {
        final String topicName = "topic-1";
        final String storeName = "test-store";

        builder.table(TopologyBuilder.AutoOffsetReset.LATEST, topicName, storeName);

        Assert.assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
        Assert.assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
    }

    /** A table created without a reset policy matches neither reset pattern. */
    @Test
    public void shouldNotAddTableToOffsetResetLists() {
        final String topicName = "topic-1";
        final String storeName = "test-store";
        final Serde<String> stringSerde = Serdes.String();

        builder.table(stringSerde, stringSerde, topicName, storeName);

        Assert.assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
        Assert.assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
    }

    /** A regex-subscribed stream without a reset policy matches neither reset pattern. */
    @Test
    public void shouldNotAddRegexTopicsToOffsetResetLists() {
        final Pattern topicPattern = Pattern.compile("topic-\\d");
        final String topic = "topic-5";

        builder.stream(topicPattern);

        Assert.assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
        Assert.assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
    }

    /** A regex-subscribed stream with EARLIEST reset matches only the earliest-reset pattern. */
    @Test
    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
        final Pattern topicPattern = Pattern.compile("topic-\\d+");
        final String topicTwo = "topic-500000";

        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicPattern);

        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
        Assert.assertFalse(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
    }

    /** A regex-subscribed stream with LATEST reset matches only the latest-reset pattern. */
    @Test
    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
        final Pattern topicPattern = Pattern.compile("topic-\\d+");
        final String topicTwo = "topic-1000000";

        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicPattern);

        Assert.assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
        Assert.assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
    }

    /** A stream created without an extractor leaves the source node's extractor unset. */
    @Test
    public void kStreamTimestampExtractorShouldBeNull() {
        builder.stream("topic");

        final ProcessorTopology processorTopology = builder.build(null);

        Assert.assertNull(processorTopology.source("topic").getTimestampExtractor());
    }

    /** An extractor passed together with (null) key/value serdes is attached to every source node. */
    @Test
    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() {
        builder.stream(new MockTimestampExtractor(), null, null, "topic");

        final ProcessorTopology processorTopology = builder.build(null);

        for (final SourceNode sourceNode : processorTopology.sources()) {
            Assert.assertThat(
                sourceNode.getTimestampExtractor(),
                IsInstanceOf.instanceOf(MockTimestampExtractor.class));
        }
    }

    /** An extractor passed together with a (null) reset policy is attached to the source node. */
    @Test
    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() {
        builder.stream(null, new MockTimestampExtractor(), null, null, "topic");

        final ProcessorTopology processorTopology = builder.build(null);

        Assert.assertThat(
            processorTopology.source("topic").getTimestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * An extractor passed to the short {@code table(extractor, topic, store)} overload is
     * attached to the table's source node.
     *
     * <p>This test previously created the table without any extractor and asserted {@code null},
     * which contradicted its name and merely duplicated
     * {@link #kTableTimestampExtractorShouldBeNull()}.
     */
    @Test
    public void shouldAddTimestampExtractorToTablePerSource() {
        builder.table(new MockTimestampExtractor(), "topic", "store");

        final ProcessorTopology processorTopology = builder.build(null);

        Assert.assertThat(
            processorTopology.source("topic").getTimestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /** A table created without an extractor leaves the source node's extractor unset. */
    @Test
    public void kTableTimestampExtractorShouldBeNull() {
        builder.table("topic", "store");

        final ProcessorTopology processorTopology = builder.build(null);

        Assert.assertNull(processorTopology.source("topic").getTimestampExtractor());
    }

    /** An extractor passed through the full table overload is attached to the source node. */
    @Test
    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() {
        builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");

        final ProcessorTopology processorTopology = builder.build(null);

        Assert.assertThat(
            processorTopology.source("topic").getTimestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }
}

