package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link KGroupedStream} obtained by calling {@code groupByKey} on a
 * {@code String}/{@code String} stream of {@link #TOPIC}.
 *
 * <p>The tests fall into two families:
 * <ul>
 *   <li>argument validation: a {@code null} function, window specification or store supplier is
 *       expected to raise {@link NullPointerException}, the store name {@link #INVALID_STORE_NAME}
 *       is expected to raise {@link InvalidTopicException}, and a {@code null} store name is
 *       expected to be accepted;</li>
 *   <li>end-to-end checks that feed records through a {@link KStreamTestDriver} and compare the
 *       values emitted by session-windowed and time-windowed aggregations.</li>
 * </ul>
 *
 * <p>Casts on {@code null} arguments exist only to select the intended overload. The
 * {@code Windows} and {@code StateStoreSupplier} casts are left raw because their type arguments
 * are not visible from this file.
 */
public class KGroupedStreamImplTest {

    private static final String TOPIC = "topic";
    private static final String INVALID_STORE_NAME = "~foo bar~";

    /** Starts every integer aggregate at zero. */
    private static final Initializer<Integer> ZERO_INITIALIZER = new Initializer<Integer>() {
        @Override
        public Integer apply() {
            return 0;
        }
    };

    /** Ignores key and value and adds one to the running aggregate, i.e. counts records. */
    private static final Aggregator<String, String, Integer> INCREMENTING_AGGREGATOR =
            new Aggregator<String, String, Integer>() {
                @Override
                public Integer apply(final String key, final String value, final Integer aggregate) {
                    return Integer.valueOf(aggregate.intValue() + 1);
                }
            };

    /** Merges two integer aggregates by adding them. */
    private static final Merger<String, Integer> SUMMING_MERGER = new Merger<String, Integer>() {
        @Override
        public Integer apply(final String key, final Integer left, final Integer right) {
            return Integer.valueOf(left.intValue() + right.intValue());
        }
    };

    /** Placeholder merger for validation tests; it always returns {@code null}. */
    private static final Merger<String, String> NULL_RETURNING_MERGER = new Merger<String, String>() {
        @Override
        public String apply(final String key, final String left, final String right) {
            return null;
        }
    };

    /** Joins two values with a colon, e.g. {@code "A"} and {@code "B"} become {@code "A:B"}. */
    private static final Reducer<String> COLON_JOINING_REDUCER = new Reducer<String>() {
        @Override
        public String apply(final String left, final String right) {
            return left + ":" + right;
        }
    };

    private final KStreamBuilder builder = new KStreamBuilder();
    private KGroupedStream<String, String> groupedStream;
    private KStreamTestDriver driver;

    /** Builds the grouped stream under test from {@link #TOPIC} before every test. */
    @Before
    public void before() {
        groupedStream = builder
                .stream(Serdes.String(), Serdes.String(), TOPIC)
                .groupByKey(Serdes.String(), Serdes.String());
    }

    /** Closes the driver if the test created one, then clears the reference. */
    @After
    public void cleanup() {
        if (driver != null) {
            driver.close();
        }
        driver = null;
    }

    // ------------------------------------------------------------------------------------------
    // Argument validation: reduce / count / aggregate, plain and time-windowed
    // ------------------------------------------------------------------------------------------

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() throws Exception {
        groupedStream.reduce((Reducer<String>) null, "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
        groupedStream.count((StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
        groupedStream.count(TimeWindows.of(10L), (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
        groupedStream.reduce((Reducer<String>) null, TimeWindows.of(10L), "store");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
    }

    @Test
    public void shouldAllowNullStoreNameWithWindowedReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10L), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10L), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
        groupedStream.aggregate(
                (Initializer<String>) null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, (Aggregator<String, String, String>) null, Serdes.String(), "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                (Initializer<String>) null, MockAggregator.TOSTRING_ADDER,
                TimeWindows.of(10L), Serdes.String(), "store");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, (Aggregator<String, String, String>) null,
                TimeWindows.of(10L), Serdes.String(), "store");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
                (Windows) null, Serdes.String(), "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
                TimeWindows.of(10L), Serdes.String(), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
                TimeWindows.of(10L), Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
                TimeWindows.of(10L), (StateStoreSupplier) null);
    }

    // ------------------------------------------------------------------------------------------
    // Session windows: aggregate
    // ------------------------------------------------------------------------------------------

    /**
     * Feeds six records through a fresh driver, flushes state, and checks the per-session record
     * counts that the topology wrote into {@code map}.
     *
     * @param map sink that the topology under test populates with its latest value per session
     */
    private void doAggregateSessionWindows(Map<Windowed<String>, Integer> map) throws Exception {
        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
        processAt(10L, "1", "1");
        processAt(15L, "2", "2");
        processAt(30L, "1", "1");
        processAt(70L, "1", "1");
        processAt(90L, "1", "1");
        processAt(100L, "1", "1");
        driver.flushState();

        // Expected values are boxed explicitly: assertEquals(int, Integer) is ambiguous between
        // the (long, long) and (Object, Object) overloads and would also unbox a missing entry.
        Assert.assertEquals(Integer.valueOf(2), map.get(sessionOf("1", 10L, 30L)));
        Assert.assertEquals(Integer.valueOf(1), map.get(sessionOf("2", 15L, 15L)));
        Assert.assertEquals(Integer.valueOf(3), map.get(sessionOf("1", 70L, 100L)));
    }

    @Test
    public void shouldAggregateSessionWindows() throws Exception {
        final Map<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        final KTable<Windowed<String>, Integer> table = groupedStream.aggregate(
                ZERO_INITIALIZER, INCREMENTING_AGGREGATOR, SUMMING_MERGER,
                SessionWindows.with(30L), Serdes.Integer(), "session-store");
        table.foreach(collectInto(results));

        doAggregateSessionWindows(results);
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() throws Exception {
        final Map<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        final KTable<Windowed<String>, Integer> table = groupedStream.aggregate(
                ZERO_INITIALIZER, INCREMENTING_AGGREGATOR, SUMMING_MERGER,
                SessionWindows.with(30L), Serdes.Integer());
        table.foreach(collectInto(results));

        doAggregateSessionWindows(results);
        Assert.assertNull(table.queryableStoreName());
    }

    // ------------------------------------------------------------------------------------------
    // Session windows: count
    // ------------------------------------------------------------------------------------------

    /**
     * Feeds six records through a fresh driver, flushes state, and checks the per-session counts
     * that the topology wrote into {@code map}.
     *
     * @param map sink that the topology under test populates with its latest count per session
     */
    private void doCountSessionWindows(Map<Windowed<String>, Long> map) throws Exception {
        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
        processAt(10L, "1", "1");
        processAt(15L, "2", "2");
        processAt(30L, "1", "1");
        processAt(70L, "1", "1");
        processAt(90L, "1", "1");
        processAt(100L, "1", "1");
        driver.flushState();

        // Boxed expectations select assertEquals(Object, Object) unambiguously.
        Assert.assertEquals(Long.valueOf(2L), map.get(sessionOf("1", 10L, 30L)));
        Assert.assertEquals(Long.valueOf(1L), map.get(sessionOf("2", 15L, 15L)));
        Assert.assertEquals(Long.valueOf(3L), map.get(sessionOf("1", 70L, 100L)));
    }

    @Test
    public void shouldCountSessionWindows() throws Exception {
        final Map<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        final KTable<Windowed<String>, Long> table =
                groupedStream.count(SessionWindows.with(30L), "session-store");
        table.foreach(collectInto(results));

        doCountSessionWindows(results);
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
        final Map<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        final KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30L));
        table.foreach(collectInto(results));

        doCountSessionWindows(results);
        Assert.assertNull(table.queryableStoreName());
    }

    // ------------------------------------------------------------------------------------------
    // Session windows: reduce
    // ------------------------------------------------------------------------------------------

    /**
     * Feeds six records through a fresh driver, flushes state, and checks the colon-joined values
     * that the topology wrote into {@code map} for each session.
     *
     * @param map sink that the topology under test populates with its latest value per session
     */
    private void doReduceSessionWindows(Map<Windowed<String>, String> map) throws Exception {
        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
        processAt(10L, "1", "A");
        processAt(15L, "2", "Z");
        processAt(30L, "1", "B");
        processAt(70L, "1", "A");
        processAt(90L, "1", "B");
        processAt(100L, "1", "C");
        driver.flushState();

        Assert.assertEquals("A:B", map.get(sessionOf("1", 10L, 30L)));
        Assert.assertEquals("Z", map.get(sessionOf("2", 15L, 15L)));
        Assert.assertEquals("A:B:C", map.get(sessionOf("1", 70L, 100L)));
    }

    @Test
    public void shouldReduceSessionWindows() throws Exception {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        final KTable<Windowed<String>, String> table =
                groupedStream.reduce(COLON_JOINING_REDUCER, SessionWindows.with(30L), "session-store");
        table.foreach(collectInto(results));

        doReduceSessionWindows(results);
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        final KTable<Windowed<String>, String> table =
                groupedStream.reduce(COLON_JOINING_REDUCER, SessionWindows.with(30L));
        table.foreach(collectInto(results));

        doReduceSessionWindows(results);
        Assert.assertNull(table.queryableStoreName());
    }

    // ------------------------------------------------------------------------------------------
    // Argument validation: session-windowed reduce / aggregate / count
    // ------------------------------------------------------------------------------------------

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
        groupedStream.reduce((Reducer<String>) null, SessionWindows.with(10L), "store");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                (Initializer<String>) null, MockAggregator.TOSTRING_ADDER, NULL_RETURNING_MERGER,
                SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, (Aggregator<String, String, String>) null, NULL_RETURNING_MERGER,
                SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger<String, String>) null,
                SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_RETURNING_MERGER,
                (SessionWindows) null, Serdes.String(), "storeName");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_RETURNING_MERGER,
                SessionWindows.with(10L), Serdes.String(), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_RETURNING_MERGER,
                SessionWindows.with(10L), Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
        groupedStream.aggregate(
                MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_RETURNING_MERGER,
                SessionWindows.with(10L), Serdes.String(), (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() throws Exception {
        groupedStream.count((SessionWindows) null, "store");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
        groupedStream.count(SessionWindows.with(90L), (String) null);
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() throws Exception {
        groupedStream.count(SessionWindows.with(90L), INVALID_STORE_NAME);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
        groupedStream.count(SessionWindows.with(90L), (StateStoreSupplier) null);
    }

    // ------------------------------------------------------------------------------------------
    // Time windows: count
    // ------------------------------------------------------------------------------------------

    /**
     * Feeds seven records at two timestamps through a fresh driver and checks the exact sequence
     * of windowed counts appended to {@code list}, including intermediate updates.
     *
     * @param list sink that the topology under test appends every emitted count to, in order
     */
    private void doCountWindowed(List<KeyValue<Windowed<String>, Long>> list) throws Exception {
        // NOTE(review): the third constructor argument (0L) is presumably a cache size that makes
        // every update visible without a flush -- confirm against KStreamTestDriver.
        driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0L);

        driver.setTime(0L);
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "2", "B");
        driver.process(TOPIC, "3", "C");

        driver.setTime(500L);
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "2", "B");
        driver.process(TOPIC, "2", "B");

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                windowedCount("1", 0L, 500L, 1L),
                windowedCount("2", 0L, 500L, 1L),
                windowedCount("3", 0L, 500L, 1L),
                windowedCount("1", 500L, 1000L, 1L),
                windowedCount("1", 500L, 1000L, 2L),
                windowedCount("2", 500L, 1000L, 1L),
                windowedCount("2", 500L, 1000L, 2L));
        MatcherAssert.assertThat(list, CoreMatchers.equalTo(expected));
    }

    @Test
    public void shouldCountWindowed() throws Exception {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        groupedStream.count(TimeWindows.of(500L), "aggregate-by-key-windowed").foreach(appendTo(results));
        doCountWindowed(results);
    }

    @Test
    public void shouldCountWindowedWithInternalStoreName() throws Exception {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        groupedStream.count(TimeWindows.of(500L)).foreach(appendTo(results));
        doCountWindowed(results);
    }

    // ------------------------------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------------------------------

    /** Sets the driver time to {@code timestamp} and then processes one record on {@link #TOPIC}. */
    private void processAt(final long timestamp, final String key, final String value) throws Exception {
        driver.setTime(timestamp);
        driver.process(TOPIC, key, value);
    }

    /** Returns an action that stores each key/value it receives in {@code sink}, overwriting older values. */
    private static <V> ForeachAction<Windowed<String>, V> collectInto(final Map<Windowed<String>, V> sink) {
        return new ForeachAction<Windowed<String>, V>() {
            @Override
            public void apply(final Windowed<String> window, final V value) {
                sink.put(window, value);
            }
        };
    }

    /** Returns an action that appends each key/count it receives to {@code sink} as a {@link KeyValue}. */
    private static ForeachAction<Windowed<String>, Long> appendTo(final List<KeyValue<Windowed<String>, Long>> sink) {
        return new ForeachAction<Windowed<String>, Long>() {
            @Override
            public void apply(final Windowed<String> window, final Long count) {
                sink.add(KeyValue.pair(window, count));
            }
        };
    }

    /** Builds the windowed key for {@code key} in the session window {@code [start, end]}. */
    private static Windowed<String> sessionOf(final String key, final long start, final long end) {
        return new Windowed<String>(key, new SessionWindow(start, end));
    }

    /** Builds the expected pair of a time-windowed key ({@code TimeWindow(start, end)}) and its count. */
    private static KeyValue<Windowed<String>, Long> windowedCount(
            final String key, final long start, final long end, final long count) {
        return KeyValue.pair(new Windowed<String>(key, new TimeWindow(start, end)), Long.valueOf(count));
    }
}
