/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamWindowReduce;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the windowed reduce processor ({@link KStreamWindowReduce}) driven through a
 * {@link TopologyTestDriver}.
 *
 * <p>Both tests pipe records into a small topology and then verify three things: the value of a
 * "skipped"/"dropped" metric, the log lines captured while the records were processed, and (for
 * the expired-window case) the records that reach the output topic.
 */
public class KStreamWindowReduceTest {
    private static final String INPUT_TOPIC = "TOPIC";
    private static final String OUTPUT_TOPIC = "output";

    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

    /**
     * A record with a {@code null} key is expected to be skipped: the
     * {@code skipped-records-total} metric must read 1.0 and a "Skipping record due to null key"
     * message must have been logged.
     */
    @Test
    public void shouldLogAndMeterOnNullKey() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(500L)))
            .reduce((value1, value2) -> value1 + "+" + value2);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                driver.pipeInput(recordFactory.create(INPUT_TOPIC, null, "asdf"));
            } finally {
                // Always detach the appender, even if piping fails, so it cannot leak into other tests.
                LogCaptureAppender.unregister(appender);
            }

            Assert.assertEquals(
                1.0,
                StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
        }
    }

    /**
     * Pipes one record at timestamp 100 followed by six records at timestamps 0 through 5, all
     * for key {@code "k"}, into a topology using 5 ms windows with {@code until(100L)}.
     *
     * <p>Expected outcome, as asserted below: the five records with timestamps 0-4 (offsets 1-5)
     * are dropped with a "Skipping record for expired window" log line each and counted in
     * {@code late-record-drop-total}; only the records at timestamps 100 and 5 are emitted.
     *
     * <p>NOTE(review): the test is annotated {@code @Deprecated}, presumably to silence the
     * deprecation warning for {@code TimeWindows#until} - confirm against the Kafka API docs.
     */
    @Deprecated
    @Test
    public void shouldLogAndMeterOnExpiredEvent() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(5L)).until(100L))
            .reduce((value1, value2) -> value1 + "+" + value2)
            .toStream()
            .map((key, value) -> new KeyValue<>(key.toString(), value))
            .to(OUTPUT_TOPIC);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            // The expired-window messages are only captured once the processor's logger is at debug level.
            LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                // Offset 0: the record at timestamp 100 (the expected log lines report streamTime=[100]).
                driver.pipeInput(recordFactory.create(INPUT_TOPIC, "k", "100", 100L));
                // Offsets 1-6: records at timestamps 0..5 whose value equals the timestamp.
                for (long timestamp = 0L; timestamp <= 5L; timestamp++) {
                    driver.pipeInput(recordFactory.create(INPUT_TOPIC, "k", Long.toString(timestamp), timestamp));
                }
            } finally {
                // Always detach the appender, even if piping fails, so it cannot leak into other tests.
                LogCaptureAppender.unregister(appender);
            }

            final Metric dropMetric = driver.metrics().get(new MetricName(
                "late-record-drop-total",
                "stream-processor-node-metrics",
                "The total number of occurrence of late-record-drop operations.",
                Utils.mkMap(
                    Utils.mkEntry("client-id", "topology-test-driver-virtual-thread"),
                    Utils.mkEntry("task-id", "0_0"),
                    Utils.mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002"))));

            // Fail with a readable message instead of a NullPointerException if the metric is not registered.
            Assert.assertNotNull("late-record-drop-total metric was not found", dropMetric);
            MatcherAssert.assertThat(dropMetric.metricValue(), CoreMatchers.equalTo(5.0));
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItems(
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5] streamTime=[100]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5] streamTime=[100]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5] streamTime=[100]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5] streamTime=[100]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5] streamTime=[100]"));

            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100L);
            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/10]", "5", 5L);
            // Nothing else may have been emitted.
            MatcherAssert.assertThat(driver.readOutput(OUTPUT_TOPIC), CoreMatchers.nullValue());
        }
    }

    /**
     * Reads the next record from the output topic, deserializing key and value as strings.
     *
     * @param driver the test driver to read from
     * @return whatever {@code driver.readOutput} returns for the output topic
     */
    private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
        return driver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
    }
}

