package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration tests that run reduce and count aggregations on an embedded Kafka cluster
 * with a 10 MiB record cache and a 300 ms commit interval, and then check the records that
 * {@link IntegrationTestUtils#waitUntilFinalKeyValueTimestampRecordsReceived} observes on the
 * output topic.
 *
 * <p>Every test uses its own input topic, output topic, application id and consumer group,
 * all derived from the test name, so tests do not share state on the cluster.
 */
@Category({IntegrationTest.class})
public class KStreamAggregationDedupIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;

    /** Size of the tumbling windows used by the windowed tests, in milliseconds. */
    private static final long WINDOW_SIZE_MS = 500L;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private final MockTime mockTime = CLUSTER.time;

    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private KStream<Integer, String> stream;

    @Rule
    public TestName testName = new TestName();

    /**
     * Creates the per-test topics, the Streams configuration, and the shared topology pieces:
     * the input stream, the same stream re-grouped by its value, and a reducer that joins two
     * values with {@code ":"}.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        createTopics();

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", COMMIT_INTERVAL_MS);
        streamsConfiguration.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);

        final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
        stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));

        reducer = (value1, value2) -> value1 + ":" + value2;
    }

    /**
     * Closes the Streams instance, if one was started, and purges its local state.
     *
     * @throws IOException if purging the local state fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
        // before() creates the topics before it assigns the configuration, so the
        // configuration is still null here when topic creation failed.
        if (streamsConfiguration != null) {
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        }
    }

    /**
     * Produces the five input records once before and once after the application starts and
     * expects, for each grouped key, the value reduced with itself (for example {@code "A:A"})
     * carrying the timestamp of the second batch.
     */
    @Test
    public void shouldReduce() throws Exception {
        produceMessages(System.currentTimeMillis());
        groupedStream
            .reduce(reducer, Materialized.as("reduce-by-key"))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        startStreams();

        final long timestamp = System.currentTimeMillis();
        produceMessages(timestamp);

        validateReceivedMessages(
            new StringDeserializer(),
            new StringDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("A", "A:A", timestamp),
                new KeyValueTimestamp<>("B", "B:B", timestamp),
                new KeyValueTimestamp<>("C", "C:C", timestamp),
                new KeyValueTimestamp<>("D", "D:D", timestamp),
                new KeyValueTimestamp<>("E", "E:E", timestamp)));
    }

    /**
     * Produces one batch with a timestamp one second in the past and two batches with the
     * current timestamp, reduces them in 500 ms windows, and expects one single-value result
     * per key for the earlier window and one {@code "X:X"} result per key for the later window.
     * Output keys have the form {@code <key>@<windowStart>}.
     */
    @Test
    public void shouldReduceWindowed() throws Exception {
        final long firstBatchTimestamp = System.currentTimeMillis() - 1000;
        produceMessages(firstBatchTimestamp);
        final long secondBatchTimestamp = System.currentTimeMillis();
        produceMessages(secondBatchTimestamp);
        produceMessages(secondBatchTimestamp);

        groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
            .reduce(reducer, Materialized.as("reduce-time-windows"))
            .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        startStreams();

        // Round each timestamp down to a multiple of the window size to get the window start.
        final long firstBatchWindow = firstBatchTimestamp / WINDOW_SIZE_MS * WINDOW_SIZE_MS;
        final long secondBatchWindow = secondBatchTimestamp / WINDOW_SIZE_MS * WINDOW_SIZE_MS;

        validateReceivedMessages(
            new StringDeserializer(),
            new StringDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp),
                new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp),
                new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp),
                new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp),
                new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp),
                new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp),
                new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp),
                new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
                new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
                new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)));
    }

    /**
     * Produces the five input records twice with the same timestamp, counts them per original
     * integer key in 500 ms windows, and expects a count of 2 for every key. Output keys have
     * the form {@code <key>@<windowStart>}.
     */
    @Test
    public void shouldGroupByKey() throws Exception {
        final long timestamp = mockTime.milliseconds();
        produceMessages(timestamp);
        produceMessages(timestamp);

        stream
            .groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
            .count(Materialized.as("count-windows"))
            .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        startStreams();

        // Round the timestamp down to a multiple of the window size to get the window start.
        final long window = timestamp / WINDOW_SIZE_MS * WINDOW_SIZE_MS;

        validateReceivedMessages(
            new StringDeserializer(),
            new LongDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("5@" + window, 2L, timestamp)));
    }

    /**
     * Writes the records (1,"A") through (5,"E") to the input topic with the given timestamp.
     *
     * @param timestamp record timestamp, in milliseconds, applied to every produced record
     */
    private void produceMessages(final long timestamp) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Arrays.asList(
                new KeyValue<>(1, "A"),
                new KeyValue<>(2, "B"),
                new KeyValue<>(3, "C"),
                new KeyValue<>(4, "D"),
                new KeyValue<>(5, "E")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                IntegerSerializer.class,
                StringSerializer.class,
                new Properties()),
            timestamp);
    }

    /**
     * Derives the input and output topic names from the test name and creates both topics;
     * the input topic is created with 3 partitions and replication factor 1.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    private void createTopics() throws InterruptedException {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        streamOneInput = "stream-one-" + safeTestName;
        outputTopic = "output-" + safeTestName;
        CLUSTER.createTopic(streamOneInput, 3, 1);
        CLUSTER.createTopic(outputTopic);
    }

    /** Builds the topology accumulated in {@code builder} and starts the Streams application. */
    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    /**
     * Consumes the output topic with a test-specific consumer group and waits until the
     * expected records have been received.
     *
     * @param keyDeserializer   deserializer whose class is configured for record keys
     * @param valueDeserializer deserializer whose class is configured for record values
     * @param expectedRecords   records the output topic is expected to yield
     */
    private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
                                                 final Deserializer<V> valueDeserializer,
                                                 final List<KeyValueTimestamp<K, V>> expectedRecords) throws Exception {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + safeTestName);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());

        IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
            consumerProperties,
            outputTopic,
            expectedRecords);
    }
}
