package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.class */
public class KStreamSlidingWindowAggregateTest {

    // Test parameter injected by the Parameterized runner from data(). Tests that pick a
    // window store supplier use InOrderMemoryWindowStoreSupplier when this is true and
    // Stores.inMemoryWindowStore(...) when it is false.
    @Parameterized.Parameter
    public boolean inOrderIterator;
    // Streams configuration built by StreamsTestUtils.getStreamsConfig from two String serdes;
    // passed to each TopologyTestDriver created by the tests.
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    // Name of the thread on which this test instance was constructed.
    private final String threadId = Thread.currentThread().getName();

    /**
     * In-memory window store whose backward-iteration methods all throw
     * {@link UnsupportedOperationException}. Every other operation is inherited unchanged from
     * {@link InMemoryWindowStore}.
     * NOTE(review): presumably used so that the code under test can only read the store in
     * forward order - confirm against the aggregate processor.
     */
    public static class InOrderMemoryWindowStore extends InMemoryWindowStore {
        /** Message carried by every exception thrown from the backward-iteration methods. */
        private static final String BACKWARD_UNSUPPORTED = "Backward fetch not supported here";

        /**
         * Forwards all arguments, in the same order, to the {@link InMemoryWindowStore} constructor.
         */
        InOrderMemoryWindowStore(String name,
                                 long retentionPeriod,
                                 long windowSize,
                                 boolean retainDuplicates,
                                 String metricScope) {
            super(name, retentionPeriod, windowSize, retainDuplicates, metricScope);
        }

        /** Always throws; single-key backward fetch is disabled in this store. */
        public WindowStoreIterator<byte[]> backwardFetch(Bytes key, long timeFrom, long timeTo) {
            throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
        }

        /** Always throws; key-range backward fetch is disabled in this store. */
        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes keyFrom,
                                                                        Bytes keyTo,
                                                                        long timeFrom,
                                                                        long timeTo) {
            throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
        }

