/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KGroupedStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String INVALID_STORE_NAME = "~foo bar~";
    private final StreamsBuilder builder = new StreamsBuilder();
    private KGroupedStream<String, String> groupedStream;
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Before
    public void before() {
        KStream stream = this.builder.stream(TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.groupedStream = stream.groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() {
        this.groupedStream.reduce(null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).reduce(null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() {
        this.groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as((String)"store"));
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    private void doAggregateSessionWindows(Map<Windowed<String>, Integer> results) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 10L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"2", 15L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 30L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 70L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 90L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 100L));
        }
        Assert.assertEquals((Object)2, (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)1, (Object)results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)3, (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldAggregateSessionWindows() {
        final HashMap<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return 0;
            }
        }, (Aggregator)new Aggregator<String, String, Integer>(){

            public Integer apply(String aggKey, String value, Integer aggregate) {
                return aggregate + 1;
            }
        }, (Merger)new Merger<String, Integer>(){

            public Integer apply(String aggKey, Integer aggOne, Integer aggTwo) {
                return aggOne + aggTwo;
            }
        }, Materialized.as((String)"session-store").withValueSerde(Serdes.Integer()));
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Integer>(){

            public void apply(Windowed<String> key, Integer value) {
                results.put(key, value);
            }
        });
        this.doAggregateSessionWindows(results);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        final HashMap<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return 0;
            }
        }, (Aggregator)new Aggregator<String, String, Integer>(){

            public Integer apply(String aggKey, String value, Integer aggregate) {
                return aggregate + 1;
            }
        }, (Merger)new Merger<String, Integer>(){

            public Integer apply(String aggKey, Integer aggOne, Integer aggTwo) {
                return aggOne + aggTwo;
            }
        }, Materialized.with(null, (Serde)Serdes.Integer()));
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Integer>(){

            public void apply(Windowed<String> key, Integer value) {
                results.put(key, value);
            }
        });
        this.doAggregateSessionWindows(results);
    }

    private void doCountSessionWindows(Map<Windowed<String>, Long> results) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 10L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"2", 15L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 30L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 70L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 90L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"1", 100L));
        }
        Assert.assertEquals((Object)2L, (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)1L, (Object)results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)3L, (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldCountSessionWindows() {
        final HashMap<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).count(Materialized.as((String)"session-store"));
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.put(key, value);
            }
        });
        this.doCountSessionWindows(results);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        final HashMap<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).count();
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.put(key, value);
            }
        });
        this.doCountSessionWindows(results);
        Assert.assertNull((Object)table.queryableStoreName());
    }

    private void doReduceSessionWindows(Map<Windowed<String>, String> results) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A", 10L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"Z", 15L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"B", 30L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A", 70L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"B", 90L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"C", 100L));
        }
        Assert.assertEquals((Object)"A:B", (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)"Z", (Object)results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)"A:B:C", (Object)results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldReduceSessionWindows() {
        final HashMap<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce((Reducer)new Reducer<String>(){

            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        }, Materialized.as((String)"session-store"));
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, String>(){

            public void apply(Windowed<String> key, String value) {
                results.put(key, value);
            }
        });
        this.doReduceSessionWindows(results);
        Assert.assertEquals((Object)table.queryableStoreName(), (Object)"session-store");
    }

    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        final HashMap<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        KTable table = this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce((Reducer)new Reducer<String>(){

            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        });
        table.toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, String>(){

            public void apply(Windowed<String> key, String value) {
                results.put(key, value);
            }
        });
        this.doReduceSessionWindows(results);
        Assert.assertNull((Object)table.queryableStoreName());
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(null, Materialized.as((String)"store"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(MockReducer.STRING_ADDER, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).reduce(null, Materialized.as(null));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(null, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, null, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as((String)"storeName"));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows)null);
    }

    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String()));
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with((Duration)Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, Materialized.as((String)INVALID_STORE_NAME));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        this.groupedStream.count((Materialized)null);
    }

    @Test
    public void shouldCountAndMaterializeResults() {
        this.groupedStream.count(Materialized.as((String)"count").withKeySerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore count = driver.getKeyValueStore("count");
            MatcherAssert.assertThat((Object)count.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)3L));
            MatcherAssert.assertThat((Object)count.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)1L));
            MatcherAssert.assertThat((Object)count.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)2L));
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInAggregate() {
        this.groupedStream.count(Materialized.as((String)"count").withKeySerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            LogCaptureAppender.unregister(appender);
            Map metrics = driver.metrics();
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
            Assert.assertNotEquals((Object)0.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
        }
    }

    @Test
    public void shouldReduceAndMaterializeResults() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)"reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore reduced = driver.getKeyValueStore("reduce");
            MatcherAssert.assertThat((Object)reduced.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)"A+C+D"));
            MatcherAssert.assertThat((Object)reduced.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)"B"));
            MatcherAssert.assertThat((Object)reduced.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)"E+F"));
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInReduce() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)"reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            LogCaptureAppender.unregister(appender);
            Map metrics = driver.metrics();
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
            Assert.assertNotEquals((Object)0.0, (Object)StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
        }
    }

    @Test
    public void shouldAggregateAndMaterializeResults() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"aggregate").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            KeyValueStore aggregate = driver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat((Object)aggregate.get((Object)"1"), (Matcher)CoreMatchers.equalTo((Object)"0+A+C+D"));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"2"), (Matcher)CoreMatchers.equalTo((Object)"0+B"));
            MatcherAssert.assertThat((Object)aggregate.get((Object)"3"), (Matcher)CoreMatchers.equalTo((Object)"0+E+F"));
        }
    }

    @Test
    public void shouldAggregateWithDefaultSerdes() {
        final HashMap results = new HashMap();
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).toStream().foreach((ForeachAction)new ForeachAction<String, String>(){

            public void apply(String key, String value) {
                results.put(key, value);
            }
        });
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            MatcherAssert.assertThat(results.get("1"), (Matcher)CoreMatchers.equalTo((Object)"0+A+C+D"));
            MatcherAssert.assertThat(results.get("2"), (Matcher)CoreMatchers.equalTo((Object)"0+B"));
            MatcherAssert.assertThat(results.get("3"), (Matcher)CoreMatchers.equalTo((Object)"0+E+F"));
        }
    }

    private void processData(TopologyTestDriver driver) {
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"B"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"C"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"D"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"3", (Object)"E"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"3", (Object)"F"));
        driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"3", (Object)null));
    }

    private void doCountWindowed(List<KeyValue<Windowed<String>, Long>> results) {
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A", 0L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"B", 0L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"3", (Object)"C", 0L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A", 500L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"1", (Object)"A", 500L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"B", 500L));
            driver.pipeInput(this.recordFactory.create(TOPIC, (Object)"2", (Object)"B", 500L));
        }
        MatcherAssert.assertThat(results, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"3", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), (Object)2L))));
    }

    @Test
    public void shouldCountWindowed() {
        final ArrayList<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count(Materialized.as((String)"aggregate-by-key-windowed")).toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.add(KeyValue.pair(key, (Object)value));
            }
        });
        this.doCountWindowed(results);
    }

    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        final ArrayList<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count().toStream().foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.add(KeyValue.pair(key, (Object)value));
            }
        });
        this.doCountWindowed(results);
    }
}

