/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KGroupedStreamImplTest {
    private static final String TOPIC = "topic";
    private final KStreamBuilder builder = new KStreamBuilder();
    private KGroupedStream<String, String> groupedStream;
    private KStreamTestDriver driver = null;

    @Before
    public void before() {
        KStream stream = this.builder.stream(Serdes.String(), Serdes.String(), new String[]{TOPIC});
        this.groupedStream = stream.groupByKey(Serdes.String(), Serdes.String());
    }

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() throws Exception {
        this.groupedStream.reduce(null, "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
        this.groupedStream.reduce(null, (Windows)TimeWindows.of((long)10L), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (Windows)null, "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (Windows)TimeWindows.of((long)10L), (String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
        this.groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
        this.groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, (Windows)TimeWindows.of((long)10L), Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, null, (Windows)TimeWindows.of((long)10L), Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Windows)TimeWindows.of((long)10L), Serdes.String(), null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Windows)TimeWindows.of((long)10L), null);
    }

    @Test
    public void shouldAggregateSessionWindows() throws Exception {
        final HashMap results = new HashMap();
        this.groupedStream.aggregate((Initializer)new Initializer<Integer>(){

            public Integer apply() {
                return 0;
            }
        }, (Aggregator)new Aggregator<String, String, Integer>(){

            public Integer apply(String aggKey, String value, Integer aggregate) {
                return aggregate + 1;
            }
        }, (Merger)new Merger<String, Integer>(){

            public Integer apply(String aggKey, Integer aggOne, Integer aggTwo) {
                return aggOne + aggTwo;
            }
        }, SessionWindows.with((long)30L), Serdes.Integer(), "session-store").foreach((ForeachAction)new ForeachAction<Windowed<String>, Integer>(){

            public void apply(Windowed<String> key, Integer value) {
                results.put(key, value);
            }
        });
        this.driver = new KStreamTestDriver(this.builder, TestUtils.tempDirectory());
        this.driver.setTime(10L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(15L);
        this.driver.process(TOPIC, "2", "2");
        this.driver.setTime(30L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(70L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(90L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(100L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.flushState();
        Assert.assertEquals((Object)2, results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)1, results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)3, results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldCountSessionWindows() throws Exception {
        final HashMap results = new HashMap();
        this.groupedStream.count(SessionWindows.with((long)30L), "session-store").foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.put(key, value);
            }
        });
        this.driver = new KStreamTestDriver(this.builder, TestUtils.tempDirectory());
        this.driver.setTime(10L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(15L);
        this.driver.process(TOPIC, "2", "2");
        this.driver.setTime(30L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(70L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(90L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(100L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.flushState();
        Assert.assertEquals((Object)2L, results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)1L, results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)3L, results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test
    public void shouldReduceSessionWindows() throws Exception {
        final HashMap results = new HashMap();
        this.groupedStream.reduce((Reducer)new Reducer<String>(){

            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        }, SessionWindows.with((long)30L), "session-store").foreach((ForeachAction)new ForeachAction<Windowed<String>, String>(){

            public void apply(Windowed<String> key, String value) {
                results.put(key, value);
            }
        });
        this.driver = new KStreamTestDriver(this.builder, TestUtils.tempDirectory());
        this.driver.setTime(10L);
        this.driver.process(TOPIC, "1", "A");
        this.driver.setTime(15L);
        this.driver.process(TOPIC, "2", "Z");
        this.driver.setTime(30L);
        this.driver.process(TOPIC, "1", "B");
        this.driver.setTime(70L);
        this.driver.process(TOPIC, "1", "A");
        this.driver.setTime(90L);
        this.driver.process(TOPIC, "1", "B");
        this.driver.setTime(100L);
        this.driver.process(TOPIC, "1", "C");
        this.driver.flushState();
        Assert.assertEquals((Object)"A:B", results.get(new Windowed((Object)"1", (Window)new SessionWindow(10L, 30L))));
        Assert.assertEquals((Object)"Z", results.get(new Windowed((Object)"2", (Window)new SessionWindow(15L, 15L))));
        Assert.assertEquals((Object)"A:B:C", results.get(new Windowed((Object)"1", (Window)new SessionWindow(70L, 100L))));
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
        this.groupedStream.reduce(null, SessionWindows.with((long)10L), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows)null, "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with((long)10L), (String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierNameWhenReducingSessionWindows() throws Exception {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with((long)10L), (StateStoreSupplier)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, SessionWindows.with((long)10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, null, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, SessionWindows.with((long)10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, SessionWindows.with((long)10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, null, Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, SessionWindows.with((long)10L), Serdes.String(), (String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger)new Merger<String, String>(){

            public String apply(String aggKey, String aggOne, String aggTwo) {
                return null;
            }
        }, SessionWindows.with((long)10L), Serdes.String(), (StateStoreSupplier)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() throws Exception {
        this.groupedStream.count((SessionWindows)null, "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
        this.groupedStream.count(SessionWindows.with((long)90L), (String)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws Exception {
        this.groupedStream.count(SessionWindows.with((long)90L), (StateStoreSupplier)null);
    }

    @Test
    public void shouldCountWindowed() throws Exception {
        final ArrayList results = new ArrayList();
        this.groupedStream.count((Windows)TimeWindows.of((long)500L), "aggregate-by-key-windowed").foreach((ForeachAction)new ForeachAction<Windowed<String>, Long>(){

            public void apply(Windowed<String> key, Long value) {
                results.add(KeyValue.pair(key, (Object)value));
            }
        });
        this.driver = new KStreamTestDriver(this.builder, TestUtils.tempDirectory(), 0L);
        this.driver.setTime(0L);
        this.driver.process(TOPIC, "1", "A");
        this.driver.process(TOPIC, "2", "B");
        this.driver.process(TOPIC, "3", "C");
        this.driver.setTime(500L);
        this.driver.process(TOPIC, "1", "A");
        this.driver.process(TOPIC, "1", "A");
        this.driver.process(TOPIC, "2", "B");
        this.driver.process(TOPIC, "2", "B");
        MatcherAssert.assertThat(results, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"3", (Window)new TimeWindow(0L, 500L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new TimeWindow(500L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new TimeWindow(500L, 1000L)), (Object)2L))));
    }
}