        /** Always throws; time-range backward fetch is disabled in this store. */
        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
            throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
        }

        /** Always throws; full backward iteration is disabled in this store. */
        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
            throw new UnsupportedOperationException(BACKWARD_UNSUPPORTED);
        }
    }

    /**
     * Store supplier that hands out {@link InOrderMemoryWindowStore} instances, configured from
     * this supplier's own name, retention period, window size, duplicate-retention flag and
     * metrics scope.
     */
    private static class InOrderMemoryWindowStoreSupplier extends InMemoryWindowBytesStoreSupplier {
        /**
         * Forwards all arguments, in the same order, to the
         * {@link InMemoryWindowBytesStoreSupplier} constructor.
         */
        InOrderMemoryWindowStoreSupplier(String name,
                                         long retentionPeriod,
                                         long windowSize,
                                         boolean retainDuplicates) {
            super(name, retentionPeriod, windowSize, retainDuplicates);
        }

        /**
         * Creates the store. This method must be named {@code get} so that it replaces the
         * inherited supplier method; under the decompiler-generated name {@code m83get} it was
         * never invoked and the inherited store type was created instead.
         *
         * @return a new {@link InOrderMemoryWindowStore}
         */
        public WindowStore<Bytes, byte[]> get() {
            return new InOrderMemoryWindowStore(
                name(),
                retentionPeriod(),
                windowSize(),
                retainDuplicates(),
                metricsScope());
        }
    }

    /**
     * Supplies the parameter rows for the Parameterized runner: one run with {@code false} and
     * one run with {@code true}, in that order.
     *
     * @return two single-element rows
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Boolean[]> data() {
        final Boolean[] firstRun = {Boolean.FALSE};
        final Boolean[] secondRun = {Boolean.TRUE};
        return Arrays.asList(firstRun, secondRun);
    }

    /**
     * Aggregates five in-order records for key "A" over sliding windows (time difference 10 ms,
     * grace 50 ms) and verifies the last emitted aggregate and timestamp for every window start.
     */
    @Test
    public void testAggregateSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        // try-with-resources closes the driver exactly once, whether or not piping throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", "2", 15L);
            inputTopic.pipeInput("A", "3", 20L);
            inputTopic.pipeInput("A", "4", 22L);
            inputTopic.pipeInput("A", "5", 30L);
        }

        // Keep only the most recent result per window start; later updates overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(
                entry.key().window().start(),
                ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
        expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
        expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
        expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
        expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
        expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
        expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
        expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
        expected.put(23L, ValueAndTimestamp.make("0+5", 30L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Reduces six in-order records for key "A" over sliding windows (time difference 10 ms,
     * grace 50 ms) and verifies the last emitted value and timestamp for every window start.
     */
    @Test
    public void testReduceSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .reduce(
                MockReducer.STRING_ADDER,
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        // try-with-resources closes the driver exactly once, whether or not piping throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", "2", 14L);
            inputTopic.pipeInput("A", "3", 15L);
            inputTopic.pipeInput("A", "4", 22L);
            inputTopic.pipeInput("A", "5", 26L);
            inputTopic.pipeInput("A", "6", 30L);
        }

        // Keep only the most recent result per window start; later updates overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(
                entry.key().window().start(),
                ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("1", 10L));
        expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
        expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
        expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
        expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
        expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
        expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
        expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
        expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
        expected.put(27L, ValueAndTimestamp.make("6", 30L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Aggregates records for keys "A" to "D", including out-of-order ones, over sliding windows
     * (time difference 10 ms, grace 50 ms). Every emitted update is captured, sorted by record
     * key and then window start, and compared against the full expected sequence of updates.
     */
    @Test
    public void testAggregateLargeInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic1";

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        // try-with-resources closes the driver exactly once, whether or not piping throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", "2", 20L);
            inputTopic.pipeInput("A", "3", 22L);
            inputTopic.pipeInput("A", "4", 15L);

            inputTopic.pipeInput("B", "1", 12L);
            inputTopic.pipeInput("B", "2", 13L);
            inputTopic.pipeInput("B", "3", 18L);
            inputTopic.pipeInput("B", "4", 19L);
            inputTopic.pipeInput("B", "5", 25L);
            inputTopic.pipeInput("B", "6", 14L);

            inputTopic.pipeInput("C", "1", 11L);
            inputTopic.pipeInput("C", "2", 15L);
            inputTopic.pipeInput("C", "3", 16L);
            inputTopic.pipeInput("C", "4", 21L);
            inputTopic.pipeInput("C", "5", 23L);

            inputTopic.pipeInput("D", "4", 11L);
            inputTopic.pipeInput("D", "2", 12L);
            inputTopic.pipeInput("D", "3", 29L);
            inputTopic.pipeInput("D", "5", 16L);
        }

        // Order by record key, then window start. List.sort is stable, so several updates to
        // the same window stay in the order in which they were emitted.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenWindowStart =
            Comparator
                .comparing((KeyValueTimestamp<Windowed<String>, String> entry) -> entry.key().key())
                .thenComparingLong(entry -> entry.key().window().start());
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(byKeyThenWindowStart);

        final List<KeyValueTimestamp<Windowed<String>, String>> expected = Arrays.asList(
            // key A
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 10L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+4", 15L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1+2", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1+2+4", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+2", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+2+4", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12L, 22L)), "0+2+3", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12L, 22L)), "0+2+3+4", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16L, 26L)), "0+2+3", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21L, 31L)), "0+3", 22L),
            // key B
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2L, 12L)), "0+1", 12L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3L, 13L)), "0+1+2", 13L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4L, 14L)), "0+1+2+6", 14L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+1+2+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+1+2+3+6", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9L, 19L)), "0+1+2+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9L, 19L)), "0+1+2+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2", 13L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+3+4+5", 25L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19L, 29L)), "0+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19L, 29L)), "0+4+5", 25L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20L, 30L)), "0+5", 25L),
            // key C
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1L, 11L)), "0+1", 11L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+1+2", 15L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6L, 16L)), "0+1+2+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11L, 21L)), "0+1+2+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2", 15L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+2+3+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17L, 27L)), "0+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17L, 27L)), "0+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22L, 32L)), "0+5", 23L),
            // key D
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1L, 11L)), "0+4", 11L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2L, 12L)), "0+4+2", 12L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6L, 16L)), "0+4+2+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12L, 22L)), "0+2", 12L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12L, 22L)), "0+2+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13L, 23L)), "0+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19L, 29L)), "0+3", 29L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Builds two sliding-window aggregations (time difference 10 ms, grace 100 ms) over "topic1"
     * and "topic2", joins the resulting tables, and after each batch of input checks what the
     * three attached processors captured: index 0 for the first table, index 1 for the second
     * table and index 2 for the join result.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String topic2 = "topic2";

        // The store suppliers are passed straight to Materialized.as because the two branches of
        // each conditional only share a supertype, not the InOrderMemoryWindowStoreSupplier type.
        final KTable<Windowed<String>, String> table1 = builder
            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder1", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse1", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));
        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder2", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse2", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));

        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table1.toStream().process(supplier);
        table2.toStream().process(supplier);
        table1.join(table2, (left, right) -> left + "%" + right).toStream().process(supplier);

        // try-with-resources closes the driver once and keeps the primary failure if close() also throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic1 =
                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> inputTopic2 =
                driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());

            inputTopic1.pipeInput("A", "1", 10L);
            inputTopic1.pipeInput("B", "2", 11L);
            inputTopic1.pipeInput("C", "3", 12L);

            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);

            processors.get(0).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1L, 11L)), "0+2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+3", 12L));
            processors.get(1).checkAndClearProcessResult();
            processors.get(2).checkAndClearProcessResult();

            inputTopic1.pipeInput("A", "1", 15L);
            inputTopic1.pipeInput("B", "2", 16L);
            inputTopic1.pipeInput("C", "3", 19L);

            processors.get(0).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+1", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12L, 22L)), "0+2", 16L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+2+2", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+3", 19L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9L, 19L)), "0+3+3", 19L));
            processors.get(1).checkAndClearProcessResult();
            processors.get(2).checkAndClearProcessResult();

            inputTopic2.pipeInput("A", "a", 10L);
            inputTopic2.pipeInput("B", "b", 30L);
            inputTopic2.pipeInput("C", "c", 12L);
            inputTopic2.pipeInput("C", "c", 35L);

            processors.get(0).checkAndClearProcessResult();
            processors.get(1).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20L, 30L)), "0+b", 30L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+c", 12L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+c", 35L));
            processors.get(2).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1%0+a", 10L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+3%0+c", 12L));

            inputTopic2.pipeInput("A", "a", 15L);
            inputTopic2.pipeInput("B", "b", 16L);
            inputTopic2.pipeInput("C", "c", 17L);

            processors.get(0).checkAndClearProcessResult();
            processors.get(1).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+a+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+b", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+c", 17L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7L, 17L)), "0+c+c", 17L));
            processors.get(2).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+1%0+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1%0+a+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+2+2%0+b", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+3%0+c", 19L));
        }
    }

    /**
     * Aggregates six records for key "A" whose timestamps (0 to 13) are all smaller than the
     * 50 ms time difference, some of them out of order, using a store named "topic-Canonized"
     * and a 200 ms grace period. Verifies the last emitted aggregate and timestamp for every
     * window start.
     */
    @Test
    public void testEarlyRecordsSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(50L), Duration.ofMillis(200L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // The explicit type arguments are required: without them the chained
                // withValueSerde call would see an Object value type and reject a String serde.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        // try-with-resources closes the driver exactly once, whether or not piping throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 0L);
            inputTopic.pipeInput("A", "2", 5L);
            inputTopic.pipeInput("A", "3", 6L);
            inputTopic.pipeInput("A", "4", 3L);
            inputTopic.pipeInput("A", "5", 13L);
            inputTopic.pipeInput("A", "6", 10L);
        }

        // Keep only the most recent result per window start; later updates overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(
                entry.key().window().start(),
                ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes seven records for key {@code "A"} whose timestamps repeat (0, 2, 4, 0, 2, 2, 0)
     * through a sliding-window aggregation (5 ms time difference, 20 ms grace) and verifies
     * the last aggregate emitted for each window start.
     */
    @Test
    public void testEarlyRecordsRepeatedInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(5L), Duration.ofMillis(20L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // The explicit type witness is required: without it K and V are inferred as
                // Object and withValueSerde(Serde<String>) does not type-check.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("A", "1", 0L);
            input.pipeInput("A", "2", 2L);
            input.pipeInput("A", "3", 4L);
            input.pipeInput("A", "4", 0L);
            input.pipeInput("A", "5", 2L);
            input.pipeInput("A", "6", 2L);
            input.pipeInput("A", "7", 0L);
        }

        // Keep only the most recent result per window start; later emissions overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> emitted : supplier.theCapturedProcessor().processed()) {
            actual.put(
                emitted.key().window().start(),
                ValueAndTimestamp.make(emitted.value(), emitted.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6+7", 4L));
        expected.put(1L, ValueAndTimestamp.make("0+2+3+5+6", 4L));
        expected.put(3L, ValueAndTimestamp.make("0+3", 4L));
        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes nine out-of-order records for key {@code "E"} (timestamps between 0 and 15)
     * through a sliding-window aggregation (10 ms time difference, 50 ms grace) backed by an
     * in-memory window store, and verifies every emitted update after sorting the captured
     * records by key and window start.
     *
     * <p>The store supplier is chosen by {@code inOrderIterator}: either the
     * {@code InOrderMemoryWindowStoreSupplier} or the stock in-memory window store.
     */
    @Test
    public void testEarlyRecordsLargeInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(
                    this.inOrderIterator
                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("E", "1", 0L);
            input.pipeInput("E", "3", 5L);
            input.pipeInput("E", "4", 6L);
            input.pipeInput("E", "2", 3L);
            input.pipeInput("E", "6", 13L);
            input.pipeInput("E", "5", 10L);
            input.pipeInput("E", "7", 4L);
            input.pipeInput("E", "8", 2L);
            input.pipeInput("E", "9", 15L);
        }

        // Explicitly typed lambdas: the comparator chain has no target type to infer from.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenWindowStart =
            Comparator
                .comparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().window().start());

        // List.sort is stable, so updates of the same window keep their emission order.
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(byKeyThenWindowStart);

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5+7", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5+7+8", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5+7", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5+7+8", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5+7", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5+7", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14L, 24L)), "0+9", 15L)),
            actual);
    }

    /**
     * Pipes six records for key {@code "A"} with timestamps 0, 5, 6, 3, 13, 10 through a
     * sliding-window aggregation with a 50 ms time difference and no grace period, and
     * verifies the last aggregate emitted for each window start.
     */
    @Test
    public void testEarlyNoGracePeriodSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(50L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // The explicit type witness is required: without it K and V are inferred as
                // Object and withValueSerde(Serde<String>) does not type-check.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("A", "1", 0L);
            input.pipeInput("A", "2", 5L);
            input.pipeInput("A", "3", 6L);
            input.pipeInput("A", "4", 3L);
            input.pipeInput("A", "5", 13L);
            input.pipeInput("A", "6", 10L);
        }

        // Keep only the most recent result per window start; later emissions overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> emitted : supplier.theCapturedProcessor().processed()) {
            actual.put(
                emitted.key().window().start(),
                ValueAndTimestamp.make(emitted.value(), emitted.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes six records for key {@code "A"} with timestamps 100, 105, 106, 103, 113, 110
     * through a sliding-window aggregation with a 50 ms time difference and no grace period,
     * and verifies the last aggregate emitted for each window start.
     */
    @Test
    public void testNoGracePeriodSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(50L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // The explicit type witness is required: without it K and V are inferred as
                // Object and withValueSerde(Serde<String>) does not type-check.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("A", "1", 100L);
            input.pipeInput("A", "2", 105L);
            input.pipeInput("A", "3", 106L);
            input.pipeInput("A", "4", 103L);
            input.pipeInput("A", "5", 113L);
            input.pipeInput("A", "6", 110L);
        }

        // Keep only the most recent result per window start; later emissions overwrite earlier ones.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> emitted : supplier.theCapturedProcessor().processed()) {
            actual.put(
                emitted.key().window().start(),
                ValueAndTimestamp.make(emitted.value(), emitted.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes nine out-of-order records for key {@code "E"} (timestamps between 0 and 15)
     * through a sliding-window aggregation with a 10 ms time difference and no grace period,
     * backed by an in-memory window store, and verifies every emitted update after sorting
     * the captured records by key and window start.
     *
     * <p>The store supplier is chosen by {@code inOrderIterator}: either the
     * {@code InOrderMemoryWindowStoreSupplier} or the stock in-memory window store.
     */
    @Test
    public void testEarlyNoGracePeriodLargeInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(
                    this.inOrderIterator
                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("E", "1", 0L);
            input.pipeInput("E", "3", 5L);
            input.pipeInput("E", "4", 6L);
            input.pipeInput("E", "2", 3L);
            input.pipeInput("E", "6", 13L);
            input.pipeInput("E", "5", 10L);
            input.pipeInput("E", "7", 4L);
            input.pipeInput("E", "8", 2L);
            input.pipeInput("E", "9", 15L);
        }

        // Explicitly typed lambdas: the comparator chain has no target type to infer from.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenWindowStart =
            Comparator
                .comparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().window().start());

        // List.sort is stable, so updates of the same window keep their emission order.
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(byKeyThenWindowStart);

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5+7", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5+7", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6+9", 15L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14L, 24L)), "0+9", 15L)),
            actual);
    }

    /**
     * Pipes nine out-of-order records for key {@code "E"} (timestamps between 100 and 115)
     * through a sliding-window aggregation with a 10 ms time difference and no grace period,
     * backed by an in-memory window store, and verifies every emitted update after sorting
     * the captured records by key and window start.
     *
     * <p>The store supplier is chosen by {@code inOrderIterator}: either the
     * {@code InOrderMemoryWindowStoreSupplier} or the stock in-memory window store.
     */
    @Test
    public void testNoGracePeriodLargeInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedAggregate = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(
                    this.inOrderIterator
                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500L), Duration.ofMillis(10L), false)));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        windowedAggregate.toStream().process(supplier);

        // The driver is closed exactly once, before the assertions run, so a failing
        // assertion can never trigger a second close().
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("E", "1", 100L);
            input.pipeInput("E", "3", 105L);
            input.pipeInput("E", "4", 106L);
            input.pipeInput("E", "2", 103L);
            input.pipeInput("E", "6", 113L);
            input.pipeInput("E", "5", 110L);
            input.pipeInput("E", "7", 104L);
            input.pipeInput("E", "8", 102L);
            input.pipeInput("E", "9", 115L);
        }

        // Explicitly typed lambdas: the comparator chain has no target type to infer from.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenWindowStart =
            Comparator
                .comparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, String> record) -> record.key().window().start());

        // List.sort is stable, so updates of the same window keep their emission order.
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(byKeyThenWindowStart);

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90L, 100L)), "0+1", 100L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95L, 105L)), "0+1+3", 105L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96L, 106L)), "0+1+3+4", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96L, 106L)), "0+1+3+4+2", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101L, 111L)), "0+3", 105L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101L, 111L)), "0+3+4", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101L, 111L)), "0+3+4+2", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103L, 113L)), "0+3+4+2+6", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103L, 113L)), "0+3+4+2+6+5", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103L, 113L)), "0+3+4+2+6+5+7", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104L, 114L)), "0+3+4", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104L, 114L)), "0+3+4+6", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104L, 114L)), "0+3+4+6+5", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104L, 114L)), "0+3+4+6+5+7", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105L, 115L)), "0+3+4+6+5", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105L, 115L)), "0+3+4+6+5+9", 115L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106L, 116L)), "0+4", 106L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106L, 116L)), "0+4+6", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106L, 116L)), "0+4+6+5", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106L, 116L)), "0+4+6+5+9", 115L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107L, 117L)), "0+6", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107L, 117L)), "0+6+5", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107L, 117L)), "0+6+5+9", 115L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111L, 121L)), "0+6", 113L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111L, 121L)), "0+6+9", 115L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(114L, 124L)), "0+9", 115L)),
            actual);
    }

    /**
     * Pipes a single record with a {@code null} key into a sliding-window aggregation
     * (10 ms time difference, 100 ms grace) and verifies that a WARN-level event with the
     * "Skipping record due to null key or value" message was captured for
     * {@code KStreamSlidingWindowAggregate}.
     *
     * <p>Only the log output is asserted here; no metric value is checked in this method.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.toStringInstance("+"),
                // The explicit type witness is required: without it K and V are inferred as
                // Object and withValueSerde(Serde<String>) does not type-check.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized")
                    .withValueSerde(Serdes.String()));
        this.props.setProperty("built.in.metrics.version", "latest");

        // Resources are closed in reverse order: the driver first, then the log appender.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            // The cast states explicitly that the null is the record key.
            input.pipeInput((String) null, "1");

            MatcherAssert.assertThat(
                appender.getEvents().stream()
                    .filter(event -> event.getLevel().equals("WARN"))
                    .map(event -> event.getMessage())
                    .collect(Collectors.toList()),
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())).windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(90L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(this.inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false))).toStream().to("output");
        this.props.setProperty("built.in.metrics.version", "latest");
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
        Throwable th = null;
        try {
            TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
            Throwable th2 = null;
            try {
                try {
                    TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
                    createInputTopic.pipeInput("k", "100", 200L);
                    createInputTopic.pipeInput("k", "0", 100L);
                    createInputTopic.pipeInput("k", "1", 101L);
                    createInputTopic.pipeInput("k", "2", 102L);
                    createInputTopic.pipeInput("k", "3", 103L);
                    createInputTopic.pipeInput("k", "4", 104L);
                    createInputTopic.pipeInput("k", "5", 105L);
                    createInputTopic.pipeInput("k", "6", 15L);
                    assertLatenessMetrics(topologyTestDriver, CoreMatchers.is(Double.valueOf(7.0d)), CoreMatchers.is(Double.valueOf(185.0d)), CoreMatchers.is(Double.valueOf(96.25d)));
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItems(new String[]{"Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"}));
                    TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("output", new TimeWindowedDeserializer(new StringDeserializer(), 10L), new StringDeserializer());
                    MatcherAssert.assertThat(createOutputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord(new Windowed("k", new TimeWindow(190L, 200L)), "0+100", (Headers) null, 200L)));
                    Assert.assertTrue(createOutputTopic.isEmpty());
                    if (topologyTestDriver != null) {
                        if (0 != 0) {
                            try {
                                topologyTestDriver.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            topologyTestDriver.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (topologyTestDriver != null) {
                    if (th2 != null) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    /**
     * Pipes a fixed set of timestamped records, all under key {@code "A"}, through a sliding-window
     * aggregation in a randomly shuffled order and checks the final aggregate of every window.
     *
     * <p>The windows use a 10 ms time difference and a 10000 ms grace period. The aggregator
     * concatenates the incoming value onto the current aggregate and sorts the characters, so the
     * result of a window is independent of the order in which its records arrive.
     *
     * <p>Any failure is rethrown as an {@link AssertionError} whose message carries the shuffle seed,
     * so a failing permutation can be reproduced.
     */
    // MockProcessorSupplier is used as a raw type because its type parameters are declared outside
    // this part of the file; the resulting unchecked call and cast are confined to this test.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testAggregateRandomInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedTable = builder
            .stream("topic1", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(10000L)))
            .aggregate(
                () -> "",
                (key, value, agg) -> {
                    // Sorting makes the aggregate order-insensitive, which is what the test relies on.
                    final char[] sorted = (agg + value).toCharArray();
                    Arrays.sort(sorted);
                    return String.valueOf(sorted);
                },
                Materialized.as(this.inOrderIterator
                    ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                    : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false))
            );
        final MockProcessorSupplier supplier = new MockProcessorSupplier();
        windowedTable.toStream().process(supplier);

        // Draw the seed separately so it can be reported when the run fails.
        final long seed = new Random().nextLong();
        final Random shuffler = new Random(seed);
        try {
            // Arrays.asList yields a fixed-size but settable list, which Collections.shuffle can permute in place.
            final List<ValueAndTimestamp<String>> input = Arrays.asList(
                ValueAndTimestamp.make("A", 10L),
                ValueAndTimestamp.make("B", 15L),
                ValueAndTimestamp.make("C", 16L),
                ValueAndTimestamp.make("D", 18L),
                ValueAndTimestamp.make("E", 30L),
                ValueAndTimestamp.make("F", 40L),
                ValueAndTimestamp.make("G", 55L),
                ValueAndTimestamp.make("H", 56L),
                ValueAndTimestamp.make("I", 58L),
                ValueAndTimestamp.make("J", 58L),
                ValueAndTimestamp.make("K", 62L),
                ValueAndTimestamp.make("L", 63L),
                ValueAndTimestamp.make("M", 63L),
                ValueAndTimestamp.make("N", 63L),
                ValueAndTimestamp.make("O", 76L),
                ValueAndTimestamp.make("P", 77L),
                ValueAndTimestamp.make("Q", 80L),
                ValueAndTimestamp.make("R", 2L),
                ValueAndTimestamp.make("S", 3L),
                ValueAndTimestamp.make("T", 5L),
                ValueAndTimestamp.make("U", 8L)
            );
            Collections.shuffle(input, shuffler);

            // try-with-resources closes the driver on the failure path as well as on success.
            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
                final TestInputTopic<String, String> inputTopic =
                    driver.createInputTopic("topic1", new StringSerializer(), new StringSerializer());
                for (final ValueAndTimestamp<String> record : input) {
                    inputTopic.pipeInput("A", record.value(), record.timestamp());
                }
            }

            // Keep only the last captured update per window start; later updates overwrite earlier ones.
            final Map<Long, ValueAndTimestamp<String>> latestByWindowStart = new HashMap<>();
            for (final Object captured : supplier.theCapturedProcessor().processed()) {
                final KeyValueTimestamp<Windowed<String>, String> update =
                    (KeyValueTimestamp<Windowed<String>, String>) captured;
                latestByWindowStart.put(
                    update.key().window().start(),
                    ValueAndTimestamp.make(update.value(), update.timestamp()));
            }
            verifyRandomTestResults(latestByWindowStart);
        } catch (AssertionError e) {
            throw new AssertionError("Assertion failed in randomized test. Reproduce with seed: " + seed + ".", e);
        } catch (Throwable t) {
            throw new AssertionError("Exception in randomized scenario. Reproduce with seed: " + seed + ".", t);
        }
    }

    /**
     * Compares the given per-window results against the hard-coded expected aggregates.
     *
     * @param map actual results, keyed by window start, each holding the aggregate string and its
     *            timestamp; must equal the expected map built here
     */
    private void verifyRandomTestResults(Map<Long, ValueAndTimestamp<String>> map) {
        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();

        // Windows whose expected timestamp is at most 18.
        expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
        expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
        expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
        expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
        expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
        expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
        expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
        expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
        expected.put(16L, ValueAndTimestamp.make("CD", 18L));
        expected.put(17L, ValueAndTimestamp.make("D", 18L));

        // Windows whose expected timestamp is 30 or 40.
        expected.put(20L, ValueAndTimestamp.make("E", 30L));
        expected.put(30L, ValueAndTimestamp.make("EF", 40L));
        expected.put(31L, ValueAndTimestamp.make("F", 40L));

        // Windows whose expected timestamp is between 55 and 63.
        expected.put(45L, ValueAndTimestamp.make("G", 55L));
        expected.put(46L, ValueAndTimestamp.make("GH", 56L));
        expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
        expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
        expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
        expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
        expected.put(57L, ValueAndTimestamp.make("IJKLMN", 63L));
        expected.put(59L, ValueAndTimestamp.make("KLMN", 63L));
        expected.put(63L, ValueAndTimestamp.make("LMN", 63L));

        // Windows whose expected timestamp is between 76 and 80.
        expected.put(66L, ValueAndTimestamp.make("O", 76L));
        expected.put(67L, ValueAndTimestamp.make("OP", 77L));
        expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
        expected.put(77L, ValueAndTimestamp.make("PQ", 80L));
        expected.put(78L, ValueAndTimestamp.make("Q", 80L));

        Assert.assertEquals(expected, map);
    }

    /**
     * Asserts the {@code stream-task-metrics} values that the given driver reports for task
     * {@code 0_0} on this test's thread.
     *
     * @param topologyTestDriver driver whose metrics are inspected
     * @param matcher            matcher applied to {@code dropped-records-total}
     * @param matcher2           matcher applied to {@code record-lateness-max}
     * @param matcher3           matcher applied to {@code record-lateness-avg}
     */
    private void assertLatenessMetrics(TopologyTestDriver topologyTestDriver, Matcher<Object> matcher, Matcher<Object> matcher2, Matcher<Object> matcher3) {
        final String group = "stream-task-metrics";
        // All four metric names share the same tag set, so it is built once.
        final Map<String, String> taskTags = Utils.mkMap(
            Utils.mkEntry("thread-id", this.threadId),
            Utils.mkEntry("task-id", "0_0"));

        final MetricName droppedTotal = new MetricName(
            "dropped-records-total",
            group,
            "The total number of dropped records",
            taskTags);
        final MetricName droppedRate = new MetricName(
            "dropped-records-rate",
            group,
            "The average number of dropped records per second",
            taskTags);
        final MetricName latenessMax = new MetricName(
            "record-lateness-max",
            group,
            "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time",
            taskTags);
        final MetricName latenessAvg = new MetricName(
            "record-lateness-avg",
            group,
            "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time",
            taskTags);

        final Map<MetricName, ? extends Metric> reported = topologyTestDriver.metrics();
        MatcherAssert.assertThat(reported.get(droppedTotal).metricValue(), matcher);
        // The drop rate is only required to differ from zero; no exact value is checked.
        MatcherAssert.assertThat(reported.get(droppedRate).metricValue(), CoreMatchers.not(0.0d));
        MatcherAssert.assertThat(reported.get(latenessMax).metricValue(), matcher2);
        MatcherAssert.assertThat(reported.get(latenessAvg).metricValue(), matcher3);
    }
}
