/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CogroupedKStreamImplTest {
    /** String/String consumption settings reused by the topology tests when creating source streams. */
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    /** Input topic name used by {@code setup()}. */
    private static final String TOPIC = "topic";
    /** Output topic name that the topology tests write their results to. */
    private static final String OUTPUT = "output";
    /** Grouped stream fixture; assigned in {@code setup()}. */
    private KGroupedStream<String, String> groupedStream;
    /** Cogrouped stream fixture; assigned in {@code setup()}. */
    private CogroupedKStream<String, String> cogroupedStream;
    /** Streams configuration created with String serdes for keys and values. */
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    /** Appends the incoming value to the current aggregate. */
    private static final Aggregator<String, String, String> STRING_AGGREGATOR = (key, value, aggregate) -> aggregate + value;
    /** Starts string aggregations from the empty string. */
    private static final Initializer<String> STRING_INITIALIZER = () -> "";
    /** Parses the incoming string value as an int and adds it to the current aggregate. */
    private static final Aggregator<String, String, Integer> STRING_SUM_AGGREGATOR = (key, value, aggregate) -> aggregate + Integer.parseInt(value);
    /** Adds the incoming integer value to the current aggregate. */
    private static final Aggregator<? super String, ? super Integer, Integer> SUM_AGGREGATOR = (key, value, aggregate) -> aggregate + value;
    /** Starts integer aggregations from zero. */
    private static final Initializer<Integer> SUM_INITIALIZER = () -> 0;

    /**
     * Creates the fixtures used by the tests that reference {@code groupedStream} and
     * {@code cogroupedStream}: a String/String stream read from {@code TOPIC}, grouped by key,
     * and a cogrouped stream built from that grouping with {@code MockAggregator.TOSTRING_ADDER}.
     */
    @Before
    public void setup() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Parameterized types (instead of raw KStream / Serde) keep the assignments below checked.
        final KStream<String, String> stream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
    }

    /**
     * Expects a {@link NullPointerException} when {@code cogroup} is called with a
     * {@code null} grouped stream.
     */
    @Test
    public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code cogroup} is called with a
     * {@code null} aggregator.
     */
    @Test
    public void shouldNotHaveNullAggregatorOnCogroup() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.cogroup(groupedStream, null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} initializer.
     */
    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} initializer and a valid {@code Named}.
     */
    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitNamed() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(null, Named.as("name"));
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} initializer and a valid {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitMaterialized() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(null, Materialized.as("store"));
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} initializer together with a valid {@code Named} and {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitNamedAndMaterialized() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store"));
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} {@code Named}.
     */
    @Test
    public void shouldNotHaveNullNamedOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                // The cast selects the (Initializer, Named) overload for the null argument.
                cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code aggregate} is called with a
     * {@code null} {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullMaterializedOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                // The cast selects the (Initializer, Materialized) overload for the null argument.
                cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized) null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when the three-argument {@code aggregate} is
     * called with a {@code null} {@code Named} and a valid {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullNamedOnAggregateWithMateriazlied() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(STRING_INITIALIZER, null, Materialized.as("store"));
            });
    }

    /**
     * Expects a {@link NullPointerException} when the three-argument {@code aggregate} is
     * called with a valid {@code Named} and a {@code null} {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullMaterializedOnAggregateWithNames() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code windowedBy} is called with a
     * {@code null} {@code Windows}.
     */
    @Test
    public void shouldNotHaveNullWindowOnWindowedByTime() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                // The cast selects the Windows overload for the null argument.
                cogroupedStream.windowedBy((Windows) null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code windowedBy} is called with a
     * {@code null} {@code SessionWindows}.
     */
    @Test
    public void shouldNotHaveNullWindowOnWindowedBySession() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                // The cast selects the SessionWindows overload for the null argument.
                cogroupedStream.windowedBy((SessionWindows) null);
            });
    }

    /**
     * Expects a {@link NullPointerException} when {@code windowedBy} is called with a
     * {@code null} {@code SlidingWindows}.
     */
    @Test
    public void shouldNotHaveNullWindowOnWindowedBySliding() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> {
                // The cast selects the SlidingWindows overload for the null argument.
                cogroupedStream.windowedBy((SlidingWindows) null);
            });
    }

    /**
     * Cogroups two key-grouped streams, aggregates them with {@code Named.as("test")} and
     * {@code Materialized.as("store")}, and checks that the topology description uses the
     * {@code test-cogroup-*} processor names and the {@code store} state store.
     */
    @Test
    public void shouldNameProcessorsAndStoreBasedOnNamedParameter() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> test2 = builder.stream("two", stringConsumed);

        final KGroupedStream<String, String> groupedOne = stream1.groupByKey();
        final KGroupedStream<String, String> groupedTwo = test2.groupByKey();

        final KTable<String, String> customers = groupedOne
            .cogroup(STRING_AGGREGATOR)
            .cogroup(groupedTwo, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store"));
        customers.toStream().to(OUTPUT);

        final String topologyDescription = builder.build().describe().toString();

        // One fragment per line of the description; the concatenation is a compile-time constant.
        final String expected =
            "Topologies:\n" +
            "   Sub-topology: 0\n" +
            "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
            "      --> test-cogroup-agg-0\n" +
            "    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
            "      --> test-cogroup-agg-1\n" +
            "    Processor: test-cogroup-agg-0 (stores: [store])\n" +
            "      --> test-cogroup-merge\n" +
            "      <-- KSTREAM-SOURCE-0000000000\n" +
            "    Processor: test-cogroup-agg-1 (stores: [store])\n" +
            "      --> test-cogroup-merge\n" +
            "      <-- KSTREAM-SOURCE-0000000001\n" +
            "    Processor: test-cogroup-merge (stores: [])\n" +
            "      --> KTABLE-TOSTREAM-0000000005\n" +
            "      <-- test-cogroup-agg-0, test-cogroup-agg-1\n" +
            "    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n" +
            "      --> KSTREAM-SINK-0000000006\n" +
            "      <-- test-cogroup-merge\n" +
            "    Sink: KSTREAM-SINK-0000000006 (topic: output)\n" +
            "      <-- KTABLE-TOSTREAM-0000000005\n\n";

        MatcherAssert.assertThat(topologyDescription, CoreMatchers.equalTo(expected));
    }

    /**
     * Swaps key and value on the first stream, groups it with
     * {@code Grouped.as("repartition-test")}, cogroups it with a second stream, and checks that
     * the topology description names the repartition nodes and topic after that grouping name.
     */
    @Test
    public void shouldNameRepartitionTopic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> test2 = builder.stream("two", stringConsumed);

        final KGroupedStream<String, String> groupedOne = stream1
            .map((k, v) -> new KeyValue<>(v, k))
            .groupByKey(Grouped.as("repartition-test"));
        final KGroupedStream<String, String> groupedTwo = test2.groupByKey();

        final KTable<String, String> customers = groupedOne
            .cogroup(STRING_AGGREGATOR)
            .cogroup(groupedTwo, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        customers.toStream().to(OUTPUT);

        final String topologyDescription = builder.build().describe().toString();

        // One fragment per line of the description; the concatenation is a compile-time constant.
        final String expected =
            "Topologies:\n" +
            "   Sub-topology: 0\n" +
            "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
            "      --> KSTREAM-MAP-0000000002\n" +
            "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n" +
            "      --> repartition-test-repartition-filter\n" +
            "      <-- KSTREAM-SOURCE-0000000000\n" +
            "    Processor: repartition-test-repartition-filter (stores: [])\n" +
            "      --> repartition-test-repartition-sink\n" +
            "      <-- KSTREAM-MAP-0000000002\n" +
            "    Sink: repartition-test-repartition-sink (topic: repartition-test-repartition)\n" +
            "      <-- repartition-test-repartition-filter\n\n" +
            "  Sub-topology: 1\n" +
            "    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-0000000008\n" +
            "    Source: repartition-test-repartition-source (topics: [repartition-test-repartition])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-0000000007\n" +
            "    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
            "      --> COGROUPKSTREAM-MERGE-0000000009\n" +
            "      <-- repartition-test-repartition-source\n" +
            "    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
            "      --> COGROUPKSTREAM-MERGE-0000000009\n" +
            "      <-- KSTREAM-SOURCE-0000000001\n" +
            "    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n" +
            "      --> KTABLE-TOSTREAM-0000000010\n" +
            "      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n" +
            "    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n" +
            "      --> KSTREAM-SINK-0000000011\n" +
            "      <-- COGROUPKSTREAM-MERGE-0000000009\n" +
            "    Sink: KSTREAM-SINK-0000000011 (topic: output)\n" +
            "      <-- KTABLE-TOSTREAM-0000000010\n\n";

        MatcherAssert.assertThat(topologyDescription, CoreMatchers.equalTo(expected));
    }

    /**
     * Swaps key and value on the first stream before grouping, cogroups it with a second
     * stream using {@code Named.as("test")} and {@code Materialized.as("store")}, and checks
     * that the topology description contains a {@code store-repartition} topic between the map
     * and the aggregation.
     */
    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModification() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> test2 = builder.stream("two", stringConsumed);

        final KGroupedStream<String, String> groupedOne = stream1
            .map((k, v) -> new KeyValue<>(v, k))
            .groupByKey();
        final KGroupedStream<String, String> groupedTwo = test2.groupByKey();

        final KTable<String, String> customers = groupedOne
            .cogroup(STRING_AGGREGATOR)
            .cogroup(groupedTwo, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store"));
        customers.toStream().to(OUTPUT);

        final String topologyDescription = builder.build().describe().toString();

        // One fragment per line of the description; the concatenation is a compile-time constant.
        final String expected =
            "Topologies:\n" +
            "   Sub-topology: 0\n" +
            "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
            "      --> KSTREAM-MAP-0000000002\n" +
            "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n" +
            "      --> store-repartition-filter\n" +
            "      <-- KSTREAM-SOURCE-0000000000\n" +
            "    Processor: store-repartition-filter (stores: [])\n" +
            "      --> store-repartition-sink\n" +
            "      <-- KSTREAM-MAP-0000000002\n" +
            "    Sink: store-repartition-sink (topic: store-repartition)\n" +
            "      <-- store-repartition-filter\n\n" +
            "  Sub-topology: 1\n" +
            "    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
            "      --> test-cogroup-agg-1\n" +
            "    Source: store-repartition-source (topics: [store-repartition])\n" +
            "      --> test-cogroup-agg-0\n" +
            "    Processor: test-cogroup-agg-0 (stores: [store])\n" +
            "      --> test-cogroup-merge\n" +
            "      <-- store-repartition-source\n" +
            "    Processor: test-cogroup-agg-1 (stores: [store])\n" +
            "      --> test-cogroup-merge\n" +
            "      <-- KSTREAM-SOURCE-0000000001\n" +
            "    Processor: test-cogroup-merge (stores: [])\n" +
            "      --> KTABLE-TOSTREAM-0000000009\n" +
            "      <-- test-cogroup-agg-0, test-cogroup-agg-1\n" +
            "    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" +
            "      --> KSTREAM-SINK-0000000010\n" +
            "      <-- test-cogroup-merge\n" +
            "    Sink: KSTREAM-SINK-0000000010 (topic: output)\n" +
            "      <-- KTABLE-TOSTREAM-0000000009\n\n";

        MatcherAssert.assertThat(topologyDescription, CoreMatchers.equalTo(expected));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroups() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        KTable cogroupedTwo = groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        KTable cogroupedOne = groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        cogroupedOne.toStream().to(OUTPUT);
        cogroupedTwo.toStream().to("OUTPUT2");
        String topologyDescription = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter, COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000014\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> 
COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000014 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> KTABLE-TOSTREAM-0000000019\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-MERGE-0000000016 (stores: [])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000015\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-SINK-0000000018\n      <-- COGROUPKSTREAM-MERGE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000019 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- COGROUPKSTREAM-MERGE-0000000009\n    Sink: KSTREAM-SINK-0000000018 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000020 (topic: OUTPUT2)\n      <-- KTABLE-TOSTREAM-0000000019\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        KTable cogroupedTwo = groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        KTable cogroupedOne = groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        cogroupedOne.toStream().to(OUTPUT);
        cogroupedTwo.toStream().to("OUTPUT2");
        String topologyDescription = builder.build(properties).describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000014 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> 
COGROUPKSTREAM-MERGE-0000000016\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> KTABLE-TOSTREAM-0000000019\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-MERGE-0000000016 (stores: [])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000015\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-SINK-0000000018\n      <-- COGROUPKSTREAM-MERGE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000019 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- COGROUPKSTREAM-MERGE-0000000009\n    Sink: KSTREAM-SINK-0000000018 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000020 (topic: OUTPUT2)\n      <-- KTABLE-TOSTREAM-0000000019\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInDifferentCogroups() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KStream stream3 = builder.stream("three", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        KGroupedStream groupedThree = stream3.groupByKey();
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedThree, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        String topologyDescription = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter, COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> 
COGROUPKSTREAM-MERGE-0000000017\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000017 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000016\n\n  Sub-topology: 2\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-MERGE-0000000010 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000009\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInDifferentCogroupsWithOptimization() {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KStream stream3 = builder.stream("three", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        KGroupedStream groupedThree = stream3.groupByKey();
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedThree, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        String topologyDescription = builder.build(properties).describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: 
[COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000010 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-MERGE-0000000017 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000016\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReused() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupedOne.aggregate(STRING_INITIALIZER, STRING_AGGREGATOR);
        String topologyDescription = builder.build().describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter, KSTREAM-FILTER-0000000013\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Processor: KSTREAM-FILTER-0000000013 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n    Sink: KSTREAM-SINK-0000000012 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition)\n      <-- KSTREAM-FILTER-0000000013\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, 
COGROUPKSTREAM-AGGREGATE-0000000008\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000014 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition])\n      --> KSTREAM-AGGREGATE-0000000011\n    Processor: KSTREAM-AGGREGATE-0000000011 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> none\n      <-- KSTREAM-SOURCE-0000000014\n\n"));
    }

    /**
     * Uses one re-keyed grouped stream both in a cogroup and in a plain {@code aggregate}
     * with {@code topology.optimization=all}, and checks the topology description: the
     * expected text contains a single repartition topic feeding both aggregations.
     */
    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedWithOptimization() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");

        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);

        final KGroupedStream<String, String> groupedOne = stream1
            .map((k, v) -> new KeyValue<>(v, k))
            .groupByKey();
        final KGroupedStream<String, String> groupedTwo = stream2.groupByKey();

        // The resulting tables are not used; the calls only add nodes to the builder.
        groupedOne
            .cogroup(STRING_AGGREGATOR)
            .cogroup(groupedTwo, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        groupedOne.aggregate(STRING_INITIALIZER, STRING_AGGREGATOR);

        final String topologyDescription = builder.build(properties).describe().toString();

        // One fragment per line of the description; the concatenation is a compile-time constant.
        final String expected =
            "Topologies:\n" +
            "   Sub-topology: 0\n" +
            "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
            "      --> KSTREAM-MAP-0000000002\n" +
            "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n" +
            "      <-- KSTREAM-SOURCE-0000000000\n" +
            "    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n" +
            "      <-- KSTREAM-MAP-0000000002\n" +
            "    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" +
            "      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n\n" +
            "  Sub-topology: 1\n" +
            "    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000011\n" +
            "    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
            "      --> COGROUPKSTREAM-AGGREGATE-0000000008\n" +
            "    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
            "      --> COGROUPKSTREAM-MERGE-0000000009\n" +
            "      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n" +
            "    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
            "      --> COGROUPKSTREAM-MERGE-0000000009\n" +
            "      <-- KSTREAM-SOURCE-0000000001\n" +
            "    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n" +
            "      --> none\n" +
            "      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n" +
            "    Processor: KSTREAM-AGGREGATE-0000000011 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000010])\n" +
            "      --> none\n" +
            "      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n\n";

        MatcherAssert.assertThat(topologyDescription, CoreMatchers.equalTo(expected));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRemadeWithOptimization() {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        KStream stream1 = builder.stream("one", this.stringConsumed);
        KStream stream2 = builder.stream("two", this.stringConsumed);
        KStream stream3 = builder.stream("three", this.stringConsumed);
        KGroupedStream groupedOne = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        KGroupedStream groupedTwo = stream2.groupByKey();
        KGroupedStream groupedThree = stream3.groupByKey();
        KGroupedStream groupedFour = stream1.map((k, v) -> new KeyValue(v, k)).groupByKey();
        groupedOne.cogroup(STRING_AGGREGATOR).cogroup(groupedTwo, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupedThree.cogroup(STRING_AGGREGATOR).cogroup(groupedFour, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        String topologyDescription = builder.build(properties).describe().toString();
        MatcherAssert.assertThat((Object)topologyDescription, (Matcher)CoreMatchers.equalTo((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003, KSTREAM-MAP-0000000004\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000004 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-sink\n      <-- KSTREAM-MAP-0000000004\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000010\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> COGROUPKSTREAM-MERGE-0000000011\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-source\n    
Processor: COGROUPKSTREAM-AGGREGATE-0000000010 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> COGROUPKSTREAM-MERGE-0000000011\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000011 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000009, COGROUPKSTREAM-AGGREGATE-0000000010\n\n  Sub-topology: 2\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000017\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> COGROUPKSTREAM-MERGE-0000000018\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000017 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> COGROUPKSTREAM-MERGE-0000000018\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-source\n    Processor: COGROUPKSTREAM-MERGE-0000000018 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000016, COGROUPKSTREAM-AGGREGATE-0000000017\n\n"));
    }

    /**
     * Calls {@code aggregate} twice on the same {@link CogroupedKStream} (built from a stream grouped
     * as {@code "foo"} after a key-changing {@code map}) and compares the topology description
     * against a fixed expected text.
     *
     * <p>The expected text contains one {@code foo-repartition} topic whose source node forwards to
     * two aggregate processors, each with its own state store.
     */
    @Test
    public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Built with empty properties, i.e. no optimization setting is supplied here.
        final Properties emptyProps = new Properties();

        final KStream<String, String> source = builder.stream("one", stringConsumed);
        final KGroupedStream<String, String> groupedAsFoo = source
            .map((k, v) -> new KeyValue<>(v, k))
            .groupByKey(Grouped.as("foo"));
        final CogroupedKStream<String, String> cogrouped = groupedAsFoo.cogroup(STRING_AGGREGATOR);

        cogrouped.aggregate(STRING_INITIALIZER);
        cogrouped.aggregate(STRING_INITIALIZER);

        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n"
            + "      --> KSTREAM-MAP-0000000001\n"
            + "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
            + "      --> foo-repartition-filter\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: foo-repartition-filter (stores: [])\n"
            + "      --> foo-repartition-sink\n"
            + "      <-- KSTREAM-MAP-0000000001\n"
            + "    Sink: foo-repartition-sink (topic: foo-repartition)\n"
            + "      <-- foo-repartition-filter\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: foo-repartition-source (topics: [foo-repartition])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000007\n"
            + "      <-- foo-repartition-source\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000013\n"
            + "      <-- foo-repartition-source\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000006\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000012\n"
            + "\n";

        final String actual = builder.build(emptyProps).describe().toString();
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Cogroups a single grouped stream read from topic {@code "one"} and checks the records written
     * to {@code OUTPUT}.
     *
     * <p>Four records are piped (two per key, all at timestamp 0) and four output records are
     * expected, in input order, with the value for each key growing from one letter to two.
     */
    @Test
    public void shouldCogroupAndAggregateSingleKStreams() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("one", stringConsumed);
        final KTable<String, String> aggregated = source
            .groupByKey()
            .cogroup(STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        aggregated.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k2", "B", 0L);
            input.pipeInput("k2", "B", 0L);
            input.pipeInput("k1", "A", 0L);

            assertOutputKeyValueTimestamp(output, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "B", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "BB", 0L);
            assertOutputKeyValueTimestamp(output, "k1", "AA", 0L);
        }
    }

    /**
     * Same topology as the single-stream cogroup case, but one of the piped records carries a
     * {@code null} value.
     *
     * <p>Five records are piped and only four outputs are expected: the expected sequence goes
     * straight from {@code k2 -> "B"} to {@code k2 -> "BB"}, with no record for the {@code null} input.
     */
    @Test
    public void testCogroupHandleNullValues() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("one", stringConsumed);
        final KTable<String, String> aggregated = source
            .groupByKey()
            .cogroup(STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        aggregated.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k2", "B", 0L);
            // Null-valued record: no corresponding output is expected below.
            input.pipeInput("k2", null, 0L);
            input.pipeInput("k2", "B", 0L);
            input.pipeInput("k1", "A", 0L);

            assertOutputKeyValueTimestamp(output, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "B", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "BB", 0L);
            assertOutputKeyValueTimestamp(output, "k1", "AA", 0L);
        }
    }

    /**
     * Cogroups two grouped streams whose records use disjoint keys ({@code k1} only on topic
     * {@code "one"}, {@code k2} only on topic {@code "two"}) and checks every output record.
     *
     * <p>The expected timestamps never decrease per key: a {@code k2} record piped with timestamp 1
     * after one piped with timestamp 200 is still expected at 200.
     */
    @Test
    public void shouldCogroupAndAggregateTwoKStreamsWithDistinctKeys() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedOne = builder.stream("one", stringConsumed).groupByKey();
        final KGroupedStream<String, String> groupedTwo = builder.stream("two", stringConsumed).groupByKey();

        final KTable<String, String> aggregated = groupedOne
            .cogroup(STRING_AGGREGATOR)
            .cogroup(groupedTwo, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        aggregated.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputOne =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> inputTwo =
                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            inputOne.pipeInput("k1", "A", 0L);
            inputOne.pipeInput("k1", "A", 1L);
            inputOne.pipeInput("k1", "A", 10L);
            inputOne.pipeInput("k1", "A", 100L);

            inputTwo.pipeInput("k2", "B", 100L);
            inputTwo.pipeInput("k2", "B", 200L);
            inputTwo.pipeInput("k2", "B", 1L);
            inputTwo.pipeInput("k2", "B", 500L);
            inputTwo.pipeInput("k2", "B", 500L);
            inputTwo.pipeInput("k2", "B", 100L);

            assertOutputKeyValueTimestamp(output, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(output, "k1", "AA", 1L);
            assertOutputKeyValueTimestamp(output, "k1", "AAA", 10L);
            assertOutputKeyValueTimestamp(output, "k1", "AAAA", 100L);

            assertOutputKeyValueTimestamp(output, "k2", "B", 100L);
            assertOutputKeyValueTimestamp(output, "k2", "BB", 200L);
            assertOutputKeyValueTimestamp(output, "k2", "BBB", 200L);
            assertOutputKeyValueTimestamp(output, "k2", "BBBB", 500L);
            assertOutputKeyValueTimestamp(output, "k2", "BBBBB", 500L);
            assertOutputKeyValueTimestamp(output, "k2", "BBBBBB", 500L);
        }
    }

    /**
     * Cogroups two grouped streams whose records share keys ({@code k1} and {@code k2} appear on both
     * topics, {@code k3} only on topic {@code "two"}) and checks every output record.
     *
     * <p>Twelve records are piped, and one output per record is asserted, followed by a check that
     * the output topic has been fully drained. The last two expectations ({@code k3} and the final
     * {@code k2} update) follow the same pattern as the preceding ones: the piped value is appended
     * to the key's aggregate and the expected timestamp is the largest seen so far for the key.
     */
    @Test
    public void shouldCogroupAndAggregateTwoKStreamsWithSharedKeys() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();

        final KTable<String, String> customers = grouped1
            .cogroup(STRING_AGGREGATOR)
            .cogroup(grouped2, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> testInputTopic2 =
                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> testOutputTopic =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 1L);
            testInputTopic.pipeInput("k1", "A", 10L);
            testInputTopic.pipeInput("k2", "A", 100L);
            testInputTopic2.pipeInput("k2", "B", 100L);
            testInputTopic2.pipeInput("k2", "B", 200L);
            testInputTopic2.pipeInput("k1", "B", 1L);
            testInputTopic2.pipeInput("k2", "B", 500L);
            testInputTopic2.pipeInput("k1", "B", 500L);
            testInputTopic2.pipeInput("k2", "B", 500L);
            testInputTopic2.pipeInput("k3", "B", 500L);
            testInputTopic2.pipeInput("k2", "B", 100L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500L);
            // Outputs for the last two piped records, previously left unchecked.
            assertOutputKeyValueTimestamp(testOutputTopic, "k3", "B", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBBB", 500L);
            Assert.assertTrue("unexpected extra output records", testOutputTopic.isEmpty());
        }
    }

    /**
     * Cogroups two String-valued streams into an Integer-valued table materialized as
     * {@code "store1"} and checks every output record.
     *
     * <p>Twelve records are piped, and one output per record is asserted, followed by a check that
     * the output topic has been fully drained. The expected values are running per-key sums of the
     * piped numeric strings; expected timestamps are the largest seen so far for the key.
     */
    @Test
    public void shouldAllowDifferentOutputTypeInCoGroup() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();

        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withValueSerde(...), so its type arguments cannot be inferred from the aggregate(...)
        // parameter. Store types are fully qualified so that no additional import is needed.
        final KTable<String, Integer> customers = grouped1
            .cogroup(STRING_SUM_AGGREGATOR)
            .cogroup(grouped2, STRING_SUM_AGGREGATOR)
            .aggregate(
                SUM_INITIALIZER,
                Materialized.<String, Integer, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("store1")
                    .withValueSerde(Serdes.Integer()));
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> testInputTopic2 =
                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, Integer> testOutputTopic =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());

            testInputTopic.pipeInput("k1", "1", 0L);
            testInputTopic.pipeInput("k2", "1", 1L);
            testInputTopic.pipeInput("k1", "1", 10L);
            testInputTopic.pipeInput("k2", "1", 100L);
            testInputTopic2.pipeInput("k2", "2", 100L);
            testInputTopic2.pipeInput("k2", "2", 200L);
            testInputTopic2.pipeInput("k1", "2", 1L);
            testInputTopic2.pipeInput("k2", "2", 500L);
            testInputTopic2.pipeInput("k1", "2", 500L);
            testInputTopic2.pipeInput("k2", "3", 500L);
            testInputTopic2.pipeInput("k3", "2", 500L);
            testInputTopic2.pipeInput("k2", "2", 100L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500L);
            // Outputs for the last two piped records, previously left unchecked.
            assertOutputKeyValueTimestamp(testOutputTopic, "k3", 2, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 13, 500L);
            Assert.assertTrue("unexpected extra output records", testOutputTopic.isEmpty());
        }
    }

    /**
     * Cogroups a String-valued stream with an Integer-valued stream into an Integer-valued table
     * materialized as {@code "store1"} and checks every output record.
     *
     * <p>Twelve records are piped, and one output per record is asserted, followed by a check that
     * the output topic has been fully drained. The expected values are running per-key sums;
     * expected timestamps are the largest seen so far for the key.
     */
    @Test
    public void shouldCoGroupStreamsWithDifferentInputTypes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<String, Integer> integerConsumed = Consumed.with(Serdes.String(), Serdes.Integer());
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, Integer> stream2 = builder.stream("two", integerConsumed);
        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
        final KGroupedStream<String, Integer> grouped2 = stream2.groupByKey();

        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withValueSerde(...), so its type arguments cannot be inferred from the aggregate(...)
        // parameter. Store types are fully qualified so that no additional import is needed.
        final KTable<String, Integer> customers = grouped1
            .cogroup(STRING_SUM_AGGREGATOR)
            .cogroup(grouped2, SUM_AGGREGATOR)
            .aggregate(
                SUM_INITIALIZER,
                Materialized.<String, Integer, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("store1")
                    .withValueSerde(Serdes.Integer()));
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, Integer> testInputTopic2 =
                driver.createInputTopic("two", new StringSerializer(), new IntegerSerializer());
            final TestOutputTopic<String, Integer> testOutputTopic =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());

            testInputTopic.pipeInput("k1", "1", 0L);
            testInputTopic.pipeInput("k2", "1", 1L);
            testInputTopic.pipeInput("k1", "1", 10L);
            testInputTopic.pipeInput("k2", "1", 100L);
            testInputTopic2.pipeInput("k2", 2, 100L);
            testInputTopic2.pipeInput("k2", 2, 200L);
            testInputTopic2.pipeInput("k1", 2, 1L);
            testInputTopic2.pipeInput("k2", 2, 500L);
            testInputTopic2.pipeInput("k1", 2, 500L);
            testInputTopic2.pipeInput("k2", 3, 500L);
            testInputTopic2.pipeInput("k3", 2, 500L);
            testInputTopic2.pipeInput("k2", 2, 100L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k3", 2, 500L);
            // Output for the last piped record, previously left unchecked.
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 13, 500L);
            Assert.assertTrue("unexpected extra output records", testOutputTopic.isEmpty());
        }
    }

    /**
     * Cogroups two streams with different aggregators ({@code MockAggregator.TOSTRING_REMOVER} for
     * topic {@code "one"}, {@code MockAggregator.TOSTRING_ADDER} for topic {@code "two"}) into a
     * String-valued table materialized as {@code "store1"}, and checks every output record.
     *
     * <p>The expected values start at {@code "0"}, gain a {@code "-<value>"} suffix for each record
     * from the first topic and a {@code "+<value>"} suffix for each record from the second.
     */
    @Test
    public void testCogroupKeyMixedAggregators() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();

        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withValueSerde(...), so its type arguments cannot be inferred from the aggregate(...)
        // parameter. Store types are fully qualified so that no additional import is needed.
        final KTable<String, String> customers = grouped1
            .cogroup(MockAggregator.TOSTRING_REMOVER)
            .cogroup(grouped2, MockAggregator.TOSTRING_ADDER)
            .aggregate(
                MockInitializer.STRING_INIT,
                Materialized.<String, String, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("store1")
                    .withValueSerde(Serdes.String()));
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> testInputTopic2 =
                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> testOutputTopic =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            testInputTopic.pipeInput("k1", "1", 0L);
            testInputTopic.pipeInput("k2", "1", 1L);
            testInputTopic.pipeInput("k1", "1", 10L);
            testInputTopic.pipeInput("k2", "1", 100L);
            testInputTopic2.pipeInput("k1", "2", 500L);
            testInputTopic2.pipeInput("k2", "2", 500L);
            testInputTopic2.pipeInput("k1", "2", 500L);
            testInputTopic2.pipeInput("k2", "2", 100L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1", 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1", 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1", 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1+2", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1+2", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1+2+2", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1+2+2", 500L);
        }
    }

    /**
     * Cogroups three grouped streams (topics {@code "one"}, {@code "two"} and {@code "three"}) with
     * the same aggregator and checks every output record.
     *
     * <p>Twelve records are piped, and one output per record is asserted, followed by a check that
     * the output topic has been fully drained. The final expectation follows the same pattern as the
     * preceding ones: the piped value is appended to the key's aggregate and the expected timestamp
     * is the largest seen so far for the key.
     */
    @Test
    public void testCogroupWithThreeGroupedStreams() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
        final KStream<String, String> stream3 = builder.stream("three", stringConsumed);
        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
        final KGroupedStream<String, String> grouped3 = stream3.groupByKey();

        final KTable<String, String> customers = grouped1
            .cogroup(STRING_AGGREGATOR)
            .cogroup(grouped2, STRING_AGGREGATOR)
            .cogroup(grouped3, STRING_AGGREGATOR)
            .aggregate(STRING_INITIALIZER);
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> testInputTopic2 =
                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> testInputTopic3 =
                driver.createInputTopic("three", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> testOutputTopic =
                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 1L);
            testInputTopic.pipeInput("k1", "A", 10L);
            testInputTopic.pipeInput("k2", "A", 100L);
            testInputTopic2.pipeInput("k2", "B", 100L);
            testInputTopic2.pipeInput("k2", "B", 200L);
            testInputTopic2.pipeInput("k1", "B", 1L);
            testInputTopic2.pipeInput("k2", "B", 500L);
            testInputTopic3.pipeInput("k1", "B", 500L);
            testInputTopic3.pipeInput("k2", "B", 500L);
            testInputTopic3.pipeInput("k3", "B", 500L);
            testInputTopic3.pipeInput("k2", "B", 100L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k3", "B", 500L);
            // Output for the last piped record, previously left unchecked.
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBBB", 500L);
            Assert.assertTrue("unexpected extra output records", testOutputTopic.isEmpty());
        }
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts that it equals a record with the
     * given key, String value and timestamp, and {@code null} passed for the headers argument.
     *
     * @param outputTopic       topic to read one record from
     * @param expectedKey       key the record must carry
     * @param expectedValue     value the record must carry
     * @param expectedTimestamp timestamp (as passed to the {@link TestRecord} constructor) the record must carry
     */
    private void assertOutputKeyValueTimestamp(TestOutputTopic<String, String> outputTopic, String expectedKey, String expectedValue, long expectedTimestamp) {
        final TestRecord<String, String> actual = outputTopic.readRecord();
        final TestRecord<String, String> expected =
            new TestRecord<>(expectedKey, expectedValue, null, Long.valueOf(expectedTimestamp));
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts that it equals a record with the
     * given key, Integer value and timestamp, and {@code null} passed for the headers argument.
     *
     * @param outputTopic       topic to read one record from
     * @param expectedKey       key the record must carry
     * @param expectedValue     value the record must carry
     * @param expectedTimestamp timestamp (as passed to the {@link TestRecord} constructor) the record must carry
     */
    private void assertOutputKeyValueTimestamp(TestOutputTopic<String, Integer> outputTopic, String expectedKey, Integer expectedValue, long expectedTimestamp) {
        final TestRecord<String, Integer> actual = outputTopic.readRecord();
        final TestRecord<String, Integer> expected =
            new TestRecord<>(expectedKey, expectedValue, null, Long.valueOf(expectedTimestamp));
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }
}

