package org.apache.kafka.streams.integration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.tools.ConsoleConsumer;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.class */
public class KStreamAggregationIntegrationTest {
    // Broker count handed to the embedded cluster constructor below.
    private static final int NUM_BROKERS = 1;

    // Embedded Kafka cluster registered as a JUnit class rule, so it is shared by all tests of this class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // Topology builder; re-created in before() for every test.
    private StreamsBuilder builder;
    // Streams client configuration assembled in before().
    private Properties streamsConfiguration;
    // Streams instance created by startStreams(); closed in whenShuttingDown() when non-null.
    private KafkaStreams kafkaStreams;
    // Per-test topic names, assigned in createTopics().
    private String streamOneInput;
    private String userSessionsStream;
    private String outputTopic;
    // streamOneInput grouped via MockMapper.selectValueMapper(); set up in before().
    private KGroupedStream<String, String> groupedStream;
    // Joins two values as "left:right".
    private Reducer<String> reducer;
    // Supplies the initial aggregate 0.
    private Initializer<Integer> initializer;
    // Adds the length of each value to the running aggregate.
    private Aggregator<String, String, Integer> aggregator;
    // Source stream over streamOneInput (Integer keys, String values).
    private KStream<Integer, String> stream;
    // Clock object exposed by the embedded cluster; tests read record timestamps from it.
    private final MockTime mockTime = CLUSTER.time;

    // JUnit rule giving access to the running test's name, used to derive unique topic/application names.
    @Rule
    public TestName testName = new TestName();

    /**
     * Builds the per-test fixture: a fresh {@link StreamsBuilder}, uniquely named topics, the
     * Streams configuration, the source stream over {@code streamOneInput}, its grouped view,
     * and the reducer / initializer / aggregator shared by the aggregation tests.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        createTopics();

        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());

        // Parameterized (not raw) so that groupBy below is type-checked instead of being an
        // unchecked call whose result is silently assigned to KGroupedStream<String, String>.
        final KeyValueMapper<Integer, String, String> selectValueMapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(this.streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        this.groupedStream = this.stream.groupBy(selectValueMapper, Grouped.with(Serdes.String(), Serdes.String()));

        // Concatenates values as "left:right".
        this.reducer = (left, right) -> left + ":" + right;
        // Aggregations start from 0 ...
        this.initializer = () -> 0;
        // ... and accumulate the length of every value seen for the key.
        this.aggregator = (key, value, aggregate) -> aggregate + value.length();
    }

    /**
     * Stops the Streams instance (if one was started) and purges its local state directory.
     *
     * <p>The purge runs in a {@code finally} block so that a failure while closing the client
     * does not leave state behind, and it is skipped when {@code before()} failed before the
     * configuration was created (in which case there is nothing to purge).
     *
     * @throws IOException if purging the local state fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        try {
            if (this.kafkaStreams != null) {
                this.kafkaStreams.close();
            }
        } finally {
            // streamsConfiguration is only assigned after createTopics() succeeds in before().
            if (this.streamsConfiguration != null) {
                IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
            }
        }
    }

    /**
     * Reduces the grouped stream into the "reduce-by-key" store and verifies the ten updates
     * written to the output topic: for each key A..E the first value and the "X:X" concatenation.
     */
    @Test
    public void shouldReduce() throws Exception {
        // Read the clock once: the same timestamp is used for both batches and for the expectations.
        final long timestamp = this.mockTime.milliseconds();
        produceMessages(timestamp);
        this.groupedStream
            .reduce(this.reducer, Materialized.as("reduce-by-key"))
            .toStream()
            .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        startStreams();
        produceMessages(timestamp);

        // Typed list: the generic compare(...) method reference cannot be applied to a raw List.
        final List<KeyValueTimestamp<String, String>> results =
            receiveMessages(new StringDeserializer(), new StringDeserializer(), 10);
        results.sort(KStreamAggregationIntegrationTest::compare);

        final List<KeyValueTimestamp<String, String>> expected = Arrays.asList(
            new KeyValueTimestamp<>("A", "A", timestamp),
            new KeyValueTimestamp<>("A", "A:A", timestamp),
            new KeyValueTimestamp<>("B", "B", timestamp),
            new KeyValueTimestamp<>("B", "B:B", timestamp),
            new KeyValueTimestamp<>("C", "C", timestamp),
            new KeyValueTimestamp<>("C", "C:C", timestamp),
            new KeyValueTimestamp<>("D", "D", timestamp),
            new KeyValueTimestamp<>("D", "D:D", timestamp),
            new KeyValueTimestamp<>("E", "E", timestamp),
            new KeyValueTimestamp<>("E", "E:E", timestamp));
        MatcherAssert.assertThat(results, Is.is(expected));
    }

