/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@code count}, {@code reduce} and {@code aggregate} operations of a
 * {@link TimeWindowedKStream} built with 500 ms time windows over the {@code "input"} topic.
 *
 * <p>Every data-driven test pipes the same five records (see {@link #processData(TopologyTestDriver)})
 * and then checks either the last value/timestamp forwarded downstream per windowed key, or the
 * contents of the materialized window store.
 */
public class TimeWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private TimeWindowedKStream<String, String> windowedStream;

    /** Builds the windowed stream under test: String/String records grouped by key into 500 ms windows. */
    @Before
    public void before() {
        final KStream<String, String> stream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        windowedStream = stream
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(500L)));
    }

    /** Counting per window must forward the expected final count and timestamp for each windowed key. */
    @Test
    public void shouldCountWindowed() {
        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
        windowedStream
            .count()
            .toStream()
            .process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 0L, 500L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 15L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("2", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 550L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 500L)));
    }

    /** Reducing per window with {@link MockReducer#STRING_ADDER} must forward the expected joined values. */
    @Test
    public void shouldReduceWindowed() {
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedStream
            .reduce(MockReducer.STRING_ADDER)
            .toStream()
            .process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 0L, 500L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("1+2", 15L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("2", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("10+20", 550L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("3", 500L)));
    }

    /** Aggregating per window must forward the expected values, each starting from the initializer's "0". */
    @Test
    public void shouldAggregateWindowed() {
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedStream
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 0L, 500L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("0+1+2", 15L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("2", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("0+10+20", 550L)));
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(windowed("1", 500L, 1000L)),
            CoreMatchers.equalTo(ValueAndTimestamp.make("0+3", 500L)));
    }

    /** The count must be queryable from the named store, both as plain values and with timestamps. */
    @Test
    public void shouldMaterializeCount() {
        // The explicit type witness is required: as the receiver of the chained withKeySerde(...)
        // call, Materialized.as(...) has no target type to infer its type arguments from.
        windowedStream.count(
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final WindowStore<String, Long> plainStore = driver.getWindowStore("count-store");
            MatcherAssert.assertThat(
                fetchAll(plainStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), 2L),
                    KeyValue.pair(windowed("1", 500L, 1000L), 1L),
                    KeyValue.pair(windowed("2", 500L, 1000L), 2L))));

            final WindowStore<String, ValueAndTimestamp<Long>> timestampedStore =
                driver.getTimestampedWindowStore("count-store");
            MatcherAssert.assertThat(
                fetchAll(timestampedStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), ValueAndTimestamp.make(2L, 15L)),
                    KeyValue.pair(windowed("1", 500L, 1000L), ValueAndTimestamp.make(1L, 500L)),
                    KeyValue.pair(windowed("2", 500L, 1000L), ValueAndTimestamp.make(2L, 550L)))));
        }
    }

    /** The reduced values must be queryable from the named store, both plain and with timestamps. */
    @Test
    public void shouldMaterializeReduced() {
        // Explicit type witness needed for the same reason as in shouldMaterializeCount.
        windowedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final WindowStore<String, String> plainStore = driver.getWindowStore("reduced");
            MatcherAssert.assertThat(
                fetchAll(plainStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), "1+2"),
                    KeyValue.pair(windowed("1", 500L, 1000L), "3"),
                    KeyValue.pair(windowed("2", 500L, 1000L), "10+20"))));

            final WindowStore<String, ValueAndTimestamp<String>> timestampedStore =
                driver.getTimestampedWindowStore("reduced");
            MatcherAssert.assertThat(
                fetchAll(timestampedStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), ValueAndTimestamp.make("1+2", 15L)),
                    KeyValue.pair(windowed("1", 500L, 1000L), ValueAndTimestamp.make("3", 500L)),
                    KeyValue.pair(windowed("2", 500L, 1000L), ValueAndTimestamp.make("10+20", 550L)))));
        }
    }

    /** The aggregated values must be queryable from the named store, both plain and with timestamps. */
    @Test
    public void shouldMaterializeAggregated() {
        // Explicit type witness needed for the same reason as in shouldMaterializeCount.
        windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final WindowStore<String, String> plainStore = driver.getWindowStore("aggregated");
            MatcherAssert.assertThat(
                fetchAll(plainStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), "0+1+2"),
                    KeyValue.pair(windowed("1", 500L, 1000L), "0+3"),
                    KeyValue.pair(windowed("2", 500L, 1000L), "0+10+20"))));

            final WindowStore<String, ValueAndTimestamp<String>> timestampedStore =
                driver.getTimestampedWindowStore("aggregated");
            MatcherAssert.assertThat(
                fetchAll(timestampedStore),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(windowed("1", 0L, 500L), ValueAndTimestamp.make("0+1+2", 15L)),
                    KeyValue.pair(windowed("1", 500L, 1000L), ValueAndTimestamp.make("0+3", 500L)),
                    KeyValue.pair(windowed("2", 500L, 1000L), ValueAndTimestamp.make("0+10+20", 550L)))));
        }
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null));
    }

    @Test
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        // The cast selects the Materialized overload; a bare null would be ambiguous with Named.
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                (Materialized<String, String, WindowStore<Bytes, byte[]>>) null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(null, Materialized.as("store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        // The cast selects the Materialized overload; a bare null would be ambiguous with Named.
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(
                MockReducer.STRING_ADDER,
                (Materialized<String, String, WindowStore<Bytes, byte[]>>) null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
        // The cast selects the Named overload; a bare null would be ambiguous with Materialized.
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null));
    }

    @Test
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        // The cast selects the Materialized overload of count.
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.count((Materialized<String, Long, WindowStore<Bytes, byte[]>>) null));
    }

    /**
     * Pipes the shared fixture into the input topic: key "1" with values "1", "2", "3" at
     * timestamps 10, 15, 500, then key "2" with values "10", "20" at timestamps 550, 500.
     *
     * <p>The last record carries an earlier timestamp than the one before it; the tests above
     * still expect timestamp 550 for key "2" in window [500, 1000).
     *
     * @param driver the test driver whose topology reads from the input topic
     */
    private void processData(final TopologyTestDriver driver) {
        final TestInputTopic<String, String> inputTopic =
            driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
        inputTopic.pipeInput("1", "1", 10L);
        inputTopic.pipeInput("1", "2", 15L);
        inputTopic.pipeInput("1", "3", 500L);
        inputTopic.pipeInput("2", "10", 550L);
        inputTopic.pipeInput("2", "20", 500L);
    }

    /** Creates the windowed key for {@code key} in the time window from {@code startMs} to {@code endMs}. */
    private static Windowed<String> windowed(final String key, final long startMs, final long endMs) {
        return new Windowed<>(key, new TimeWindow(startMs, endMs));
    }

    /** Fetches every entry for keys "1" through "2" between epoch milli 0 and 1000 as a list. */
    private static <V> List<KeyValue<Windowed<String>, V>> fetchAll(final WindowStore<String, V> store) {
        return StreamsTestUtils.toList(
            store.fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L)));
    }
}

