package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.class */
public class KStreamAggregationIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static volatile int testNo = 0;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private Initializer<Integer> initializer;
    private Aggregator<String, String, Integer> aggregator;
    private KStream<Integer, String> stream;
    private final MockTime mockTime = CLUSTER.time;
    private String userSessionsStream = "user-sessions";

    @Before
    public void before() throws InterruptedException {
        testNo += NUM_BROKERS;
        this.builder = new StreamsBuilder();
        createTopics();
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "kgrouped-stream-test-" + testNo);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        KeyValueMapper selectValueMapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(this.streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        this.groupedStream = this.stream.groupBy(selectValueMapper, Serialized.with(Serdes.String(), Serdes.String()));
        this.reducer = new Reducer<String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.1
            public String apply(String str, String str2) {
                return str + ":" + str2;
            }
        };
        this.initializer = new Initializer<Integer>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.2
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public Integer m10apply() {
                return 0;
            }
        };
        this.aggregator = new Aggregator<String, String, Integer>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.3
            public Integer apply(String str, String str2, Integer num) {
                return Integer.valueOf(num.intValue() + str2.length());
            }
        };
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReduce() throws Exception {
        produceMessages(this.mockTime.milliseconds());
        this.groupedStream.reduce(this.reducer, "reduce-by-key").to(Serdes.String(), Serdes.String(), this.outputTopic);
        startStreams();
        produceMessages(this.mockTime.milliseconds());
        List receiveMessages = receiveMessages(new StringDeserializer(), new StringDeserializer(), 10);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, String>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.4
            @Override // java.util.Comparator
            public int compare(KeyValue<String, String> keyValue, KeyValue<String, String> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(KeyValue.pair("A", "A"), KeyValue.pair("A", "A:A"), KeyValue.pair("B", "B"), KeyValue.pair("B", "B:B"), KeyValue.pair("C", "C"), KeyValue.pair("C", "C:C"), KeyValue.pair("D", "D"), KeyValue.pair("D", "D:D"), KeyValue.pair("E", "E"), KeyValue.pair("E", "E:E"))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K extends Comparable, V extends Comparable> int compare(KeyValue<K, V> keyValue, KeyValue<K, V> keyValue2) {
        int compareTo = ((Comparable) keyValue.key).compareTo(keyValue2.key);
        return compareTo == 0 ? ((Comparable) keyValue.value).compareTo(keyValue2.value) : compareTo;
    }

    @Test
    public void shouldReduceWindowed() throws Exception {
        long milliseconds = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        produceMessages(milliseconds);
        long milliseconds2 = this.mockTime.milliseconds();
        produceMessages(milliseconds2);
        produceMessages(milliseconds2);
        this.groupedStream.windowedBy(TimeWindows.of(500L)).reduce(this.reducer).toStream(new KeyValueMapper<Windowed<String>, String, String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.5
            public String apply(Windowed<String> windowed, String str) {
                return ((String) windowed.key()) + "@" + windowed.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        startStreams();
        List receiveMessages = receiveMessages(new StringDeserializer(), new StringDeserializer(), 15);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, String>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.6
            @Override // java.util.Comparator
            public int compare(KeyValue<String, String> keyValue, KeyValue<String, String> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        long j = (milliseconds / 500) * 500;
        long j2 = (milliseconds2 / 500) * 500;
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(new KeyValue("A@" + j, "A"), new KeyValue("A@" + j2, "A"), new KeyValue("A@" + j2, "A:A"), new KeyValue("B@" + j, "B"), new KeyValue("B@" + j2, "B"), new KeyValue("B@" + j2, "B:B"), new KeyValue("C@" + j, "C"), new KeyValue("C@" + j2, "C"), new KeyValue("C@" + j2, "C:C"), new KeyValue("D@" + j, "D"), new KeyValue("D@" + j2, "D"), new KeyValue("D@" + j2, "D:D"), new KeyValue("E@" + j, "E"), new KeyValue("E@" + j2, "E"), new KeyValue("E@" + j2, "E:E"))));
    }

    @Test
    public void shouldAggregate() throws Exception {
        produceMessages(this.mockTime.milliseconds());
        this.groupedStream.aggregate(this.initializer, this.aggregator, Serdes.Integer(), "aggregate-by-selected-key").to(Serdes.String(), Serdes.Integer(), this.outputTopic);
        startStreams();
        produceMessages(this.mockTime.milliseconds());
        List receiveMessages = receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 10);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, Integer>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.7
            @Override // java.util.Comparator
            public int compare(KeyValue<String, Integer> keyValue, KeyValue<String, Integer> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(KeyValue.pair("A", Integer.valueOf(NUM_BROKERS)), KeyValue.pair("A", 2), KeyValue.pair("B", Integer.valueOf(NUM_BROKERS)), KeyValue.pair("B", 2), KeyValue.pair("C", Integer.valueOf(NUM_BROKERS)), KeyValue.pair("C", 2), KeyValue.pair("D", Integer.valueOf(NUM_BROKERS)), KeyValue.pair("D", 2), KeyValue.pair("E", Integer.valueOf(NUM_BROKERS)), KeyValue.pair("E", 2))));
    }

    @Test
    public void shouldAggregateWindowed() throws Exception {
        long milliseconds = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        produceMessages(milliseconds);
        long milliseconds2 = this.mockTime.milliseconds();
        produceMessages(milliseconds2);
        produceMessages(milliseconds2);
        this.groupedStream.windowedBy(TimeWindows.of(500L)).aggregate(this.initializer, this.aggregator, Materialized.with((Serde) null, Serdes.Integer())).toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.8
            public String apply(Windowed<String> windowed, Integer num) {
                return ((String) windowed.key()) + "@" + windowed.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
        startStreams();
        List receiveMessages = receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 15);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, Integer>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.9
            @Override // java.util.Comparator
            public int compare(KeyValue<String, Integer> keyValue, KeyValue<String, Integer> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        long j = (milliseconds / 500) * 500;
        long j2 = (milliseconds2 / 500) * 500;
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(new KeyValue("A@" + j, Integer.valueOf(NUM_BROKERS)), new KeyValue("A@" + j2, Integer.valueOf(NUM_BROKERS)), new KeyValue("A@" + j2, 2), new KeyValue("B@" + j, Integer.valueOf(NUM_BROKERS)), new KeyValue("B@" + j2, Integer.valueOf(NUM_BROKERS)), new KeyValue("B@" + j2, 2), new KeyValue("C@" + j, Integer.valueOf(NUM_BROKERS)), new KeyValue("C@" + j2, Integer.valueOf(NUM_BROKERS)), new KeyValue("C@" + j2, 2), new KeyValue("D@" + j, Integer.valueOf(NUM_BROKERS)), new KeyValue("D@" + j2, Integer.valueOf(NUM_BROKERS)), new KeyValue("D@" + j2, 2), new KeyValue("E@" + j, Integer.valueOf(NUM_BROKERS)), new KeyValue("E@" + j2, Integer.valueOf(NUM_BROKERS)), new KeyValue("E@" + j2, 2))));
    }

    private void shouldCountHelper() throws Exception {
        startStreams();
        produceMessages(this.mockTime.milliseconds());
        List receiveMessages = receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, Long>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.10
            @Override // java.util.Comparator
            public int compare(KeyValue<String, Long> keyValue, KeyValue<String, Long> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(KeyValue.pair("A", 1L), KeyValue.pair("A", 2L), KeyValue.pair("B", 1L), KeyValue.pair("B", 2L), KeyValue.pair("C", 1L), KeyValue.pair("C", 2L), KeyValue.pair("D", 1L), KeyValue.pair("D", 2L), KeyValue.pair("E", 1L), KeyValue.pair("E", 2L))));
    }

    @Test
    public void shouldCount() throws Exception {
        produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count("count-by-key").to(Serdes.String(), Serdes.Long(), this.outputTopic);
        shouldCountHelper();
    }

    @Test
    public void shouldCountWithInternalStore() throws Exception {
        produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count().to(Serdes.String(), Serdes.Long(), this.outputTopic);
        shouldCountHelper();
    }

    @Test
    public void shouldGroupByKey() throws Exception {
        long milliseconds = this.mockTime.milliseconds();
        produceMessages(milliseconds);
        produceMessages(milliseconds);
        this.stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())).windowedBy(TimeWindows.of(500L)).count().toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.11
            public String apply(Windowed<Integer> windowed, Long l) {
                return windowed.key() + "@" + windowed.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        startStreams();
        List receiveMessages = receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        Collections.sort(receiveMessages, new Comparator<KeyValue<String, Long>>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.12
            @Override // java.util.Comparator
            public int compare(KeyValue<String, Long> keyValue, KeyValue<String, Long> keyValue2) {
                return KStreamAggregationIntegrationTest.compare(keyValue, keyValue2);
            }
        });
        long j = (milliseconds / 500) * 500;
        MatcherAssert.assertThat(receiveMessages, Is.is(Arrays.asList(KeyValue.pair("1@" + j, 1L), KeyValue.pair("1@" + j, 2L), KeyValue.pair("2@" + j, 1L), KeyValue.pair("2@" + j, 2L), KeyValue.pair("3@" + j, 1L), KeyValue.pair("3@" + j, 2L), KeyValue.pair("4@" + j, 1L), KeyValue.pair("4@" + j, 2L), KeyValue.pair("5@" + j, 1L), KeyValue.pair("5@" + j, 2L))));
    }

    @Test
    public void shouldCountSessionWindows() throws Exception {
        long milliseconds = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "start"), new KeyValue("penny", "start"), new KeyValue("jo", "pause"), new KeyValue("emily", "pause")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(milliseconds));
        long j = milliseconds + 150000;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue("emily", "resume")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j));
        long j2 = milliseconds + 300000 + 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "pause"), new KeyValue("penny", "stop")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j2));
        long j3 = j2 + 150000;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "resume"), new KeyValue("jo", "resume")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j3));
        final HashMap hashMap = new HashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(11);
        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count(SessionWindows.with(300000L).until(900000L)).toStream().foreach(new ForeachAction<Windowed<String>, Long>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.13
            public void apply(Windowed<String> windowed, Long l) {
                hashMap.put(windowed, l);
                countDownLatch.countDown();
            }
        });
        startStreams();
        countDownLatch.await(30L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(hashMap.get(new Windowed("bob", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("penny", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("jo", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("jo", new SessionWindow(j3, j3))), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("emily", new SessionWindow(milliseconds, j))), CoreMatchers.equalTo(2L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("bob", new SessionWindow(j2, j3))), CoreMatchers.equalTo(2L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("penny", new SessionWindow(j2, j2))), CoreMatchers.equalTo(1L));
    }

    @Test
    public void shouldReduceSessionWindows() throws Exception {
        long milliseconds = this.mockTime.milliseconds();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "start"), new KeyValue("penny", "start"), new KeyValue("jo", "pause"), new KeyValue("emily", "pause")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(milliseconds));
        long j = milliseconds + 500;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue("emily", "resume")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j));
        long j2 = milliseconds + 1000 + 1;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "pause"), new KeyValue("penny", "stop")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j2));
        long j3 = j2 + 500;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue("bob", "resume"), new KeyValue("jo", "resume")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j3));
        final HashMap hashMap = new HashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(11);
        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String())).reduce(new Reducer<String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.15
            public String apply(String str, String str2) {
                return str + ":" + str2;
            }
        }, SessionWindows.with(1000L).until(3000L), "UserSessionsStore").foreach(new ForeachAction<Windowed<String>, String>() { // from class: org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.14
            public void apply(Windowed<String> windowed, String str) {
                hashMap.put(windowed, str);
                countDownLatch.countDown();
            }
        });
        startStreams();
        countDownLatch.await(30L, TimeUnit.SECONDS);
        ReadOnlySessionStore readOnlySessionStore = (ReadOnlySessionStore) this.kafkaStreams.store("UserSessionsStore", QueryableStoreTypes.sessionStore());
        MatcherAssert.assertThat(hashMap.get(new Windowed("bob", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo("start"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("penny", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo("start"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("jo", new SessionWindow(milliseconds, milliseconds))), CoreMatchers.equalTo("pause"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("jo", new SessionWindow(j3, j3))), CoreMatchers.equalTo("resume"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("emily", new SessionWindow(milliseconds, j))), CoreMatchers.equalTo("pause:resume"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("bob", new SessionWindow(j2, j3))), CoreMatchers.equalTo("pause:resume"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("penny", new SessionWindow(j2, j2))), CoreMatchers.equalTo("stop"));
        KeyValueIterator fetch = readOnlySessionStore.fetch("bob");
        MatcherAssert.assertThat(fetch.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed("bob", new SessionWindow(milliseconds, milliseconds)), "start")));
        MatcherAssert.assertThat(fetch.next(), CoreMatchers.equalTo(KeyValue.pair(new Windowed("bob", new SessionWindow(j2, j3)), "pause:resume")));
        Assert.assertFalse(fetch.hasNext());
    }

    private void produceMessages(long j) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B"), new KeyValue(3, "C"), new KeyValue(4, "D"), new KeyValue(5, "E")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j));
    }

    private void createTopics() throws InterruptedException {
        this.streamOneInput = "stream-one-" + testNo;
        this.outputTopic = "output-" + testNo;
        this.userSessionsStream += "-" + testNo;
        CLUSTER.createTopic(this.streamOneInput, 3, NUM_BROKERS);
        CLUSTER.createTopics(this.userSessionsStream, this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> List<KeyValue<K, V>> receiveMessages(Deserializer<K> deserializer, Deserializer<V> deserializer2, int i) throws InterruptedException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", deserializer.getClass().getName());
        properties.setProperty("value.deserializer", deserializer2.getClass().getName());
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(properties, this.outputTopic, i, 60000L);
    }
}
