/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link KGroupedTable} obtained by grouping a table built with
 * {@link KStreamBuilder}.
 *
 * <p>The first group of tests passes {@code null} for each argument of
 * {@code aggregate} and {@code reduce} in turn and expects a
 * {@link NullPointerException}. {@link #shouldReduce()} drives records through
 * a small topology and checks the reduced values.
 */
public class KGroupedTableImplTest {
    /** Grouped table shared by the null-argument tests; rebuilt before every test. */
    private KGroupedTable<String, String> groupedTable;

    /**
     * Builds a fresh String/String table and groups it with the mock
     * select-value mapper so that each test starts from a new grouped table.
     */
    @Before
    public void before() {
        final KStreamBuilder builder = new KStreamBuilder();
        this.groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
    }

    /** {@code aggregate} is expected to throw when the store name is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
        this.groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, null);
    }

    /** {@code aggregate} is expected to throw when the initializer is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
        this.groupedTable.aggregate(null, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "store");
    }

    /** {@code aggregate} is expected to throw when the adder is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullAdderOnAggregate() throws Exception {
        this.groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.STRING_REMOVER, "store");
    }

    /** {@code aggregate} is expected to throw when the subtractor is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
        this.groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, "store");
    }

    /** {@code reduce} is expected to throw when the adder is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullAdderOnReduce() throws Exception {
        this.groupedTable.reduce(null, MockReducer.STRING_REMOVER, "store");
    }

    /** {@code reduce} is expected to throw when the subtractor is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSubtractorOnReduce() throws Exception {
        this.groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
    }

    /** {@code reduce} is expected to throw when the store name is {@code null}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
        this.groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, null);
    }

    /**
     * Builds a String/Integer table, groups it with the mock no-op mapper and
     * reduces it with the mock integer adder/subtractor, recording every
     * emitted result in a map.
     *
     * <p>After each batch of records is processed and the state flushed, the
     * recorded result for each key must equal the most recent value processed
     * for that key.
     */
    @Test
    public void shouldReduce() throws Exception {
        final KStreamBuilder builder = new KStreamBuilder();
        final String topic = "input";
        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Integer(), topic, "store")
            .groupBy(MockKeyValueMapper.<String, Integer>NoOpKeyValueMapper())
            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");

        // Capture the latest result seen per key so it can be asserted after each flush.
        final Map<String, Integer> results = new HashMap<>();
        reduced.foreach(new ForeachAction<String, Integer>() {
            @Override
            public void apply(final String key, final Integer value) {
                results.put(key, value);
            }
        });

        final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
        driver.setTime(10L);

        // First batch: one record per key.
        driver.process(topic, "A", 1);
        driver.process(topic, "B", 2);
        driver.flushState();
        // Integer.valueOf selects assertEquals(Object, Object) without a cast.
        Assert.assertEquals(Integer.valueOf(1), results.get("A"));
        Assert.assertEquals(Integer.valueOf(2), results.get("B"));

        // Second batch: two updates per key; only the last one should remain.
        driver.process(topic, "A", 2);
        driver.process(topic, "B", 1);
        driver.process(topic, "A", 5);
        driver.process(topic, "B", 6);
        driver.flushState();
        Assert.assertEquals(Integer.valueOf(5), results.get("A"));
        Assert.assertEquals(Integer.valueOf(6), results.get("B"));
    }
}

