package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.IsNull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImplTest.class */
public class KStreamImplTest {
    private StreamsBuilder builder;
    private KStream<String, String> testStream;
    private KTable<String, String> testTable;
    private GlobalKTable<String, String> testGlobalTable;
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
    private final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier = () -> {
        return new Transformer<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.1
            public void init(ProcessorContext processorContext) {
            }

            public KeyValue<String, String> transform(String str, String str2) {
                return new KeyValue<>(str, str2);
            }

            public void close() {
            }
        };
    };
    private final TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> flatTransformerSupplier = () -> {
        return new Transformer<String, String, Iterable<KeyValue<String, String>>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.2
            public void init(ProcessorContext processorContext) {
            }

            public Iterable<KeyValue<String, String>> transform(String str, String str2) {
                return Collections.singleton(new KeyValue(str, str2));
            }

            public void close() {
            }
        };
    };
    private final ValueTransformerSupplier<String, String> valueTransformerSupplier = () -> {
        return new ValueTransformer<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.3
            public void init(ProcessorContext processorContext) {
            }

            public String transform(String str) {
                return str;
            }

            public void close() {
            }
        };
    };
    private final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier = () -> {
        return new ValueTransformerWithKey<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.4
            public void init(ProcessorContext processorContext) {
            }

            public String transform(String str, String str2) {
                return str2;
            }

            public void close() {
            }
        };
    };
    private final ValueTransformerSupplier<String, Iterable<String>> flatValueTransformerSupplier = () -> {
        return new ValueTransformer<String, Iterable<String>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.5
            public void init(ProcessorContext processorContext) {
            }

            public Iterable<String> transform(String str) {
                return Collections.singleton(str);
            }

            public void close() {
            }
        };
    };
    private final ValueTransformerWithKeySupplier<String, String, Iterable<String>> flatValueTransformerWithKeySupplier = () -> {
        return new ValueTransformerWithKey<String, String, Iterable<String>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.6
            public void init(ProcessorContext processorContext) {
            }

            public Iterable<String> transform(String str, String str2) {
                return Collections.singleton(str2);
            }

            public void close() {
            }
        };
    };
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private final Serde<String> mySerde = new Serdes.StringSerde();

    @Before
    public void before() {
        this.builder = new StreamsBuilder();
        this.testStream = this.builder.stream("source");
        this.testTable = this.builder.table("topic");
        this.testGlobalTable = this.builder.globalTable("global");
    }

    @Test
    public void shouldNotAllowNullPredicateOnFilter() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filter((Predicate) null);
        })).getMessage(), CoreMatchers.equalTo("predicate can't be null"));
    }

    @Test
    public void shouldNotAllowNullPredicateOnFilterWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filter((Predicate) null, Named.as("filter"));
        })).getMessage(), CoreMatchers.equalTo("predicate can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFilter() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filter((str, str2) -> {
                return true;
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullPredicateOnFilterNot() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filterNot((Predicate) null);
        })).getMessage(), CoreMatchers.equalTo("predicate can't be null"));
    }

    @Test
    public void shouldNotAllowNullPredicateOnFilterNotWithName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filterNot((Predicate) null, Named.as("filter"));
        })).getMessage(), CoreMatchers.equalTo("predicate can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFilterNot() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.filterNot((str, str2) -> {
                return true;
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnSelectKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.selectKey((KeyValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnSelectKeyWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.selectKey((KeyValueMapper) null, Named.as("keySelector"));
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnSelectKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.selectKey((str, str2) -> {
                return str;
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMap() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.map((KeyValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMapWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.map((KeyValueMapper) null, Named.as("map"));
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnMap() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.map((v0, v1) -> {
                return KeyValue.pair(v0, v1);
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMapValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues((ValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMapValuesWithKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues((ValueMapperWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("valueMapperWithKey can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMapValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues((ValueMapper) null, Named.as("valueMapper"));
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnMapValuesWithKeyWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues((ValueMapperWithKey) null, Named.as("valueMapperWithKey"));
        })).getMessage(), CoreMatchers.equalTo("valueMapperWithKey can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnMapValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues(str -> {
                return str;
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnMapValuesWithKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.mapValues((str, str2) -> {
                return str2;
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMap() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMap((KeyValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMapWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMap((KeyValueMapper) null, Named.as("flatMapper"));
        })).getMessage(), CoreMatchers.equalTo("mapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatMap() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMap((str, str2) -> {
                return Collections.singleton(new KeyValue(str, str2));
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMapValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues((ValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues((ValueMapperWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMapValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues((ValueMapper) null, Named.as("flatValueMapper"));
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnFlatMapValuesWithKeyWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues((ValueMapperWithKey) null, Named.as("flatValueMapperWithKey"));
        })).getMessage(), CoreMatchers.equalTo("valueMapper can't be null"));
    }

    @Test
    public void shouldNotAllowNullNameOnFlatMapValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues(str -> {
                return Collections.emptyList();
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNameOnFlatMapValuesWithKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatMapValues((str, str2) -> {
                return Collections.emptyList();
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullPrintedOnPrint() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.print((Printed) null);
        })).getMessage(), CoreMatchers.equalTo("printed can't be null"));
    }

    @Test
    public void shouldNotAllowNullActionOnForEach() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.foreach((ForeachAction) null);
        })).getMessage(), CoreMatchers.equalTo("action can't be null"));
    }

    @Test
    public void shouldNotAllowNullActionOnForEachWithName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.foreach((ForeachAction) null, Named.as("foreach"));
        })).getMessage(), CoreMatchers.equalTo("action can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnForEach() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.foreach((str, str2) -> {
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullActionOnPeek() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.peek((ForeachAction) null);
        })).getMessage(), CoreMatchers.equalTo("action can't be null"));
    }

    @Test
    public void shouldNotAllowNullActionOnPeekWithName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.peek((ForeachAction) null, Named.as("peek"));
        })).getMessage(), CoreMatchers.equalTo("action can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnPeek() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.peek((str, str2) -> {
            }, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullPredicatedOnBranch() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.branch((Predicate[]) null);
        })).getMessage(), CoreMatchers.equalTo("predicates can't be a null array"));
    }

    @Test
    public void shouldHaveAtLeastOnPredicateWhenBranching() {
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.branch(new Predicate[0]);
        })).getMessage(), CoreMatchers.equalTo("branch() requires at least one predicate"));
    }

    @Test
    public void shouldHaveAtLeastOnPredicateWhenBranchingWithNamed() {
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.branch(Named.as("branch"), new Predicate[0]);
        })).getMessage(), CoreMatchers.equalTo("branch() requires at least one predicate"));
    }

    @Test
    public void shouldNotAllowNullNamedOnBranch() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.branch((Named) null, new Predicate[]{(str, str2) -> {
                return true;
            }});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldCantHaveNullPredicate() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.branch(new Predicate[]{(Predicate) null});
        })).getMessage(), CoreMatchers.equalTo("predicates can't be null"));
    }

    @Test
    public void shouldCantHaveNullPredicateWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.branch(Named.as("branch"), new Predicate[]{(Predicate) null});
        })).getMessage(), CoreMatchers.equalTo("predicates can't be null"));
    }

    @Test
    public void shouldNotAllowNullKStreamOnMerge() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.merge((KStream) null);
        })).getMessage(), CoreMatchers.equalTo("stream can't be null"));
    }

    @Test
    public void shouldNotAllowNullKStreamOnMergeWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.merge((KStream) null, Named.as("merge"));
        })).getMessage(), CoreMatchers.equalTo("stream can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnMerge() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.merge(this.testStream, (Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    @Deprecated
    public void shouldNotAllowNullTopicOnThrough() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.through((String) null);
        })).getMessage(), CoreMatchers.equalTo("topic can't be null"));
    }

    @Test
    @Deprecated
    public void shouldNotAllowNullTopicOnThroughWithProduced() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.through((String) null, Produced.as("through"));
        })).getMessage(), CoreMatchers.equalTo("topic can't be null"));
    }

    @Test
    @Deprecated
    public void shouldNotAllowNullProducedOnThrough() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.through("topic", (Produced) null);
        })).getMessage(), CoreMatchers.equalTo("produced can't be null"));
    }

    @Test
    public void shouldNotAllowNullTopicOnTo() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to((String) null);
        })).getMessage(), CoreMatchers.equalTo("topic can't be null"));
    }

    @Test
    public void shouldNotAllowNullRepartitionedOnRepartition() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.repartition((Repartitioned) null);
        })).getMessage(), CoreMatchers.equalTo("repartitioned can't be null"));
    }

    @Test
    public void shouldNotAllowNullTopicChooserOnTo() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to((TopicNameExtractor) null);
        })).getMessage(), CoreMatchers.equalTo("topicExtractor can't be null"));
    }

    @Test
    public void shouldNotAllowNullTopicOnToWithProduced() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to((String) null, Produced.as("to"));
        })).getMessage(), CoreMatchers.equalTo("topic can't be null"));
    }

    @Test
    public void shouldNotAllowNullTopicChooserOnToWithProduced() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to((TopicNameExtractor) null, Produced.as("to"));
        })).getMessage(), CoreMatchers.equalTo("topicExtractor can't be null"));
    }

    @Test
    public void shouldNotAllowNullProducedOnToWithTopicName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to("topic", (Produced) null);
        })).getMessage(), CoreMatchers.equalTo("produced can't be null"));
    }

    @Test
    public void shouldNotAllowNullProducedOnToWithTopicChooser() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.to((str, str2, recordContext) -> {
                return "topic";
            }, (Produced) null);
        })).getMessage(), CoreMatchers.equalTo("produced can't be null"));
    }

    @Test
    public void shouldNotAllowNullSelectorOnGroupBy() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.groupBy((KeyValueMapper) null);
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullSelectorOnGroupByWithGrouped() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.groupBy((KeyValueMapper) null, Grouped.as("name"));
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullGroupedOnGroupBy() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.groupBy((str, str2) -> {
                return str;
            }, (Grouped) null);
        })).getMessage(), CoreMatchers.equalTo("grouped can't be null"));
    }

    @Test
    public void shouldNotAllowNullGroupedOnGroupByKey() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.groupByKey((Grouped) null);
        })).getMessage(), CoreMatchers.equalTo("grouped can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnToTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.toTable((Named) null);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMaterializedOnToTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.toTable((Materialized) null);
        })).getMessage(), CoreMatchers.equalTo("materialized can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnToTableWithMaterialized() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.toTable((Named) null, Materialized.with((Serde) null, (Serde) null));
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullMaterializedOnToTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.toTable(Named.as("name"), (Materialized) null);
        })).getMessage(), CoreMatchers.equalTo("materialized can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null);
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null, StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullStreamJoinedOnJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), (StreamJoined) null);
        })).getMessage(), CoreMatchers.equalTo("streamJoined can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnLeftJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null);
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnLeftJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null, StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullStreamJoinedOnLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), (StreamJoined) null);
        })).getMessage(), CoreMatchers.equalTo("streamJoined can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnOuterJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullOtherStreamOnOuterJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("otherStream can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnOuterJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, (ValueJoiner) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, (ValueJoinerWithKey) null, JoinWindows.of(Duration.ofMillis(10L)), StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnOuterJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null);
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinWindowsOnOuterJoinWithStreamJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null, StreamJoined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("windows can't be null"));
    }

    @Test
    public void shouldNotAllowNullStreamJoinedOnOuterJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.outerJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(10L)), (StreamJoined) null);
        })).getMessage(), CoreMatchers.equalTo("streamJoined can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnTableJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((KTable) null, MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("table can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnTableJoinWithJoiner() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((KTable) null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("table can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnTableJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testTable, (ValueJoiner) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnTableJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testTable, (ValueJoinerWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnTableJoinWithJoiner() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testTable, (ValueJoiner) null, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnTableJoinWithJoiner() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testTable, (ValueJoinerWithKey) null, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinedOnTableJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testTable, MockValueJoiner.TOSTRING_JOINER, (Joined) null);
        })).getMessage(), CoreMatchers.equalTo("joined can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnTableLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("table can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnTableLeftJoinWithJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("table can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnTableLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testTable, (ValueJoiner) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnTableLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testTable, (ValueJoinerWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnTableLeftJoinWithJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testTable, (ValueJoiner) null, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnTableLeftJoinWithJoined() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testTable, (ValueJoinerWithKey) null, Joined.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullJoinedOnTableLeftJoin() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testTable, MockValueJoiner.TOSTRING_JOINER, (Joined) null);
        })).getMessage(), CoreMatchers.equalTo("joined can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("globalTable can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("globalTable can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.join(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnLeftJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("globalTable can't be null"));
    }

    @Test
    public void shouldNotAllowNullTableOnLeftJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("globalTable can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER);
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("keySelector can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnLeftJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithGlobalTable() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey) null);
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerOnLeftJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner) null, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithGlobalTableWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.leftJoin(this.testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey) null, Named.as("name"));
        })).getMessage(), CoreMatchers.equalTo("joiner can't be null"));
    }

    @Test
    public void testNumProcesses() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = streamsBuilder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        KStream mapValues = stream.filter((str, str2) -> {
            return true;
        }).filterNot((str3, str4) -> {
            return false;
        }).mapValues(Integer::valueOf);
        KStream flatMapValues = stream2.flatMapValues(str5 -> {
            return Collections.singletonList(Integer.valueOf(str5));
        });
        KStream[] branch = mapValues.branch(new Predicate[]{(str6, num) -> {
            return num.intValue() % 2 == 0;
        }, (str7, num2) -> {
            return true;
        }});
        KStream[] branch2 = flatMapValues.branch(new Predicate[]{(str8, num3) -> {
            return num3.intValue() % 2 == 0;
        }, (str9, num4) -> {
            return true;
        }});
        StreamJoined with = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
        KStream join = branch[0].join(branch2[0], (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, JoinWindows.of(Duration.ofMillis(1L)), with);
        branch[1].join(branch2[1], (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, JoinWindows.of(Duration.ofMillis(1L)), with);
        join.to("topic-5");
        branch[1].through("topic-6").process(new MockProcessorSupplier(), new String[0]);
        branch[1].repartition().process(new MockProcessorSupplier(), new String[0]);
        Assert.assertEquals(30L, TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").buildTopology().processors().size());
    }

    @Test
    public void shouldPreserveSerdesForOperators() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Collections.singleton("topic-1"), this.stringConsumed);
        KTable table = streamsBuilder.table("topic-2", this.stringConsumed);
        GlobalKTable globalTable = streamsBuilder.globalTable("topic-2", this.stringConsumed);
        ConsumedInternal consumedInternal = new ConsumedInternal(this.stringConsumed);
        KeyValueMapper keyValueMapper = (str, str2) -> {
            return str;
        };
        KeyValueMapper keyValueMapper2 = (str3, str4) -> {
            return Collections.singleton(new KeyValue(str3, str4));
        };
        ValueMapper valueMapper = str5 -> {
            return str5;
        };
        ValueMapper valueMapper2 = (v0) -> {
            return Collections.singleton(v0);
        };
        ValueJoiner valueJoiner = (str6, str7) -> {
            return str6;
        };
        Assert.assertEquals(stream.filter((str8, str9) -> {
            return false;
        }).keySerde(), consumedInternal.keySerde());
        Assert.assertEquals(stream.filter((str10, str11) -> {
            return false;
        }).valueSerde(), consumedInternal.valueSerde());
        Assert.assertEquals(stream.filterNot((str12, str13) -> {
            return false;
        }).keySerde(), consumedInternal.keySerde());
        Assert.assertEquals(stream.filterNot((str14, str15) -> {
            return false;
        }).valueSerde(), consumedInternal.valueSerde());
        Assert.assertNull(stream.selectKey(keyValueMapper).keySerde());
        Assert.assertEquals(stream.selectKey(keyValueMapper).valueSerde(), consumedInternal.valueSerde());
        Assert.assertNull(stream.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).keySerde());
        Assert.assertNull(stream.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).valueSerde());
        Assert.assertEquals(stream.mapValues(valueMapper).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.mapValues(valueMapper).valueSerde());
        Assert.assertNull(stream.flatMap(keyValueMapper2).keySerde());
        Assert.assertNull(stream.flatMap(keyValueMapper2).valueSerde());
        Assert.assertEquals(stream.flatMapValues(valueMapper2).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.flatMapValues(valueMapper2).valueSerde());
        Assert.assertNull(stream.transform(this.transformerSupplier, new String[0]).keySerde());
        Assert.assertNull(stream.transform(this.transformerSupplier, new String[0]).valueSerde());
        Assert.assertEquals(stream.transformValues(this.valueTransformerSupplier, new String[0]).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.transformValues(this.valueTransformerSupplier, new String[0]).valueSerde());
        Assert.assertNull(stream.merge(stream).keySerde());
        Assert.assertNull(stream.merge(stream).valueSerde());
        Assert.assertEquals(stream.through("topic-3").keySerde(), consumedInternal.keySerde());
        Assert.assertEquals(stream.through("topic-3").valueSerde(), consumedInternal.valueSerde());
        Assert.assertEquals(stream.through("topic-3", Produced.with(this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertEquals(stream.through("topic-3", Produced.with(this.mySerde, this.mySerde)).valueSerde(), this.mySerde);
        Assert.assertEquals(stream.repartition().keySerde(), consumedInternal.keySerde());
        Assert.assertEquals(stream.repartition().valueSerde(), consumedInternal.valueSerde());
        Assert.assertEquals(stream.repartition(Repartitioned.with(this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertEquals(stream.repartition(Repartitioned.with(this.mySerde, this.mySerde)).valueSerde(), this.mySerde);
        Assert.assertEquals(stream.groupByKey().keySerde(), consumedInternal.keySerde());
        Assert.assertEquals(stream.groupByKey().valueSerde(), consumedInternal.valueSerde());
        Assert.assertEquals(stream.groupByKey(Grouped.with(this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertEquals(stream.groupByKey(Grouped.with(this.mySerde, this.mySerde)).valueSerde(), this.mySerde);
        Assert.assertNull(stream.groupBy(keyValueMapper).keySerde());
        Assert.assertEquals(stream.groupBy(keyValueMapper).valueSerde(), consumedInternal.valueSerde());
        Assert.assertEquals(stream.groupBy(keyValueMapper, Grouped.with(this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertEquals(stream.groupBy(keyValueMapper, Grouped.with(this.mySerde, this.mySerde)).valueSerde(), this.mySerde);
        Assert.assertNull(stream.join(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).keySerde());
        Assert.assertNull(stream.join(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).valueSerde());
        Assert.assertEquals(stream.join(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertNull(stream.join(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).valueSerde());
        Assert.assertNull(stream.leftJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).keySerde());
        Assert.assertNull(stream.leftJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).valueSerde());
        Assert.assertEquals(stream.leftJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertNull(stream.leftJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).valueSerde());
        Assert.assertNull(stream.outerJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).keySerde());
        Assert.assertNull(stream.outerJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L))).valueSerde());
        Assert.assertEquals(stream.outerJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertNull(stream.outerJoin(stream, valueJoiner, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(this.mySerde, this.mySerde, this.mySerde)).valueSerde());
        Assert.assertEquals(stream.join(table, valueJoiner).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.join(table, valueJoiner).valueSerde());
        Assert.assertEquals(stream.join(table, valueJoiner, Joined.with(this.mySerde, this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertNull(stream.join(table, valueJoiner, Joined.with(this.mySerde, this.mySerde, this.mySerde)).valueSerde());
        Assert.assertEquals(stream.leftJoin(table, valueJoiner).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.leftJoin(table, valueJoiner).valueSerde());
        Assert.assertEquals(stream.leftJoin(table, valueJoiner, Joined.with(this.mySerde, this.mySerde, this.mySerde)).keySerde(), this.mySerde);
        Assert.assertNull(stream.leftJoin(table, valueJoiner, Joined.with(this.mySerde, this.mySerde, this.mySerde)).valueSerde());
        Assert.assertEquals(stream.join(globalTable, keyValueMapper, valueJoiner).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.join(globalTable, keyValueMapper, valueJoiner).valueSerde());
        Assert.assertEquals(stream.leftJoin(globalTable, keyValueMapper, valueJoiner).keySerde(), consumedInternal.keySerde());
        Assert.assertNull(stream.leftJoin(globalTable, keyValueMapper, valueJoiner).valueSerde());
    }

    @Test
    @Deprecated
    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = streamsBuilder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        stream.to("topic-5");
        stream2.through("topic-6");
        ProcessorTopology buildTopology = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").buildTopology();
        MatcherAssert.assertThat(buildTopology.source("topic-6").getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        Assert.assertNull(buildTopology.source("topic-4").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-3").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-2").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-1").getTimestampExtractor());
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = streamsBuilder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        stream.to("topic-5");
        stream2.repartition(Repartitioned.as("topic-6"));
        ProcessorTopology buildTopology = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").buildTopology();
        MatcherAssert.assertThat(buildTopology.source("X-topic-6-repartition").getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        Assert.assertNull(buildTopology.source("topic-4").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-3").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-2").getTimestampExtractor());
        Assert.assertNull(buildTopology.source("topic-1").getTimestampExtractor());
    }

    @Test
    @Deprecated
    public void shouldSendDataThroughTopicUsingProduced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("a", "b");
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            MatcherAssert.assertThat(this.processorSupplier.theCapturedProcessor().processed(), CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp("a", "b", 0L))));
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).repartition(Repartitioned.with(Serdes.String(), Serdes.String())).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("a", "b");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(this.processorSupplier.theCapturedProcessor().processed(), CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp("a", "b", 0L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendDataToTopicUsingProduced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).to("to-topic", Produced.with(Serdes.String(), Serdes.String()));
        streamsBuilder.stream("to-topic", this.stringConsumed).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO).pipeInput("e", "f");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(this.processorSupplier.theCapturedProcessor().processed(), CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp("e", "f", 0L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendDataToDynamicTopics() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).to((str, str2, recordContext) -> {
            return recordContext.topic() + "-" + str + "-" + str2.substring(0, 1);
        }, Produced.with(Serdes.String(), Serdes.String()));
        streamsBuilder.stream("topic-a-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        streamsBuilder.stream("topic-b-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                createInputTopic.pipeInput("a", "v1");
                createInputTopic.pipeInput("a", "v2");
                createInputTopic.pipeInput("b", "v1");
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                List<MockApiProcessor<String, String, Void, Void>> capturedProcessors = this.processorSupplier.capturedProcessors(2);
                MatcherAssert.assertThat(capturedProcessors.get(0).processed(), CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp("a", "v1", 0L), new KeyValueTimestamp("a", "v2", 0L))));
                MatcherAssert.assertThat(capturedProcessors.get(1).processed(), CoreMatchers.equalTo(Collections.singletonList(new KeyValueTimestamp("b", "v1", 0L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("topic-1", this.stringConsumed);
        ValueJoiner instance = MockValueJoiner.instance(":");
        long convert = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
        stream.map((str, str2) -> {
            return KeyValue.pair(str2, str2);
        }).join(stream, instance, JoinWindows.of(Duration.ofMillis(convert)).grace(Duration.ofMillis(3 * convert)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())).to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        ProcessorTopology buildTopology = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").buildTopology();
        SourceNode source = buildTopology.source("topic-1");
        for (SourceNode sourceNode : buildTopology.sources()) {
            if (sourceNode.name().equals(source.name())) {
                Assert.assertNull(sourceNode.getTimestampExtractor());
            } else {
                MatcherAssert.assertThat(sourceNode.getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
            }
        }
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("topic-1", this.stringConsumed);
        ValueJoiner instance = MockValueJoiner.instance(":");
        long convert = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
        stream.map((str, str2) -> {
            return KeyValue.pair(str2, str2);
        }).join(stream, instance, JoinWindows.of(Duration.ofMillis(convert)).grace(Duration.ofMillis(3 * convert)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())).to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        ProcessorTopology buildTopology = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").buildTopology();
        SourceNode source = buildTopology.source("topic-1");
        for (SourceNode sourceNode : buildTopology.sources()) {
            if (sourceNode.name().equals(source.name())) {
                Assert.assertNull(sourceNode.getTimestampExtractor());
            } else {
                MatcherAssert.assertThat(sourceNode.getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
            }
        }
    }

    @Test
    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic").selectKey((str, str2) -> {
            return str2;
        }).join(streamsBuilder.globalTable("globalTopic"), (str3, str4) -> {
            return str3 + str4;
        }, (str5, str6) -> {
            return str5 + str6;
        }).groupByKey().count();
        Matcher matcher = Pattern.compile("Sink: .*-repartition").matcher(streamsBuilder.build().describe().toString());
        Assert.assertTrue(matcher.find());
        String group = matcher.group();
        MatcherAssert.assertThat(group, IsNull.notNullValue());
        Assert.assertTrue(group.endsWith("repartition"));
    }

    @Test
    public void shouldMergeTwoStreams() {
        this.builder.stream("topic-1").merge(this.builder.stream("topic-2")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic-1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("topic-2", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", "aa");
            createInputTopic2.pipeInput("B", "bb");
            createInputTopic2.pipeInput("C", "cc");
            createInputTopic.pipeInput("D", "dd");
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp("A", "aa", 0L), new KeyValueTimestamp("B", "bb", 0L), new KeyValueTimestamp("C", "cc", 0L), new KeyValueTimestamp("D", "dd", 0L)), this.processorSupplier.theCapturedProcessor().processed());
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMergeMultipleStreams() {
        this.builder.stream("topic-1").merge(this.builder.stream("topic-2")).merge(this.builder.stream("topic-3")).merge(this.builder.stream("topic-4")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic-1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("topic-2", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic3 = topologyTestDriver.createInputTopic("topic-3", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic4 = topologyTestDriver.createInputTopic("topic-4", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                createInputTopic.pipeInput("A", "aa", 1L);
                createInputTopic2.pipeInput("B", "bb", 9L);
                createInputTopic3.pipeInput("C", "cc", 2L);
                createInputTopic4.pipeInput("D", "dd", 8L);
                createInputTopic4.pipeInput("E", "ee", 3L);
                createInputTopic3.pipeInput("F", "ff", 7L);
                createInputTopic2.pipeInput("G", "gg", 4L);
                createInputTopic.pipeInput("H", "hh", 6L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Arrays.asList(new KeyValueTimestamp("A", "aa", 1L), new KeyValueTimestamp("B", "bb", 9L), new KeyValueTimestamp("C", "cc", 2L), new KeyValueTimestamp("D", "dd", 8L), new KeyValueTimestamp("E", "ee", 3L), new KeyValueTimestamp("F", "ff", 7L), new KeyValueTimestamp("G", "gg", 4L), new KeyValueTimestamp("H", "hh", 6L)), this.processorSupplier.theCapturedProcessor().processed());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        this.builder.stream(Pattern.compile("topic-\\d")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic-3", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("topic-4", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic3 = topologyTestDriver.createInputTopic("topic-5", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic4 = topologyTestDriver.createInputTopic("topic-6", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic5 = topologyTestDriver.createInputTopic("topic-7", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", "aa", 1L);
            createInputTopic2.pipeInput("B", "bb", 5L);
            createInputTopic3.pipeInput("C", "cc", 10L);
            createInputTopic4.pipeInput("D", "dd", 8L);
            createInputTopic5.pipeInput("E", "ee", 3L);
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp("A", "aa", 1L), new KeyValueTimestamp("B", "bb", 5L), new KeyValueTimestamp("C", "cc", 10L), new KeyValueTimestamp("D", "dd", 8L), new KeyValueTimestamp("E", "ee", 3L)), this.processorSupplier.theCapturedProcessor().processed());
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        this.builder.stream(Pattern.compile("topic-\\d")).merge(this.builder.stream(Pattern.compile("topic-[A-Z]"))).merge(this.builder.stream("topic-without-pattern")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic-3", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("topic-4", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic3 = topologyTestDriver.createInputTopic("topic-A", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic4 = topologyTestDriver.createInputTopic("topic-Z", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic5 = topologyTestDriver.createInputTopic("topic-without-pattern", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                createInputTopic.pipeInput("A", "aa", 1L);
                createInputTopic2.pipeInput("B", "bb", 5L);
                createInputTopic3.pipeInput("C", "cc", 10L);
                createInputTopic4.pipeInput("D", "dd", 8L);
                createInputTopic5.pipeInput("E", "ee", 3L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Arrays.asList(new KeyValueTimestamp("A", "aa", 1L), new KeyValueTimestamp("B", "bb", 5L), new KeyValueTimestamp("C", "cc", 10L), new KeyValueTimestamp("D", "dd", 8L), new KeyValueTimestamp("E", "ee", 3L)), this.processorSupplier.theCapturedProcessor().processed());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform((TransformerSupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnTransformWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform((TransformerSupplier) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform((TransformerSupplier) null, Named.as("transformer"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnTransformWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform((TransformerSupplier) null, Named.as("transformer"), new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, Named.as("transform"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, Named.as("transform"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransformWithStoreName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transform(this.transformerSupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnFlatTransform() {
        Transformer transformer = this.flatTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.flatTransform(() -> {
                return transformer;
            }, new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithStores() {
        Transformer transformer = this.flatTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.flatTransform(() -> {
                return transformer;
            }, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamed() {
        Transformer transformer = this.flatTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.flatTransform(() -> {
                return transformer;
            }, Named.as("flatTransformer"), new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamedAndStores() {
        Transformer transformer = this.flatTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.flatTransform(() -> {
                return transformer;
            }, Named.as("flatTransformer"), new String[]{"storeName"});
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform((TransformerSupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform((TransformerSupplier) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform((TransformerSupplier) null, Named.as("flatTransformer"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform((TransformerSupplier) null, Named.as("flatTransformer"), new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("transformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, Named.as("flatTransform"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransformWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, Named.as("flatTransform"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransform() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransformWithStoreName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransform(this.flatTransformerSupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
        ValueTransformer valueTransformer = this.valueTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.transformValues(() -> {
                return valueTransformer;
            }, new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowBadTransformerSupplierOnTransformValuesWithNamed() {
        ValueTransformer valueTransformer = this.valueTransformerSupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.transformValues(() -> {
                return valueTransformer;
            }, Named.as("transformer"), new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerSupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValues() {
        ValueTransformerWithKey valueTransformerWithKey = this.valueTransformerWithKeySupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.transformValues(() -> {
                return valueTransformerWithKey;
            }, new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
        ValueTransformerWithKey valueTransformerWithKey = this.valueTransformerWithKeySupplier.get();
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testStream.transformValues(() -> {
                return valueTransformerWithKey;
            }, Named.as("transformer"), new String[0]);
        })).getMessage(), CoreMatchers.containsString("#get() must return a new object each time it is called."));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerWithKeySupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerSupplier) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerWithKeySupplier) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerSupplier) null, Named.as("valueTransformer"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerWithKeySupplier) null, Named.as("valueTransformerWithKey"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerSupplier) null, Named.as("valueTransformer"), new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues((ValueTransformerWithKeySupplier) null, Named.as("valueTransformerWithKey"), new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplierWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, Named.as("valueTransformer"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplierWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, Named.as("valueTransformer"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplierWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, Named.as("valueTransformer"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplierWithName() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, Named.as("valueTransformerWithKey"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplierAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerSupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplierAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.transformValues(this.valueTransformerWithKeySupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerSupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerWithKeySupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerSupplier) null, new String[]{"stateStore"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerWithKeySupplier) null, new String[]{"stateStore"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerSupplier) null, Named.as("flatValueTransformer"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerWithKeySupplier) null, Named.as("flatValueWithKeyTransformer"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerSupplier) null, Named.as("flatValueTransformer"), new String[]{"stateStore"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues((ValueTransformerWithKeySupplier) null, Named.as("flatValueWitKeyTransformer"), new String[]{"stateStore"});
        })).getMessage(), CoreMatchers.equalTo("valueTransformerSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, Named.as("flatValueTransformer"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, Named.as("flatValueWitKeyTransformer"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, Named.as("flatValueTransformer"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, Named.as("flatValueWitKeyTransformer"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't contain `null` as store name"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplier() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplierAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerSupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplierAndStore() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.flatTransformValues(this.flatValueTransformerWithKeySupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullProcessSupplierOnProcess() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process((ProcessorSupplier) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("processorSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullProcessSupplierOnProcessWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process((ProcessorSupplier) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("processorSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullProcessSupplierOnProcessWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process((ProcessorSupplier) null, Named.as("processor"), new String[0]);
        })).getMessage(), CoreMatchers.equalTo("processorSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullProcessSupplierOnProcessWithNamedAndStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process((ProcessorSupplier) null, Named.as("processor"), new String[]{"stateStore"});
        })).getMessage(), CoreMatchers.equalTo("processorSupplier can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnProcess() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnProcess() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be null"));
    }

    @Test
    public void shouldNotAllowNullStoreNamesOnProcessWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, Named.as("processor"), (String[]) null);
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be a null array"));
    }

    @Test
    public void shouldNotAllowNullStoreNameOnProcessWithNamed() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, Named.as("processor"), new String[]{(String) null});
        })).getMessage(), CoreMatchers.equalTo("stateStoreNames can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnProcess() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, (Named) null, new String[0]);
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotAllowNullNamedOnProcessWithStores() {
        MatcherAssert.assertThat(((NullPointerException) Assert.assertThrows(NullPointerException.class, () -> {
            this.testStream.process(this.processorSupplier, (Named) null, new String[]{"storeName"});
        })).getMessage(), CoreMatchers.equalTo("named can't be null"));
    }

    @Test
    public void shouldNotMaterializedKTableFromKStream() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", with).toTable().toStream().to("output");
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-TOTABLE-0000000001\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [])\n      --> KTABLE-TOSTREAM-0000000003\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-TOTABLE-0000000001\n    Sink: KSTREAM-SINK-0000000004 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000003\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", Serdes.String().deserializer(), Serdes.String().deserializer());
                createInputTopic.pipeInput("A", "01", 5L);
                createInputTopic.pipeInput("B", "02", 100L);
                createInputTopic.pipeInput("C", "03", 0L);
                createInputTopic.pipeInput("D", "04", 0L);
                createInputTopic.pipeInput("A", "05", 10L);
                createInputTopic.pipeInput("A", "06", 8L);
                ArrayList arrayList = new ArrayList();
                arrayList.add(new TestRecord("A", "01", Instant.ofEpochMilli(5L)));
                arrayList.add(new TestRecord("B", "02", Instant.ofEpochMilli(100L)));
                arrayList.add(new TestRecord("C", "03", Instant.ofEpochMilli(0L)));
                arrayList.add(new TestRecord("D", "04", Instant.ofEpochMilli(0L)));
                arrayList.add(new TestRecord("A", "05", Instant.ofEpochMilli(10L)));
                arrayList.add(new TestRecord("A", "06", Instant.ofEpochMilli(8L)));
                Assert.assertEquals(createOutputTopic.readRecordsToList(), arrayList);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldMaterializeKTableFromKStream() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", with).toTable(Materialized.as(Stores.inMemoryKeyValueStore("store")));
        Topology build = streamsBuilder.build();
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-TOTABLE-0000000001\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [store])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
            createInputTopic.pipeInput("A", "01");
            createInputTopic.pipeInput("B", "02");
            createInputTopic.pipeInput("A", "03");
            MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("A", "03"), Utils.mkEntry("B", "02")})));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSupportKeyChangeKTableFromKStream() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", with).map((str, str2) -> {
            return new KeyValue(Integer.valueOf(str.charAt(0) - 'A'), str2);
        }).toTable(Materialized.with(Serdes.Integer(), (Serde) null)).toStream().to("output");
        Topology build = streamsBuilder.build();
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-MAP-0000000001\n    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-TOTABLE-0000000002-repartition)\n      <-- KSTREAM-FILTER-0000000005\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-TOTABLE-0000000002-repartition])\n      --> KSTREAM-TOTABLE-0000000002\n    Processor: KSTREAM-TOTABLE-0000000002 (stores: [])\n      --> KTABLE-TOSTREAM-0000000007\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])\n      --> KSTREAM-SINK-0000000008\n      <-- KSTREAM-TOTABLE-0000000002\n    Sink: KSTREAM-SINK-0000000008 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000007\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", Serdes.Integer().deserializer(), Serdes.String().deserializer());
                createInputTopic.pipeInput("A", "01", 5L);
                createInputTopic.pipeInput("B", "02", 100L);
                createInputTopic.pipeInput("C", "03", 0L);
                createInputTopic.pipeInput("D", "04", 0L);
                createInputTopic.pipeInput("A", "05", 10L);
                createInputTopic.pipeInput("A", "06", 8L);
                ArrayList arrayList = new ArrayList();
                arrayList.add(new TestRecord(0, "01", Instant.ofEpochMilli(5L)));
                arrayList.add(new TestRecord(1, "02", Instant.ofEpochMilli(100L)));
                arrayList.add(new TestRecord(2, "03", Instant.ofEpochMilli(0L)));
                arrayList.add(new TestRecord(3, "04", Instant.ofEpochMilli(0L)));
                arrayList.add(new TestRecord(0, "05", Instant.ofEpochMilli(10L)));
                arrayList.add(new TestRecord(0, "06", Instant.ofEpochMilli(8L)));
                Assert.assertEquals(createOutputTopic.readRecordsToList(), arrayList);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldSupportForeignKeyTableTableJoinWithKTableFromKStream() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input1", with).toTable().join(streamsBuilder.stream("input2", with).toTable(), str -> {
            return str.split("\\|")[1];
        }, (str2, str3) -> {
            return "(" + str2 + "," + str3 + ")";
        }).toStream().to("output");
        Topology build = streamsBuilder.build(this.props);
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KTABLE-SOURCE-0000000016 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic])\n      --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input1])\n      --> KSTREAM-TOTABLE-0000000001\n    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n      --> KTABLE-FK-JOIN-OUTPUT-0000000018\n      <-- KTABLE-SOURCE-0000000016\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n      --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n      --> KTABLE-TOSTREAM-0000000020\n      <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n      --> KTABLE-SINK-0000000008\n      <-- KSTREAM-TOTABLE-0000000001\n    Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- KTABLE-FK-JOIN-OUTPUT-0000000018\n    Sink: KSTREAM-SINK-0000000021 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000020\n    Sink: KTABLE-SINK-0000000008 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic)\n      <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000003 (topics: [input2])\n      --> KSTREAM-TOTABLE-0000000004\n    Source: KTABLE-SOURCE-0000000009 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic])\n      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n    Processor: KSTREAM-TOTABLE-0000000004 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013\n      <-- KSTREAM-SOURCE-0000000003\n    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012\n      <-- KTABLE-SOURCE-0000000009\n    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n      --> KTABLE-SINK-0000000015\n      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n      --> KTABLE-SINK-0000000015\n      <-- KSTREAM-TOTABLE-0000000004\n    Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
            TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
            createInputTopic2.pipeInput("rhs1", "rhsValue1");
            createInputTopic2.pipeInput("rhs2", "rhsValue2");
            createInputTopic2.pipeInput("rhs3", "rhsValue3");
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
            createInputTopic.pipeInput("lhs1", "lhsValue1|rhs1");
            createInputTopic.pipeInput("lhs2", "lhsValue2|rhs2");
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")})));
            createInputTopic.pipeInput("lhs3", "lhsValue3|rhs1");
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
            createInputTopic.pipeInput("lhs1", (String) null);
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", (Object) null)})));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSupportTableTableJoinWithKStreamToKTable() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("left", with).toTable().join(streamsBuilder.stream("right", with).toTable(), MockValueJoiner.TOSTRING_JOINER).toStream().to("output");
        Topology build = streamsBuilder.build(this.props);
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [left])\n      --> KSTREAM-TOTABLE-0000000001\n    Source: KSTREAM-SOURCE-0000000003 (topics: [right])\n      --> KSTREAM-TOTABLE-0000000004\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n      --> KTABLE-JOINTHIS-0000000007\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-TOTABLE-0000000004 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n      --> KTABLE-JOINOTHER-0000000008\n      <-- KSTREAM-SOURCE-0000000003\n    Processor: KTABLE-JOINOTHER-0000000008 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n      --> KTABLE-MERGE-0000000006\n      <-- KSTREAM-TOTABLE-0000000004\n    Processor: KTABLE-JOINTHIS-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n      --> KTABLE-MERGE-0000000006\n      <-- KSTREAM-TOTABLE-0000000001\n    Processor: KTABLE-MERGE-0000000006 (stores: [])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-SINK-0000000010\n      <-- KTABLE-MERGE-0000000006\n    Sink: KSTREAM-SINK-0000000010 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000009\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("left", new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("right", new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
                createInputTopic2.pipeInput("lhs1", "rhsValue1");
                createInputTopic2.pipeInput("rhs2", "rhsValue2");
                createInputTopic2.pipeInput("lhs3", "rhsValue3");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                createInputTopic.pipeInput("lhs1", "lhsValue1");
                createInputTopic.pipeInput("lhs2", "lhsValue2");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "lhsValue1+rhsValue1")})));
                createInputTopic.pipeInput("lhs3", "lhsValue3");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs3", "lhsValue3+rhsValue3")})));
                createInputTopic.pipeInput("lhs1", "lhsValue4");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "lhsValue4+rhsValue1")})));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldSupportStreamTableJoinWithKStreamToKTable() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        streamsBuilder.stream("streamTopic", with).join(streamsBuilder.stream("tableTopic", with).toTable(), MockValueJoiner.TOSTRING_JOINER).to("output");
        Topology build = streamsBuilder.build(this.props);
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [streamTopic])\n      --> KSTREAM-JOIN-0000000004\n    Processor: KSTREAM-JOIN-0000000004 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000003])\n      --> KSTREAM-SINK-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Source: KSTREAM-SOURCE-0000000001 (topics: [tableTopic])\n      --> KSTREAM-TOTABLE-0000000002\n    Sink: KSTREAM-SINK-0000000005 (topic: output)\n      <-- KSTREAM-JOIN-0000000004\n    Processor: KSTREAM-TOTABLE-0000000002 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000003])\n      --> none\n      <-- KSTREAM-SOURCE-0000000001\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("streamTopic", new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("tableTopic", new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
                createInputTopic2.pipeInput("lhs1", "rhsValue1");
                createInputTopic2.pipeInput("rhs2", "rhsValue2");
                createInputTopic2.pipeInput("lhs3", "rhsValue3");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                createInputTopic.pipeInput("lhs1", "lhsValue1");
                createInputTopic.pipeInput("lhs2", "lhsValue2");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "lhsValue1+rhsValue1")})));
                createInputTopic.pipeInput("lhs3", "lhsValue3");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs3", "lhsValue3+rhsValue3")})));
                createInputTopic.pipeInput("lhs1", "lhsValue4");
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "lhsValue4+rhsValue1")})));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldSupportGroupByCountWithKStreamToKTable() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", with).toTable().groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String())).count().toStream().to("output");
        MatcherAssert.assertThat(streamsBuilder.build(this.props).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-TOTABLE-0000000001\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n      --> KTABLE-SELECT-0000000003\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KTABLE-SELECT-0000000003 (stores: [])\n      --> KSTREAM-SINK-0000000005\n      <-- KSTREAM-TOTABLE-0000000001\n    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- KTABLE-SELECT-0000000003\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> KTABLE-AGGREGATE-0000000007\n    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n      --> KTABLE-TOSTREAM-0000000008\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KTABLE-AGGREGATE-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000008\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", Serdes.String().deserializer(), Serdes.Long().deserializer());
                createInputTopic.pipeInput("A", "green", 10L);
                createInputTopic.pipeInput("B", "green", 9L);
                createInputTopic.pipeInput("A", "blue", 12L);
                createInputTopic.pipeInput("C", "yellow", 15L);
                createInputTopic.pipeInput("D", "green", 11L);
                Assert.assertEquals(Arrays.asList(new TestRecord("green", 1L, Instant.ofEpochMilli(10L)), new TestRecord("green", 2L, Instant.ofEpochMilli(10L)), new TestRecord("green", 1L, Instant.ofEpochMilli(12L)), new TestRecord("blue", 1L, Instant.ofEpochMilli(12L)), new TestRecord("yellow", 1L, Instant.ofEpochMilli(15L)), new TestRecord("green", 2L, Instant.ofEpochMilli(12L))), createOutputTopic.readRecordsToList());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldSupportTriggerMaterializedWithKTableFromKStream() {
        Consumed with = Consumed.with(Serdes.String(), Serdes.String());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", with).toTable().mapValues(str -> {
            return Integer.valueOf(str.charAt(0) - 'a');
        }, Materialized.as("store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())).toStream().to("output");
        Topology build = streamsBuilder.build(this.props);
        MatcherAssert.assertThat(build.describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-TOTABLE-0000000001\n    Processor: KSTREAM-TOTABLE-0000000001 (stores: [])\n      --> KTABLE-MAPVALUES-0000000003\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KTABLE-MAPVALUES-0000000003 (stores: [store])\n      --> KTABLE-TOSTREAM-0000000004\n      <-- KSTREAM-TOTABLE-0000000001\n    Processor: KTABLE-TOSTREAM-0000000004 (stores: [])\n      --> KSTREAM-SINK-0000000005\n      <-- KTABLE-MAPVALUES-0000000003\n    Sink: KSTREAM-SINK-0000000005 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000004\n\n"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", Serdes.String().deserializer(), Serdes.Integer().deserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("A", "green", 10L);
                createInputTopic.pipeInput("B", "green", 9L);
                createInputTopic.pipeInput("A", "blue", 12L);
                createInputTopic.pipeInput("C", "yellow", 15L);
                createInputTopic.pipeInput("D", "green", 11L);
                HashMap hashMap = new HashMap();
                hashMap.putIfAbsent("A", 1);
                hashMap.putIfAbsent("B", 6);
                hashMap.putIfAbsent("C", 24);
                hashMap.putIfAbsent("D", 6);
                Assert.assertEquals(hashMap, asMap(keyValueStore));
                Assert.assertEquals(Arrays.asList(new TestRecord("A", 6, Instant.ofEpochMilli(10L)), new TestRecord("B", 6, Instant.ofEpochMilli(9L)), new TestRecord("A", 1, Instant.ofEpochMilli(12L)), new TestRecord("C", 24, Instant.ofEpochMilli(15L)), new TestRecord("D", 6, Instant.ofEpochMilli(11L))), createOutputTopic.readRecordsToList());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    private static <K, V> Map<K, V> asMap(KeyValueStore<K, V> keyValueStore) {
        HashMap hashMap = new HashMap();
        keyValueStore.all().forEachRemaining(keyValue -> {
            hashMap.put(keyValue.key, keyValue.value);
        });
        return hashMap;
    }
}
