package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.class */
public class KGroupedStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String INVALID_STORE_NAME = "~foo bar~";
    private KGroupedStream<String, String> groupedStream;
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Before
    public void before() {
        this.groupedStream = this.builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullAggregatorOnCogroup() {
        this.groupedStream.cogroup((Aggregator) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() {
        this.groupedStream.reduce((Reducer) null);
    }

    @Test(expected = TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).reduce((Reducer) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        this.groupedStream.windowedBy((Windows) null);
    }

    @Test(expected = TopologyException.class)
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() {
        this.groupedStream.aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, (Aggregator) null, Materialized.as("store"));
    }

    @Test(expected = TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, (Aggregator) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        this.groupedStream.windowedBy((Windows) null);
    }

    @Test(expected = TopologyException.class)
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
    }

    private void doAggregateSessionWindows(MockProcessorSupplier<Windowed<String>, Integer> mockProcessorSupplier) {
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                createInputTopic.pipeInput("1", "1", 10L);
                createInputTopic.pipeInput("2", "2", 15L);
                createInputTopic.pipeInput("1", "1", 30L);
                createInputTopic.pipeInput("1", "1", 70L);
                createInputTopic.pipeInput("1", "1", 100L);
                createInputTopic.pipeInput("1", "1", 90L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Map<Windowed<String>, ValueAndTimestamp<Integer>> map = mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey;
                Assert.assertEquals(ValueAndTimestamp.make(2, 30L), map.get(new Windowed("1", new SessionWindow(10L, 30L))));
                Assert.assertEquals(ValueAndTimestamp.make(1, 15L), map.get(new Windowed("2", new SessionWindow(15L, 15L))));
                Assert.assertEquals(ValueAndTimestamp.make(3, 100L), map.get(new Windowed("1", new SessionWindow(70L, 100L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldAggregateSessionWindows() {
        MockProcessorSupplier<Windowed<String>, Integer> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable aggregate = this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).aggregate(() -> {
            return 0;
        }, (str, str2, num) -> {
            return Integer.valueOf(num.intValue() + 1);
        }, (str3, num2, num3) -> {
            return Integer.valueOf(num2.intValue() + num3.intValue());
        }, Materialized.as("session-store").withValueSerde(Serdes.Integer()));
        aggregate.toStream().process(mockProcessorSupplier, new String[0]);
        doAggregateSessionWindows(mockProcessorSupplier);
        Assert.assertEquals(aggregate.queryableStoreName(), "session-store");
    }

    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Integer> mockProcessorSupplier = new MockProcessorSupplier<>();
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).aggregate(() -> {
            return 0;
        }, (str, str2, num) -> {
            return Integer.valueOf(num.intValue() + 1);
        }, (str3, num2, num3) -> {
            return Integer.valueOf(num2.intValue() + num3.intValue());
        }, Materialized.with((Serde) null, Serdes.Integer())).toStream().process(mockProcessorSupplier, new String[0]);
        doAggregateSessionWindows(mockProcessorSupplier);
    }

    private void doCountSessionWindows(MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier) {
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                createInputTopic.pipeInput("1", "1", 10L);
                createInputTopic.pipeInput("2", "2", 15L);
                createInputTopic.pipeInput("1", "1", 30L);
                createInputTopic.pipeInput("1", "1", 70L);
                createInputTopic.pipeInput("1", "1", 100L);
                createInputTopic.pipeInput("1", "1", 90L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Map<Windowed<String>, ValueAndTimestamp<Long>> map = mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey;
                Assert.assertEquals(ValueAndTimestamp.make(2L, 30L), map.get(new Windowed("1", new SessionWindow(10L, 30L))));
                Assert.assertEquals(ValueAndTimestamp.make(1L, 15L), map.get(new Windowed("2", new SessionWindow(15L, 15L))));
                Assert.assertEquals(ValueAndTimestamp.make(3L, 100L), map.get(new Windowed("1", new SessionWindow(70L, 100L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldCountSessionWindows() {
        MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable count = this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).count(Materialized.as("session-store"));
        count.toStream().process(mockProcessorSupplier, new String[0]);
        doCountSessionWindows(mockProcessorSupplier);
        Assert.assertEquals(count.queryableStoreName(), "session-store");
    }

    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable count = this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).count();
        count.toStream().process(mockProcessorSupplier, new String[0]);
        doCountSessionWindows(mockProcessorSupplier);
        Assert.assertNull(count.queryableStoreName());
    }

    private void doReduceSessionWindows(MockProcessorSupplier<Windowed<String>, String> mockProcessorSupplier) {
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                createInputTopic.pipeInput("1", "A", 10L);
                createInputTopic.pipeInput("2", "Z", 15L);
                createInputTopic.pipeInput("1", "B", 30L);
                createInputTopic.pipeInput("1", "A", 70L);
                createInputTopic.pipeInput("1", "B", 100L);
                createInputTopic.pipeInput("1", "C", 90L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Map<Windowed<String>, ValueAndTimestamp<String>> map = mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey;
                Assert.assertEquals(ValueAndTimestamp.make("A:B", 30L), map.get(new Windowed("1", new SessionWindow(10L, 30L))));
                Assert.assertEquals(ValueAndTimestamp.make("Z", 15L), map.get(new Windowed("2", new SessionWindow(15L, 15L))));
                Assert.assertEquals(ValueAndTimestamp.make("A:B:C", 100L), map.get(new Windowed("1", new SessionWindow(70L, 100L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldReduceSessionWindows() {
        MockProcessorSupplier<Windowed<String>, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable reduce = this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).reduce((str, str2) -> {
            return str + ":" + str2;
        }, Materialized.as("session-store"));
        reduce.toStream().process(mockProcessorSupplier, new String[0]);
        doReduceSessionWindows(mockProcessorSupplier);
        Assert.assertEquals(reduce.queryableStoreName(), "session-store");
    }

    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable reduce = this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).reduce((str, str2) -> {
            return str + ":" + str2;
        });
        reduce.toStream().process(mockProcessorSupplier, new String[0]);
        doReduceSessionWindows(mockProcessorSupplier);
        Assert.assertNull(reduce.queryableStoreName());
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).reduce((Reducer) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows) null);
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).reduce((Reducer) null, Materialized.as((String) null));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, (str, str2, str3) -> {
            return null;
        }, Materialized.as("storeName"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, (Aggregator) null, (str, str2, str3) -> {
            return null;
        }, Materialized.as("storeName"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(30L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger) null, Materialized.as("storeName"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy((SessionWindows) null);
    }

    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (str, str2, str3) -> {
            return null;
        }, Materialized.with(Serdes.String(), Serdes.String()));
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        this.groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (str, str2, str3) -> {
            return null;
        }, Materialized.as(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        this.groupedStream.count((Materialized) null);
    }

    @Test
    public void shouldCountAndMaterializeResults() {
        this.groupedStream.count(Materialized.as("count").withKeySerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("count");
            MatcherAssert.assertThat(keyValueStore.get("1"), CoreMatchers.equalTo(3L));
            MatcherAssert.assertThat(keyValueStore.get("2"), CoreMatchers.equalTo(1L));
            MatcherAssert.assertThat(keyValueStore.get("3"), CoreMatchers.equalTo(2L));
            KeyValueStore timestampedKeyValueStore = topologyTestDriver.getTimestampedKeyValueStore("count");
            MatcherAssert.assertThat(timestampedKeyValueStore.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make(3L, 10L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 1L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 9L)));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersion0100To24() {
        shouldLogAndMeasureSkipsInAggregate("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersionLatest() {
        shouldLogAndMeasureSkipsInAggregate("latest");
    }

    private void shouldLogAndMeasureSkipsInAggregate(String str) {
        this.groupedStream.count(Materialized.as("count").withKeySerde(Serdes.String()));
        this.props.setProperty("built.in.metrics.version", str);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(KStreamAggregate.class);
        Throwable th = null;
        try {
            TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
            Throwable th2 = null;
            try {
                try {
                    processData(topologyTestDriver);
                    if ("0.10.0-2.4".equals(str)) {
                        Map metrics = topologyTestDriver.metrics();
                        Assert.assertEquals(Double.valueOf(1.0d), StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
                        Assert.assertNotEquals(Double.valueOf(0.0d), StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
                    }
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
                    if (topologyTestDriver != null) {
                        if (0 != 0) {
                            try {
                                topologyTestDriver.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            topologyTestDriver.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (topologyTestDriver != null) {
                    if (th2 != null) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldReduceAndMaterializeResults() {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("reduce");
            MatcherAssert.assertThat(keyValueStore.get("1"), CoreMatchers.equalTo("A+C+D"));
            MatcherAssert.assertThat(keyValueStore.get("2"), CoreMatchers.equalTo("B"));
            MatcherAssert.assertThat(keyValueStore.get("3"), CoreMatchers.equalTo("E+F"));
            KeyValueStore timestampedKeyValueStore = topologyTestDriver.getTimestampedKeyValueStore("reduce");
            MatcherAssert.assertThat(timestampedKeyValueStore.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("B", 1L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("E+F", 9L)));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldLogAndMeasureSkipsInReduceWithBuiltInMetricsVersion0100To24() {
        shouldLogAndMeasureSkipsInReduce("0.10.0-2.4");
    }

    @Test
    public void shouldLogAndMeasureSkipsInReduceWithBuiltInMetricsVersionLatest() {
        shouldLogAndMeasureSkipsInReduce("latest");
    }

    private void shouldLogAndMeasureSkipsInReduce(String str) {
        this.groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduce").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        this.props.setProperty("built.in.metrics.version", str);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(KStreamReduce.class);
        Throwable th = null;
        try {
            TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
            Throwable th2 = null;
            try {
                try {
                    processData(topologyTestDriver);
                    if ("0.10.0-2.4".equals(str)) {
                        Map metrics = topologyTestDriver.metrics();
                        Assert.assertEquals(Double.valueOf(1.0d), StreamsTestUtils.getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
                        Assert.assertNotEquals(Double.valueOf(0.0d), StreamsTestUtils.getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
                    }
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
                    if (topologyTestDriver != null) {
                        if (0 != 0) {
                            try {
                                topologyTestDriver.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            topologyTestDriver.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (topologyTestDriver != null) {
                    if (th2 != null) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldAggregateAndMaterializeResults() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as("aggregate").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat(keyValueStore.get("1"), CoreMatchers.equalTo("0+A+C+D"));
            MatcherAssert.assertThat(keyValueStore.get("2"), CoreMatchers.equalTo("0+B"));
            MatcherAssert.assertThat(keyValueStore.get("3"), CoreMatchers.equalTo("0+E+F"));
            KeyValueStore timestampedKeyValueStore = topologyTestDriver.getTimestampedKeyValueStore("aggregate");
            MatcherAssert.assertThat(timestampedKeyValueStore.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
            MatcherAssert.assertThat(timestampedKeyValueStore.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldAggregateWithDefaultSerdes() {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).toStream().process(mockProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                processData(topologyTestDriver);
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    private void processData(TopologyTestDriver topologyTestDriver) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("1", "A", 5L);
        createInputTopic.pipeInput("2", "B", 1L);
        createInputTopic.pipeInput("1", "C", 3L);
        createInputTopic.pipeInput("1", "D", 10L);
        createInputTopic.pipeInput("3", "E", 8L);
        createInputTopic.pipeInput("3", "F", 9L);
        createInputTopic.pipeInput("3", (String) null);
    }

    private void doCountWindowed(MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier) {
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                createInputTopic.pipeInput("1", "A", 0L);
                createInputTopic.pipeInput("1", "A", 499L);
                createInputTopic.pipeInput("1", "A", 100L);
                createInputTopic.pipeInput("2", "B", 0L);
                createInputTopic.pipeInput("2", "B", 100L);
                createInputTopic.pipeInput("2", "B", 200L);
                createInputTopic.pipeInput("3", "C", 1L);
                createInputTopic.pipeInput("1", "A", 500L);
                createInputTopic.pipeInput("1", "A", 500L);
                createInputTopic.pipeInput("2", "B", 500L);
                createInputTopic.pipeInput("2", "B", 500L);
                createInputTopic.pipeInput("3", "B", 100L);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().processed, CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp(new Windowed("1", new TimeWindow(0L, 500L)), 1L, 0L), new KeyValueTimestamp(new Windowed("1", new TimeWindow(0L, 500L)), 2L, 499L), new KeyValueTimestamp(new Windowed("1", new TimeWindow(0L, 500L)), 3L, 499L), new KeyValueTimestamp(new Windowed("2", new TimeWindow(0L, 500L)), 1L, 0L), new KeyValueTimestamp(new Windowed("2", new TimeWindow(0L, 500L)), 2L, 100L), new KeyValueTimestamp(new Windowed("2", new TimeWindow(0L, 500L)), 3L, 200L), new KeyValueTimestamp(new Windowed("3", new TimeWindow(0L, 500L)), 1L, 1L), new KeyValueTimestamp(new Windowed("1", new TimeWindow(500L, 1000L)), 1L, 500L), new KeyValueTimestamp(new Windowed("1", new TimeWindow(500L, 1000L)), 2L, 500L), new KeyValueTimestamp(new Windowed("2", new TimeWindow(500L, 1000L)), 1L, 500L), new KeyValueTimestamp(new Windowed("2", new TimeWindow(500L, 1000L)), 2L, 500L), new KeyValueTimestamp(new Windowed("3", new TimeWindow(0L, 500L)), 2L, 100L))));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldCountWindowed() {
        MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(500L))).count(Materialized.as("aggregate-by-key-windowed")).toStream().process(mockProcessorSupplier, new String[0]);
        doCountWindowed(mockProcessorSupplier);
    }

    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        MockProcessorSupplier<Windowed<String>, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        this.groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(500L))).count().toStream().process(mockProcessorSupplier, new String[0]);
        doCountWindowed(mockProcessorSupplier);
    }
}
