/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KTableAggregateTest {
    private final Serde<String> stringSerde = Serdes.String();
    private KStreamTestDriver driver = null;
    private File stateDir = null;

    @After
    public void tearDown() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory((String)"kafka-test");
    }

    @Test
    public void testAggBasic() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy(MockKeyValueMapper.NoOpKeyValueMapper(), this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "A", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "C", "5");
        this.driver.flushState();
        this.driver.process("topic1", "D", "6");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        this.driver.process("topic1", "C", "8");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:0+1", "B:0+2", "A:0+1-1+3", "B:0+2-2+4", "C:0+5", "D:0+6", "B:0+2-2+4-4+7", "C:0+5-5+8"}), proc.processed);
    }

    @Test
    public void testAggCoalesced() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy(MockKeyValueMapper.NoOpKeyValueMapper(), this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.process("topic1", "A", "3");
        this.driver.process("topic1", "A", "4");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:0+4"}), proc.processed);
    }

    @Test
    public void testAggRepartition() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy((KeyValueMapper)new KeyValueMapper<String, String, KeyValue<String, String>>(){

            public KeyValue<String, String> apply(String key, String value) {
                if (key.equals("null")) {
                    return KeyValue.pair(null, (Object)value);
                }
                if (key.equals("NULL")) {
                    return null;
                }
                return KeyValue.pair((Object)value, (Object)value);
            }
        }, this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "null", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "NULL", "5");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"1:0+1", "1:0+1-1", "1:0+1-1+1", "2:0+2", "2:0+2-2", "4:0+4", "4:0+4-4", "7:0+7"}), proc.processed);
    }

    @Test
    public void testCount() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy(MockKeyValueMapper.SelectValueKeyValueMapper(), this.stringSerde, this.stringSerde).count("count").toStream().process(proc, new String[0]);
        KStreamTestDriver driver = new KStreamTestDriver(builder, this.stateDir);
        driver.process("count-test-input", "A", "green");
        driver.flushState();
        driver.process("count-test-input", "B", "green");
        driver.flushState();
        driver.process("count-test-input", "A", "blue");
        driver.flushState();
        driver.process("count-test-input", "C", "yellow");
        driver.flushState();
        driver.process("count-test-input", "D", "green");
        driver.flushState();
        driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"green:1", "green:2", "green:1", "blue:1", "yellow:1", "green:2"}), proc.processed);
    }

    @Test
    public void testCountCoalesced() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy(MockKeyValueMapper.SelectValueKeyValueMapper(), this.stringSerde, this.stringSerde).count("count").toStream().process(proc, new String[0]);
        KStreamTestDriver driver = new KStreamTestDriver(builder, this.stateDir);
        driver.process("count-test-input", "A", "green");
        driver.process("count-test-input", "B", "green");
        driver.process("count-test-input", "A", "blue");
        driver.process("count-test-input", "C", "yellow");
        driver.process("count-test-input", "D", "green");
        driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"blue:1", "yellow:1", "green:2"}), proc.processed);
    }

    @Test
    public void testRemoveOldBeforeAddNew() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy((KeyValueMapper)new KeyValueMapper<String, String, KeyValue<String, String>>(){

            public KeyValue<String, String> apply(String key, String value) {
                return KeyValue.pair((Object)String.valueOf(key.charAt(0)), (Object)String.valueOf(key.charAt(1)));
            }
        }, this.stringSerde, this.stringSerde).aggregate((Initializer)new Initializer<String>(){

            public String apply() {
                return "";
            }
        }, (Aggregator)new Aggregator<String, String, String>(){

            public String apply(String aggKey, String value, String aggregate) {
                return aggregate + value;
            }
        }, (Aggregator)new Aggregator<String, String, String>(){

            public String apply(String key, String value, String aggregate) {
                return aggregate.replaceAll(value, "");
            }
        }, Serdes.String(), "someStore").toStream().process(proc, new String[0]);
        KStreamTestDriver driver = new KStreamTestDriver(builder, this.stateDir);
        driver.process("count-test-input", "11", "A");
        driver.flushState();
        driver.process("count-test-input", "12", "B");
        driver.flushState();
        driver.process("count-test-input", "11", null);
        driver.flushState();
        driver.process("count-test-input", "12", "C");
        driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"1:1", "1:12", "1:2", "1:2"}), proc.processed);
    }
}