    /**
     * Orders records by key, then by value, then by timestamp.
     *
     * @param keyValueTimestamp  left-hand record
     * @param keyValueTimestamp2 right-hand record
     * @return a negative, zero or positive number following the usual comparator convention
     */
    private static <K extends Comparable, V extends Comparable> int compare(KeyValueTimestamp<K, V> keyValueTimestamp, KeyValueTimestamp<K, V> keyValueTimestamp2) {
        final int byKey = keyValueTimestamp.key().compareTo(keyValueTimestamp2.key());
        if (byKey != 0) {
            return byKey;
        }
        final int byValue = keyValueTimestamp.value().compareTo(keyValueTimestamp2.value());
        if (byValue != 0) {
            return byValue;
        }
        // Keys and values tie: fall back to the record timestamps.
        return Long.compare(keyValueTimestamp.timestamp(), keyValueTimestamp2.timestamp());
    }

    /**
     * Reduces the grouped stream in 500 ms time windows and checks the fifteen windowed updates,
     * both through a regular consumer and through the console-consumer output.
     *
     * <p>One batch is produced with the initial clock reading and two batches with the reading
     * taken after the clock was advanced by one second, so each key gets one update in the
     * first window and two in the second.
     */
    @Test
    public void shouldReduceWindowed() throws Exception {
        final long windowSizeMs = 500L;

        final long firstBatchTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        produceMessages(firstBatchTimestamp);
        final long secondBatchTimestamp = this.mockTime.milliseconds();
        produceMessages(secondBatchTimestamp);
        produceMessages(secondBatchTimestamp);

        this.groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMillis(windowSizeMs)))
            .reduce(this.reducer)
            .toStream()
            .to(this.outputTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.String()));
        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput =
            receiveMessages(new TimeWindowedDeserializer<String>(), new StringDeserializer(), String.class, 15);
        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
            new TimeWindowedDeserializer<String>(), new StringDeserializer(), String.class, 15, true);

        // Explicitly typed lambda parameter: without it the key extractor would see Object
        // and key() would not resolve.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenValue =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().key())
                .thenComparing(KeyValueTimestamp::value);
        windowedOutput.sort(byKeyThenValue);

        // Window starts are the batch timestamps rounded down to a multiple of the window size.
        final TimeWindow firstWindow = new TimeWindow(firstBatchTimestamp / windowSizeMs * windowSizeMs, Long.MAX_VALUE);
        final TimeWindow secondWindow = new TimeWindow(secondBatchTimestamp / windowSizeMs * windowSizeMs, Long.MAX_VALUE);

        final List<KeyValueTimestamp<Windowed<String>, String>> expected = Arrays.asList(
            new KeyValueTimestamp<>(new Windowed<>("A", firstWindow), "A", firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("A", secondWindow), "A", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("A", secondWindow), "A:A", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", firstWindow), "B", firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", secondWindow), "B", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", secondWindow), "B:B", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", firstWindow), "C", firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", secondWindow), "C", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", secondWindow), "C:C", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", firstWindow), "D", firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", secondWindow), "D", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", secondWindow), "D:D", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", firstWindow), "E", firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", secondWindow), "E", secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", secondWindow), "E:E", secondBatchTimestamp));
        MatcherAssert.assertThat(windowedOutput, Is.is(expected));

        // Every line printed by the console consumer must correspond to one expected record.
        final HashSet<String> expectedConsoleLines = new HashSet<>(expected.size());
        for (final KeyValueTimestamp<Windowed<String>, String> record : expected) {
            expectedConsoleLines.add("CreateTime:" + record.timestamp() + ", " + record.key() + ", " + record.value());
        }
        for (final String line : resultFromConsoleConsumer.split("\n")) {
            Assert.assertTrue("Unexpected console consumer line: " + line, expectedConsoleLines.contains(line));
        }
    }

    /**
     * Aggregates value lengths per key into the "aggregate-by-selected-key" store and verifies
     * the ten updates on the output topic: for each key A..E the aggregates 1 and then 2.
     */
    @Test
    public void shouldAggregate() throws Exception {
        // Read the clock once: the same timestamp is used for both batches and for the expectations.
        final long timestamp = this.mockTime.milliseconds();
        produceMessages(timestamp);
        this.groupedStream
            .aggregate(this.initializer, this.aggregator, Materialized.as("aggregate-by-selected-key"))
            .toStream()
            .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
        startStreams();
        produceMessages(timestamp);

        // Typed list: the generic compare(...) method reference cannot be applied to a raw List.
        final List<KeyValueTimestamp<String, Integer>> results =
            receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 10);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // Expected aggregates are the literals 1 and 2 (summed value lengths); they are not
        // related to the broker count.
        final List<KeyValueTimestamp<String, Integer>> expected = Arrays.asList(
            new KeyValueTimestamp<>("A", 1, timestamp),
            new KeyValueTimestamp<>("A", 2, timestamp),
            new KeyValueTimestamp<>("B", 1, timestamp),
            new KeyValueTimestamp<>("B", 2, timestamp),
            new KeyValueTimestamp<>("C", 1, timestamp),
            new KeyValueTimestamp<>("C", 2, timestamp),
            new KeyValueTimestamp<>("D", 1, timestamp),
            new KeyValueTimestamp<>("D", 2, timestamp),
            new KeyValueTimestamp<>("E", 1, timestamp),
            new KeyValueTimestamp<>("E", 2, timestamp));
        MatcherAssert.assertThat(results, Is.is(expected));
    }

    /**
     * Aggregates value lengths in 500 ms time windows and checks the fifteen windowed updates,
     * both through a regular consumer and through the console-consumer output.
     *
     * <p>One batch is produced with the initial clock reading and two batches with the reading
     * taken after the clock was advanced by one second, so each key gets the aggregate 1 in the
     * first window and the aggregates 1 and 2 in the second.
     */
    @Test
    public void shouldAggregateWindowed() throws Exception {
        final long windowSizeMs = 500L;

        final long firstBatchTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        produceMessages(firstBatchTimestamp);
        final long secondBatchTimestamp = this.mockTime.milliseconds();
        produceMessages(secondBatchTimestamp);
        produceMessages(secondBatchTimestamp);

        this.groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMillis(windowSizeMs)))
            // No key serde is supplied for the store (null), only the Integer value serde.
            .aggregate(this.initializer, this.aggregator, Materialized.with((Serde<String>) null, Serdes.Integer()))
            .toStream()
            .to(this.outputTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Integer()));
        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedOutput =
            receiveMessagesWithTimestamp(new TimeWindowedDeserializer<String>(), new IntegerDeserializer(), String.class, 15);
        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
            new TimeWindowedDeserializer<String>(), new IntegerDeserializer(), String.class, 15, true);

        // Explicitly typed lambda parameter: without it the key extractor would see Object
        // and key() would not resolve.
        final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> byKeyThenValue =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> record) -> record.key().key())
                .thenComparingInt(KeyValueTimestamp::value);
        windowedOutput.sort(byKeyThenValue);

        // Window starts are the batch timestamps rounded down to a multiple of the window size.
        final TimeWindow firstWindow = new TimeWindow(firstBatchTimestamp / windowSizeMs * windowSizeMs, Long.MAX_VALUE);
        final TimeWindow secondWindow = new TimeWindow(secondBatchTimestamp / windowSizeMs * windowSizeMs, Long.MAX_VALUE);

        // Expected aggregates are the literals 1 and 2; they are not related to the broker count.
        final List<KeyValueTimestamp<Windowed<String>, Integer>> expected = Arrays.asList(
            new KeyValueTimestamp<>(new Windowed<>("A", firstWindow), 1, firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("A", secondWindow), 1, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("A", secondWindow), 2, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", firstWindow), 1, firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", secondWindow), 1, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("B", secondWindow), 2, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", firstWindow), 1, firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", secondWindow), 1, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("C", secondWindow), 2, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", firstWindow), 1, firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", secondWindow), 1, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("D", secondWindow), 2, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", firstWindow), 1, firstBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", secondWindow), 1, secondBatchTimestamp),
            new KeyValueTimestamp<>(new Windowed<>("E", secondWindow), 2, secondBatchTimestamp));
        MatcherAssert.assertThat(windowedOutput, Is.is(expected));

        // Every line printed by the console consumer must correspond to one expected record.
        final HashSet<String> expectedConsoleLines = new HashSet<>(expected.size());
        for (final KeyValueTimestamp<Windowed<String>, Integer> record : expected) {
            expectedConsoleLines.add("CreateTime:" + record.timestamp() + ", " + record.key() + ", " + record.value());
        }
        for (final String line : resultFromConsoleConsumer.split("\n")) {
            Assert.assertTrue("Unexpected console consumer line: " + line, expectedConsoleLines.contains(line));
        }
    }

    /**
     * Shared tail of the count tests: starts the topology the caller has already defined,
     * produces a second batch and verifies that every key A..E was counted to 1 and then 2.
     *
     * <p>The caller is expected to have produced the first batch already.
     */
    private void shouldCountHelper() throws Exception {
        startStreams();
        // Read the clock once for the second batch and for the expectations.
        final long timestamp = this.mockTime.milliseconds();
        produceMessages(timestamp);

        // Typed list: the generic compare(...) method reference cannot be applied to a raw List.
        final List<KeyValueTimestamp<String, Long>> results =
            receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        results.sort(KStreamAggregationIntegrationTest::compare);

        final List<KeyValueTimestamp<String, Long>> expected = Arrays.asList(
            new KeyValueTimestamp<>("A", 1L, timestamp),
            new KeyValueTimestamp<>("A", 2L, timestamp),
            new KeyValueTimestamp<>("B", 1L, timestamp),
            new KeyValueTimestamp<>("B", 2L, timestamp),
            new KeyValueTimestamp<>("C", 1L, timestamp),
            new KeyValueTimestamp<>("C", 2L, timestamp),
            new KeyValueTimestamp<>("D", 1L, timestamp),
            new KeyValueTimestamp<>("D", 2L, timestamp),
            new KeyValueTimestamp<>("E", 1L, timestamp),
            new KeyValueTimestamp<>("E", 2L, timestamp));
        MatcherAssert.assertThat(results, Is.is(expected));
    }

    /**
     * Counts records per key into the named store "count-by-key" and delegates the
     * verification to {@code shouldCountHelper()}.
     */
    @Test
    public void shouldCount() throws Exception {
        final long now = this.mockTime.milliseconds();
        produceMessages(now);

        final KStream<String, Long> counts = this.groupedStream.count(Materialized.as("count-by-key")).toStream();
        counts.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        shouldCountHelper();
    }

    /**
     * Same as {@code shouldCount()} but calls {@code count()} without a Materialized argument;
     * verification is delegated to {@code shouldCountHelper()}.
     */
    @Test
    public void shouldCountWithInternalStore() throws Exception {
        final long now = this.mockTime.milliseconds();
        produceMessages(now);

        final KStream<String, Long> counts = this.groupedStream.count().toStream();
        counts.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        shouldCountHelper();
    }

    /**
     * Groups the source stream by its original Integer key, counts in 500 ms windows and
     * verifies the ten updates, re-keyed as {@code "<key>@<windowStart>"}.
     */
    @Test
    public void shouldGroupByKey() throws Exception {
        final long windowSizeMs = 500L;
        final long timestamp = this.mockTime.milliseconds();
        produceMessages(timestamp);
        produceMessages(timestamp);

        this.stream
            .groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(windowSizeMs)))
            .count()
            .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        startStreams();

        // Typed list: the generic compare(...) method reference cannot be applied to a raw List.
        final List<KeyValueTimestamp<String, Long>> results =
            receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        results.sort(KStreamAggregationIntegrationTest::compare);

        // Both batches share one timestamp, hence one window whose start is that timestamp
        // rounded down to a multiple of the window size.
        final long windowStart = timestamp / windowSizeMs * windowSizeMs;
        final List<KeyValueTimestamp<String, Long>> expected = Arrays.asList(
            new KeyValueTimestamp<>("1@" + windowStart, 1L, timestamp),
            new KeyValueTimestamp<>("1@" + windowStart, 2L, timestamp),
            new KeyValueTimestamp<>("2@" + windowStart, 1L, timestamp),
            new KeyValueTimestamp<>("2@" + windowStart, 2L, timestamp),
            new KeyValueTimestamp<>("3@" + windowStart, 1L, timestamp),
            new KeyValueTimestamp<>("3@" + windowStart, 2L, timestamp),
            new KeyValueTimestamp<>("4@" + windowStart, 1L, timestamp),
            new KeyValueTimestamp<>("4@" + windowStart, 2L, timestamp),
            new KeyValueTimestamp<>("5@" + windowStart, 1L, timestamp),
            new KeyValueTimestamp<>("5@" + windowStart, 2L, timestamp));
        MatcherAssert.assertThat(results, Is.is(expected));
    }

    /**
     * Counts records per user in session windows with a five-minute inactivity gap and checks
     * the latest count and record timestamp captured for each expected session.
     *
     * <p>Five batches are produced at t1, t2 = t1 + gap/2, t3 = t1 + gap + 1, t4 = t3 + gap/2
     * and t5 = t4 - 1. The test waits for thirteen downstream results before asserting.
     */
    @Test
    public void shouldCountSessionWindows() throws Exception {
        final long sessionGap = 300000L;
        // One producer configuration is enough for all batches.
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());

        final long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "start"),
                new KeyValue<>("penny", "start"),
                new KeyValue<>("jo", "pause"),
                new KeyValue<>("emily", "pause")),
            producerConfig,
            t1);

        final long t2 = t1 + sessionGap / 2;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("emily", "resume")),
            producerConfig,
            t2);

        final long t3 = t1 + sessionGap + 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "pause"),
                new KeyValue<>("penny", "stop")),
            producerConfig,
            t3);

        final long t4 = t3 + sessionGap / 2;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "resume"),
                new KeyValue<>("jo", "resume")),
            producerConfig,
            t4);

        // Produced last but timestamped one millisecond before t4.
        final long t5 = t4 - 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("jo", "late")),
            producerConfig,
            t5);

        // Latest (count, record timestamp) seen per session window.
        final HashMap<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
        final CountDownLatch latch = new CountDownLatch(13);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SessionWindows.with(Duration.ofMillis(sessionGap)))
            .count()
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                @Override
                public KeyValue<Object, Object> transform(final Windowed<String> windowed, final Long count) {
                    results.put(windowed, KeyValue.pair(count, this.context.timestamp()));
                    latch.countDown();
                    // Nothing is forwarded downstream; the transformer only records results.
                    return null;
                }

                @Override
                public void close() {
                }
            });

        startStreams();
        // Fail here with a clear message on timeout instead of later on a missing map entry.
        Assert.assertTrue("Timed out waiting for the session window results", latch.await(30L, TimeUnit.SECONDS));

        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair(1L, t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), CoreMatchers.equalTo(KeyValue.pair(2L, t2)));
        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), CoreMatchers.equalTo(KeyValue.pair(1L, t3)));
    }

    /**
     * Reduces values per user in session windows with a one-second inactivity gap, checks the
     * latest value and record timestamp captured for each expected session, and finally
     * verifies that bob's sessions can be read back from the "UserSessionsStore" state store.
     *
     * <p>Five batches are produced at t1, t2 = t1 + gap/2, t3 = t1 + gap + 1, t4 = t3 + gap/2
     * and t5 = t4 - 1. The test waits for thirteen downstream results before asserting.
     */
    @Test
    public void shouldReduceSessionWindows() throws Exception {
        final long sessionGap = 1000L;
        final String userSessionsStore = "UserSessionsStore";
        // One producer configuration is enough for all batches.
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());

        final long t1 = this.mockTime.milliseconds();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "start"),
                new KeyValue<>("penny", "start"),
                new KeyValue<>("jo", "pause"),
                new KeyValue<>("emily", "pause")),
            producerConfig,
            t1);

        final long t2 = t1 + sessionGap / 2;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("emily", "resume")),
            producerConfig,
            t2);

        final long t3 = t1 + sessionGap + 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "pause"),
                new KeyValue<>("penny", "stop")),
            producerConfig,
            t3);

        final long t4 = t3 + sessionGap / 2;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "resume"),
                new KeyValue<>("jo", "resume")),
            producerConfig,
            t4);

        // Produced last but timestamped one millisecond before t4.
        final long t5 = t4 - 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("jo", "late")),
            producerConfig,
            t5);

        // Latest (reduced value, record timestamp) seen per session window.
        final HashMap<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
        final CountDownLatch latch = new CountDownLatch(13);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SessionWindows.with(Duration.ofMillis(sessionGap)))
            .reduce((left, right) -> left + ":" + right, Materialized.as(userSessionsStore))
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, String, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                @Override
                public KeyValue<Object, Object> transform(final Windowed<String> windowed, final String value) {
                    results.put(windowed, KeyValue.pair(value, this.context.timestamp()));
                    latch.countDown();
                    // Nothing is forwarded downstream; the transformer only records results.
                    return null;
                }

                @Override
                public void close() {
                }
            });

        startStreams();
        // Fail here with a clear message on timeout instead of later on a missing map entry.
        Assert.assertTrue("Timed out waiting for the session window results", latch.await(30L, TimeUnit.SECONDS));

        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("start", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("start", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), CoreMatchers.equalTo(KeyValue.pair("pause", t1)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), CoreMatchers.equalTo(KeyValue.pair("resume:late", t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), CoreMatchers.equalTo(KeyValue.pair("pause:resume", t2)));
        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), CoreMatchers.equalTo(KeyValue.pair("pause:resume", t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), CoreMatchers.equalTo(KeyValue.pair("stop", t3)));

        // Read bob's sessions back from the state store; the iterator is closed when done.
        final ReadOnlySessionStore<String, String> sessionStore =
            IntegrationTestUtils.getStore(userSessionsStore, this.kafkaStreams, QueryableStoreTypes.sessionStore());
        try (KeyValueIterator<Windowed<String>, String> bobSessions = sessionStore.fetch("bob")) {
            MatcherAssert.assertThat(bobSessions.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
            MatcherAssert.assertThat(bobSessions.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
            Assert.assertFalse(bobSessions.hasNext());
        }
    }

    /**
     * Counts records per user in a single unlimited window that starts one millisecond after
     * the first batch, and checks the latest count and record timestamp captured per user.
     *
     * <p>Batches are produced one day apart at t1..t4. The expectations below (and the latch
     * count of five) reflect that the t1 batch, timestamped just before the window start, does
     * not contribute to any count.
     */
    @Test
    public void shouldCountUnlimitedWindows() throws Exception {
        final long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        // Window start is derived from t1 so the "one millisecond later" relationship is explicit.
        final long startTime = t1 + 1;
        final long incrementTime = Duration.ofDays(1L).toMillis();

        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());

        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "start"),
                new KeyValue<>("penny", "start"),
                new KeyValue<>("jo", "pause"),
                new KeyValue<>("emily", "pause")),
            producerConfig,
            t1);

        final long t2 = t1 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Collections.singletonList(new KeyValue<>("emily", "resume")),
            producerConfig,
            t2);

        final long t3 = t2 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "pause"),
                new KeyValue<>("penny", "stop")),
            producerConfig,
            t3);

        final long t4 = t3 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.userSessionsStream,
            Arrays.asList(
                new KeyValue<>("bob", "resume"),
                new KeyValue<>("jo", "resume")),
            producerConfig,
            t4);

        // Latest (count, record timestamp) seen per windowed key.
        final HashMap<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
        final CountDownLatch latch = new CountDownLatch(5);

        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(UnlimitedWindows.of().startOn(Instant.ofEpochMilli(startTime)))
            .count()
            .toStream()
            .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                @Override
                public KeyValue<Object, Object> transform(final Windowed<String> windowed, final Long count) {
                    results.put(windowed, KeyValue.pair(count, this.context.timestamp()));
                    latch.countDown();
                    // Nothing is forwarded downstream; the transformer only records results.
                    return null;
                }

                @Override
                public void close() {
                }
            });

        startStreams();
        Assert.assertTrue(latch.await(30L, TimeUnit.SECONDS));

        MatcherAssert.assertThat(results.get(new Windowed<>("bob", new UnlimitedWindow(startTime))), CoreMatchers.equalTo(KeyValue.pair(2L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("penny", new UnlimitedWindow(startTime))), CoreMatchers.equalTo(KeyValue.pair(1L, t3)));
        MatcherAssert.assertThat(results.get(new Windowed<>("jo", new UnlimitedWindow(startTime))), CoreMatchers.equalTo(KeyValue.pair(1L, t4)));
        MatcherAssert.assertThat(results.get(new Windowed<>("emily", new UnlimitedWindow(startTime))), CoreMatchers.equalTo(KeyValue.pair(1L, t2)));
    }

    /**
     * Synchronously produces the fixed batch (1,"A") .. (5,"E") to {@code streamOneInput}.
     *
     * @param j timestamp (epoch milliseconds) assigned to every record of the batch
     * @throws Exception if producing fails
     */
    private void produceMessages(long j) throws Exception {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.streamOneInput,
            Arrays.asList(
                // The first key is the literal 1; it must not follow the broker count.
                new KeyValue<>(1, "A"),
                new KeyValue<>(2, "B"),
                new KeyValue<>(3, "C"),
                new KeyValue<>(4, "D"),
                new KeyValue<>(5, "E")),
            producerConfig,
            j);
    }

    /**
     * Derives the per-test topic names from the unique test name and creates the topics on the cluster.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    private void createTopics() throws InterruptedException {
        final String suffix = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);

        streamOneInput = "stream-one-" + suffix;
        outputTopic = "output-" + suffix;
        userSessionsStream = "user-sessions-" + suffix;

        // presumably (topic, partitions, replication) - confirm against EmbeddedKafkaCluster.createTopic
        CLUSTER.createTopic(streamOneInput, 3, NUM_BROKERS);
        CLUSTER.createTopics(userSessionsStream, outputTopic);
    }

    /**
     * Builds the topology from {@code builder}, stores the resulting {@link KafkaStreams}
     * instance in {@code kafkaStreams}, and starts it.
     */
    private void startStreams() {
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        // Publish to the field before starting, so the instance is recorded even if start() throws.
        kafkaStreams = streams;
        streams.start();
    }

    /**
     * Convenience overload of {@code receiveMessages} that passes {@code null} as the inner key class.
     *
     * @param deserializer  key deserializer
     * @param deserializer2 value deserializer
     * @param i             record count forwarded to the four-argument overload
     * @return whatever the four-argument overload returns
     * @throws Exception propagated from the four-argument overload
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(Deserializer<K> deserializer, Deserializer<V> deserializer2, int i) throws Exception {
        final Class noInnerClass = null;
        return receiveMessages(deserializer, deserializer2, noInnerClass, i);
    }

    /**
     * Builds consumer properties for the given deserializers and reads records with timestamps
     * from {@code outputTopic} via
     * {@code IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived}.
     *
     * @param deserializer  key deserializer; its class name becomes {@code key.deserializer}
     * @param deserializer2 value deserializer; its class name becomes {@code value.deserializer}
     * @param cls           inner key class, only consulted when the key deserializer is a
     *                      {@link TimeWindowedDeserializer} or {@link SessionWindowedDeserializer}
     * @param i             record count passed to the wait helper
     * @return the records returned by the wait helper
     * @throws Exception propagated from the wait helper
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(Deserializer<K> deserializer, Deserializer<V> deserializer2, Class cls, int i) throws Exception {
        final String groupId = "group-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);

        final Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProps.setProperty("group.id", groupId);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("key.deserializer", deserializer.getClass().getName());
        consumerProps.setProperty("value.deserializer", deserializer2.getClass().getName());

        // Windowed key deserializers additionally need the serde of the wrapped (inner) key.
        final boolean windowedKey = deserializer instanceof TimeWindowedDeserializer
            || deserializer instanceof SessionWindowedDeserializer;
        if (windowedKey) {
            final String innerSerdeName = Serdes.serdeFrom(cls).getClass().getName();
            consumerProps.setProperty("default.windowed.key.serde.inner", innerSerdeName);
        }

        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerProps, this.outputTopic, i, IntegrationTestUtils.DEFAULT_TIMEOUT);
    }

    /**
     * Reads records with timestamps from {@code outputTopic}.
     *
     * <p>Uses exactly the same consumer configuration as
     * {@code receiveMessages(Deserializer, Deserializer, Class, int)}, so it delegates to that
     * method instead of repeating the property setup.
     *
     * @param deserializer  key deserializer
     * @param deserializer2 value deserializer
     * @param cls           inner key class, used only for windowed key deserializers
     * @param i             record count passed on to the delegate
     * @return the records returned by the delegate
     * @throws Exception propagated from the delegate
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(Deserializer<K> deserializer, Deserializer<V> deserializer2, Class cls, int i) throws Exception {
        return receiveMessages(deserializer, deserializer2, cls, i);
    }

    /**
     * Runs the console consumer against {@code outputTopic} and returns everything it printed
     * to standard output.
     *
     * <p>{@code System.out} is redirected to an in-memory buffer while the consumer runs and is
     * always restored afterwards, including when the consumer throws.
     *
     * @param deserializer  key deserializer; its class name is passed as {@code key.deserializer}
     * @param deserializer2 value deserializer; its class name is passed as {@code value.deserializer}
     * @param cls           inner key class; its serde class name is passed as
     *                      {@code key.deserializer.default.windowed.key.serde.inner}
     * @param i             value of the {@code --max-messages} argument
     * @param z             value of the {@code print.timestamp} property
     * @return the captured console output
     */
    private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, Class cls, int i, boolean z) {
        final ByteArrayOutputStream captured = new ByteArrayOutputStream();
        final PrintStream originalOut = System.out;
        try (PrintStream redirected = new PrintStream(captured)) {
            System.setOut(redirected);
            try {
                // Manually assembled console consumer command line.
                final String[] args = {
                    "--bootstrap-server", CLUSTER.bootstrapServers(),
                    "--from-beginning",
                    "--property", "print.key=true",
                    "--property", "print.timestamp=" + z,
                    "--topic", this.outputTopic,
                    "--max-messages", String.valueOf(i),
                    "--property", "key.deserializer=" + deserializer.getClass().getName(),
                    "--property", "value.deserializer=" + deserializer2.getClass().getName(),
                    "--property", "key.separator=, ",
                    "--property", "key.deserializer.default.windowed.key.serde.inner=" + Serdes.serdeFrom(cls).getClass().getName()
                };
                // Set the consumer's message counter to zero before this run.
                ConsoleConsumer.messageCount_$eq(0);
                ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
                redirected.flush();
            } finally {
                // Restore the real stdout before the redirected stream is closed, even on failure.
                System.setOut(originalOut);
            }
            return captured.toString();
        }
    }
}
