/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public abstract class TopologyTestDriverTest {
    private static final String SOURCE_TOPIC_1 = "source-topic-1";
    private static final String SOURCE_TOPIC_2 = "source-topic-2";
    private static final String SINK_TOPIC_1 = "sink-topic-1";
    private static final String SINK_TOPIC_2 = "sink-topic-2";
    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
    private final byte[] key1 = new byte[0];
    private final byte[] value1 = new byte[0];
    private final long timestamp1 = 42L;
    private final TestRecord<byte[], byte[]> testRecord1 = new TestRecord((Object)this.key1, (Object)this.value1, this.headers, Long.valueOf(42L));
    private final byte[] key2 = new byte[0];
    private final byte[] value2 = new byte[0];
    private final long timestamp2 = 43L;
    private TopologyTestDriver testDriver;
    private final Properties config;
    private KeyValueStore<String, Long> store;
    private final StringDeserializer stringDeserializer = new StringDeserializer();
    private final LongDeserializer longDeserializer = new LongDeserializer();
    private final List<MockProcessor> mockProcessors = new ArrayList<MockProcessor>();
    final ProcessorSupplier<byte[], byte[], Void, Void> voidProcessorSupplier = () -> new Processor<byte[], byte[], Void, Void>(){

        public void process(Record<byte[], byte[]> record) {
        }
    };

    TopologyTestDriverTest(Map<String, String> overrides) {
        this.config = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"test-TopologyTestDriver"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getAbsolutePath())}));
        this.config.put("default.key.serde", Serdes.ByteArraySerde.class);
        this.config.put("default.value.serde", Serdes.ByteArraySerde.class);
        this.config.putAll(overrides);
    }

    @AfterEach
    public void tearDown() {
        if (this.testDriver != null) {
            this.testDriver.close();
        }
    }

    private Topology setupSourceSinkTopology() {
        Topology topology = new Topology();
        String sourceName = "source";
        topology.addSource("source", new String[]{SOURCE_TOPIC_1});
        topology.addSink("sink", SINK_TOPIC_1, new String[]{"source"});
        return topology;
    }

    private Topology setupTopologyWithTwoSubtopologies() {
        Topology topology = new Topology();
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";
        topology.addSource("source-1", new String[]{SOURCE_TOPIC_1});
        topology.addSink("sink-1", SINK_TOPIC_1, new String[]{"source-1"});
        topology.addSource("source-2", new String[]{SINK_TOPIC_1});
        topology.addSink("sink-2", SINK_TOPIC_2, new String[]{"source-2"});
        return topology;
    }

    private Topology setupSingleProcessorTopology() {
        return this.setupSingleProcessorTopology(-1L, null, null);
    }

    private Topology setupSingleProcessorTopology(long punctuationIntervalMs, PunctuationType punctuationType, Punctuator callback) {
        Set<Object> punctuations = punctuationIntervalMs > 0L && punctuationType != null && callback != null ? Collections.singleton(new Punctuation(punctuationIntervalMs, punctuationType, callback)) : Collections.emptySet();
        Topology topology = new Topology();
        String sourceName = "source";
        topology.addSource("source", new String[]{SOURCE_TOPIC_1});
        topology.addProcessor("processor", (ProcessorSupplier)new MockProcessorSupplier(punctuations), new String[]{"source"});
        return topology;
    }

    private Topology setupMultipleSourceTopology(String ... sourceTopicNames) {
        Topology topology = new Topology();
        String[] processorNames = new String[sourceTopicNames.length];
        int i = 0;
        for (String sourceTopicName : sourceTopicNames) {
            String sourceName = sourceTopicName + "-source";
            String processorName = sourceTopicName + "-processor";
            topology.addSource(sourceName, new String[]{sourceTopicName});
            processorNames[i++] = processorName;
            topology.addProcessor(processorName, (ProcessorSupplier)new MockProcessorSupplier(), new String[]{sourceName});
        }
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    private Topology setupGlobalStoreTopology(String ... sourceTopicNames) {
        if (sourceTopicNames.length == 0) {
            throw new IllegalArgumentException("sourceTopicNames cannot be empty");
        }
        Topology topology = new Topology();
        for (final String sourceTopicName : sourceTopicNames) {
            topology.addGlobalStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)(sourceTopicName + "-globalStore")), null, null).withLoggingDisabled(), sourceTopicName, null, null, sourceTopicName, sourceTopicName + "-processor", () -> new Processor<Object, Object, Void, Void>(){
                KeyValueStore store;

                public void init(ProcessorContext<Void, Void> context) {
                    this.store = (KeyValueStore)context.getStateStore(sourceTopicName + "-globalStore");
                }

                public void process(Record<Object, Object> record) {
                    this.store.put(record.key(), record.value());
                }
            });
        }
        return topology;
    }

    private Topology setupTopologyWithInternalTopic(String firstTableName, String secondTableName, String joinName) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable t1 = builder.stream(SOURCE_TOPIC_1).selectKey((k, v) -> v).groupByKey().count(Materialized.as((String)firstTableName));
        builder.table(SOURCE_TOPIC_2, Materialized.as((String)secondTableName)).join(t1, v -> v, (v1, v2) -> v2, TableJoined.as((String)joinName));
        return builder.build(this.config);
    }

    @Test
    public void shouldNotRequireParameters() {
        new TopologyTestDriver(this.setupSingleProcessorTopology(), this.config);
    }

    @Test
    public void shouldInitProcessor() {
        this.testDriver = new TopologyTestDriver(this.setupSingleProcessorTopology(), this.config);
        Assertions.assertTrue((boolean)this.mockProcessors.get(0).initialized);
    }

    @Test
    public void shouldCloseProcessor() {
        this.testDriver = new TopologyTestDriver(this.setupSingleProcessorTopology(), this.config);
        this.testDriver.close();
        Assertions.assertTrue((boolean)this.mockProcessors.get(0).closed);
        this.testDriver = null;
    }

    @Test
    public void shouldThrowForUnknownTopic() {
        this.testDriver = new TopologyTestDriver(new Topology());
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.pipeRecord("unknownTopic", new TestRecord((Object)null), (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer(), Instant.now()));
    }

    @Test
    public void shouldThrowForMissingTime() {
        this.testDriver = new TopologyTestDriver(new Topology());
        Assertions.assertThrows(IllegalStateException.class, () -> this.testDriver.pipeRecord(SINK_TOPIC_1, new TestRecord((Object)"value"), (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), null));
    }

    @Test
    public void shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
        this.testDriver = new TopologyTestDriver(this.setupSourceSinkTopology(), this.config);
        TestOutputTopic outputTopic = new TestOutputTopic(this.testDriver, "unused-topic", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
        Assertions.assertTrue((boolean)outputTopic.isEmpty());
        Assertions.assertThrows(NoSuchElementException.class, () -> ((TestOutputTopic)outputTopic).readRecord());
    }

    @Test
    public void shouldCaptureSinkTopicNamesIfWrittenInto() {
        this.testDriver = new TopologyTestDriver(this.setupSourceSinkTopology(), this.config);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.hasItem((Object)SINK_TOPIC_1));
    }

    @Test
    public void shouldCaptureInternalTopicNamesIfWrittenInto() {
        this.testDriver = new TopologyTestDriver(this.setupTopologyWithInternalTopic("table1", "table2", "join"), this.config);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new String[]{this.config.getProperty("application.id") + "-table1-repartition", this.config.getProperty("application.id") + "-table1-changelog"})));
        this.pipeRecord(SOURCE_TOPIC_2, this.testRecord1);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new String[]{this.config.getProperty("application.id") + "-table1-repartition", this.config.getProperty("application.id") + "-table1-changelog", this.config.getProperty("application.id") + "-table2-changelog", this.config.getProperty("application.id") + "-join-subscription-registration-topic", this.config.getProperty("application.id") + "-join-subscription-store-changelog", this.config.getProperty("application.id") + "-join-subscription-response-topic"})));
    }

    @Test
    public void shouldCaptureGlobalTopicNameIfWrittenInto() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(SOURCE_TOPIC_1, Materialized.as((String)"globalTable"));
        builder.stream(SOURCE_TOPIC_2).to(SOURCE_TOPIC_1);
        this.testDriver = new TopologyTestDriver(builder.build(), this.config);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        this.pipeRecord(SOURCE_TOPIC_2, this.testRecord1);
        MatcherAssert.assertThat((Object)this.testDriver.producedTopicNames(), (Matcher)CoreMatchers.equalTo(Collections.singleton(SOURCE_TOPIC_1)));
    }

    @Test
    public void shouldProcessRecordForTopic() {
        this.testDriver = new TopologyTestDriver(this.setupSourceSinkTopology(), this.config);
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        ProducerRecord outputRecord = this.testDriver.readRecord(SINK_TOPIC_1);
        Assertions.assertArrayEquals((byte[])this.key1, (byte[])((byte[])outputRecord.key()));
        Assertions.assertArrayEquals((byte[])this.value1, (byte[])((byte[])outputRecord.value()));
        Assertions.assertEquals((Object)SINK_TOPIC_1, (Object)outputRecord.topic());
    }

    @Test
    public void shouldSetRecordMetadata() {
        this.testDriver = new TopologyTestDriver(this.setupSingleProcessorTopology(), this.config);
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        List processedRecords = this.mockProcessors.get(0).processedRecords;
        Assertions.assertEquals((int)1, (int)processedRecords.size());
        TTDTestRecord record = (TTDTestRecord)processedRecords.get(0);
        TTDTestRecord expectedResult = new TTDTestRecord(SOURCE_TOPIC_1, this.testRecord1, 0L);
        MatcherAssert.assertThat((Object)record, (Matcher)CoreMatchers.equalTo((Object)expectedResult));
    }

    private void pipeRecord(String topic, TestRecord<byte[], byte[]> record) {
        this.testDriver.pipeRecord(topic, record, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer(), null);
    }

    @Test
    public void shouldSendRecordViaCorrectSourceTopic() {
        this.testDriver = new TopologyTestDriver(this.setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), this.config);
        List processedRecords1 = this.mockProcessors.get(0).processedRecords;
        List processedRecords2 = this.mockProcessors.get(1).processedRecords;
        TestInputTopic inputTopic1 = this.testDriver.createInputTopic(SOURCE_TOPIC_1, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        TestInputTopic inputTopic2 = this.testDriver.createInputTopic(SOURCE_TOPIC_2, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        inputTopic1.pipeInput(new TestRecord((Object)this.key1, (Object)this.value1, this.headers, Long.valueOf(42L)));
        Assertions.assertEquals((int)1, (int)processedRecords1.size());
        Assertions.assertEquals((int)0, (int)processedRecords2.size());
        TTDTestRecord record = (TTDTestRecord)processedRecords1.get(0);
        TTDTestRecord expectedResult = new TTDTestRecord(this.key1, this.value1, this.headers, 42L, 0L, SOURCE_TOPIC_1);
        MatcherAssert.assertThat((Object)record, (Matcher)CoreMatchers.equalTo((Object)expectedResult));
        inputTopic2.pipeInput(new TestRecord((Object)this.key2, (Object)this.value2, Instant.ofEpochMilli(43L)));
        Assertions.assertEquals((int)1, (int)processedRecords1.size());
        Assertions.assertEquals((int)1, (int)processedRecords2.size());
        record = (TTDTestRecord)processedRecords2.get(0);
        expectedResult = new TTDTestRecord(this.key2, this.value2, (Headers)new RecordHeaders((Iterable)null), 43L, 0L, SOURCE_TOPIC_2);
        MatcherAssert.assertThat((Object)record, (Matcher)CoreMatchers.equalTo((Object)expectedResult));
    }

    @Test
    public void shouldUseSourceSpecificDeserializers() {
        Topology topology = new Topology();
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";
        String processor = "processor";
        topology.addSource("source-1", Serdes.Long().deserializer(), Serdes.String().deserializer(), new String[]{SOURCE_TOPIC_1});
        topology.addSource("source-2", Serdes.Integer().deserializer(), Serdes.Double().deserializer(), new String[]{SOURCE_TOPIC_2});
        topology.addProcessor("processor", (ProcessorSupplier)new MockProcessorSupplier(), new String[]{"source-1", "source-2"});
        topology.addSink("sink", SINK_TOPIC_1, (topic, data) -> {
            if (data instanceof Long) {
                return Serdes.Long().serializer().serialize(topic, (Object)((Long)data));
            }
            return Serdes.Integer().serializer().serialize(topic, (Object)((Integer)data));
        }, (topic, data) -> {
            if (data instanceof String) {
                return Serdes.String().serializer().serialize(topic, (Object)((String)data));
            }
            return Serdes.Double().serializer().serialize(topic, (Object)((Double)data));
        }, new String[]{"processor"});
        this.testDriver = new TopologyTestDriver(topology);
        Long source1Key = 42L;
        String source1Value = "anyString";
        Integer source2Key = 73;
        Double source2Value = 3.14;
        TestRecord consumerRecord1 = new TestRecord((Object)source1Key, (Object)"anyString");
        TestRecord consumerRecord2 = new TestRecord((Object)source2Key, (Object)source2Value);
        this.testDriver.pipeRecord(SOURCE_TOPIC_1, consumerRecord1, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        TestRecord result1 = this.testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat((Object)result1.getKey(), (Matcher)CoreMatchers.equalTo((Object)source1Key));
        MatcherAssert.assertThat((Object)result1.getValue(), (Matcher)CoreMatchers.equalTo((Object)"anyString"));
        this.testDriver.pipeRecord(SOURCE_TOPIC_2, consumerRecord2, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        TestRecord result2 = this.testDriver.readRecord(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat((Object)result2.getKey(), (Matcher)CoreMatchers.equalTo((Object)source2Key));
        MatcherAssert.assertThat((Object)result2.getValue(), (Matcher)CoreMatchers.equalTo((Object)source2Value));
    }

    @Test
    public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
        this.testDriver = new TopologyTestDriver(this.setupSourceSinkTopology(), this.config);
        final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToKeyDeserializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToValueDeserializer = new AtomicBoolean(false);
        ByteArraySerializer keySerializer = new ByteArraySerializer(){

            public byte[] serialize(String topic, Headers headers, byte[] data) {
                passedHeadersToKeySerializer.set(true);
                return this.serialize(topic, data);
            }
        };
        ByteArraySerializer valueSerializer = new ByteArraySerializer(){

            public byte[] serialize(String topic, Headers headers, byte[] data) {
                passedHeadersToValueSerializer.set(true);
                return this.serialize(topic, data);
            }
        };
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer(){

            public byte[] deserialize(String topic, Headers headers, byte[] data) {
                passedHeadersToKeyDeserializer.set(true);
                return this.deserialize(topic, data);
            }
        };
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer(){

            public byte[] deserialize(String topic, Headers headers, byte[] data) {
                passedHeadersToValueDeserializer.set(true);
                return this.deserialize(topic, data);
            }
        };
        TestInputTopic inputTopic = this.testDriver.createInputTopic(SOURCE_TOPIC_1, (Serializer)keySerializer, (Serializer)valueSerializer);
        TestOutputTopic outputTopic = this.testDriver.createOutputTopic(SINK_TOPIC_1, (Deserializer)keyDeserializer, (Deserializer)valueDeserializer);
        inputTopic.pipeInput(this.testRecord1);
        outputTopic.readRecord();
        MatcherAssert.assertThat((Object)passedHeadersToKeySerializer.get(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)passedHeadersToValueSerializer.get(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)passedHeadersToKeyDeserializer.get(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)passedHeadersToValueDeserializer.get(), (Matcher)CoreMatchers.equalTo((Object)true));
    }

    @Test
    public void shouldUseSinkSpecificSerializers() {
        Topology topology = new Topology();
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";
        topology.addSource("source-1", Serdes.Long().deserializer(), Serdes.String().deserializer(), new String[]{SOURCE_TOPIC_1});
        topology.addSource("source-2", Serdes.Integer().deserializer(), Serdes.Double().deserializer(), new String[]{SOURCE_TOPIC_2});
        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), new String[]{"source-1"});
        topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), new String[]{"source-2"});
        this.testDriver = new TopologyTestDriver(topology);
        Long source1Key = 42L;
        String source1Value = "anyString";
        Integer source2Key = 73;
        Double source2Value = 3.14;
        TestRecord consumerRecord1 = new TestRecord((Object)source1Key, (Object)"anyString");
        TestRecord consumerRecord2 = new TestRecord((Object)source2Key, (Object)source2Value);
        this.testDriver.pipeRecord(SOURCE_TOPIC_1, consumerRecord1, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        TestRecord result1 = this.testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat((Object)result1.getKey(), (Matcher)CoreMatchers.equalTo((Object)source1Key));
        MatcherAssert.assertThat((Object)result1.getValue(), (Matcher)CoreMatchers.equalTo((Object)"anyString"));
        this.testDriver.pipeRecord(SOURCE_TOPIC_2, consumerRecord2, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        TestRecord result2 = this.testDriver.readRecord(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat((Object)result2.getKey(), (Matcher)CoreMatchers.equalTo((Object)source2Key));
        MatcherAssert.assertThat((Object)result2.getValue(), (Matcher)CoreMatchers.equalTo((Object)source2Value));
    }

    @Test
    public void shouldForwardRecordsFromSubtopologyToSubtopology() {
        this.testDriver = new TopologyTestDriver(this.setupTopologyWithTwoSubtopologies(), this.config);
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        ProducerRecord outputRecord = this.testDriver.readRecord(SINK_TOPIC_1);
        Assertions.assertArrayEquals((byte[])this.key1, (byte[])((byte[])outputRecord.key()));
        Assertions.assertArrayEquals((byte[])this.value1, (byte[])((byte[])outputRecord.value()));
        Assertions.assertEquals((Object)SINK_TOPIC_1, (Object)outputRecord.topic());
        outputRecord = this.testDriver.readRecord(SINK_TOPIC_2);
        Assertions.assertArrayEquals((byte[])this.key1, (byte[])((byte[])outputRecord.key()));
        Assertions.assertArrayEquals((byte[])this.value1, (byte[])((byte[])outputRecord.value()));
        Assertions.assertEquals((Object)SINK_TOPIC_2, (Object)outputRecord.topic());
    }

    @Test
    public void shouldPopulateGlobalStore() {
        this.testDriver = new TopologyTestDriver(this.setupGlobalStoreTopology(SOURCE_TOPIC_1), this.config);
        KeyValueStore globalStore = this.testDriver.getKeyValueStore("source-topic-1-globalStore");
        Assertions.assertNotNull((Object)globalStore);
        Assertions.assertNotNull(this.testDriver.getAllStateStores().get("source-topic-1-globalStore"));
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        MatcherAssert.assertThat((Object)globalStore.get(this.testRecord1.key()), (Matcher)CoreMatchers.is((Object)this.testRecord1.value()));
    }

    @Test
    public void shouldPunctuateOnStreamsTime() {
        MockPunctuator mockPunctuator = new MockPunctuator();
        this.testDriver = new TopologyTestDriver(this.setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator), this.config);
        LinkedList<Long> expectedPunctuations = new LinkedList<Long>();
        expectedPunctuations.add(42L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(42L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(42L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(51L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(51L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(52L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(61L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(61L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(65L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(71L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(71L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(72L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(95L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(95L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(101L);
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(101L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.pipeRecord(SOURCE_TOPIC_1, (TestRecord<byte[], byte[]>)new TestRecord((Object)this.key1, (Object)this.value1, null, Long.valueOf(102L)));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
    }

    @Test
    public void shouldPunctuateOnWallClockTime() {
        MockPunctuator mockPunctuator = new MockPunctuator();
        this.testDriver = new TopologyTestDriver(this.setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator), this.config, Instant.ofEpochMilli(0L));
        LinkedList<Long> expectedPunctuations = new LinkedList<Long>();
        this.testDriver.advanceWallClockTime(Duration.ofMillis(5L));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(14L);
        this.testDriver.advanceWallClockTime(Duration.ofMillis(9L));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        this.testDriver.advanceWallClockTime(Duration.ofMillis(1L));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(35L);
        this.testDriver.advanceWallClockTime(Duration.ofMillis(20L));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        expectedPunctuations.add(40L);
        this.testDriver.advanceWallClockTime(Duration.ofMillis(5L));
        MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
    }

    @Test
    public void shouldReturnAllStores() {
        Topology topology = this.setupSourceSinkTopology();
        topology.addProcessor("processor", (ProcessorSupplier)new MockProcessorSupplier(), new String[]{"source"});
        topology.addStateStore((StoreBuilder)new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"store"), Serdes.ByteArray(), Serdes.ByteArray(), Time.SYSTEM), new String[]{"processor"});
        topology.addGlobalStore(new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"globalStore"), Serdes.ByteArray(), Serdes.ByteArray(), Time.SYSTEM).withLoggingDisabled(), "sourceProcessorName", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "globalTopicName", "globalProcessorName", this.voidProcessorSupplier);
        this.testDriver = new TopologyTestDriver(topology, this.config);
        HashSet<String> expectedStoreNames = new HashSet<String>();
        expectedStoreNames.add("store");
        expectedStoreNames.add("globalStore");
        Map allStores = this.testDriver.getAllStateStores();
        MatcherAssert.assertThat(allStores.keySet(), (Matcher)CoreMatchers.equalTo(expectedStoreNames));
        for (StateStore store : allStores.values()) {
            Assertions.assertNotNull((Object)store);
        }
    }

    @Test
    public void shouldReturnCorrectPersistentStoreTypeOnly() {
        this.shouldReturnCorrectStoreTypeOnly(true);
    }

    @Test
    public void shouldReturnCorrectInMemoryStoreTypeOnly() {
        this.shouldReturnCorrectStoreTypeOnly(false);
    }

    private void shouldReturnCorrectStoreTypeOnly(boolean persistent) {
        String keyValueStoreName = "keyValueStore";
        String timestampedKeyValueStoreName = "keyValueTimestampStore";
        String versionedKeyValueStoreName = "keyValueVersionedStore";
        String windowStoreName = "windowStore";
        String timestampedWindowStoreName = "windowTimestampStore";
        String sessionStoreName = "sessionStore";
        String globalKeyValueStoreName = "globalKeyValueStore";
        String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
        String globalVersionedKeyValueStoreName = "globalKeyValueVersionedStore";
        Topology topology = this.setupSingleProcessorTopology();
        this.addStoresToTopology(topology, persistent, "keyValueStore", "keyValueTimestampStore", "keyValueVersionedStore", "windowStore", "windowTimestampStore", "sessionStore", "globalKeyValueStore", "globalKeyValueTimestampStore", "globalKeyValueVersionedStore");
        this.testDriver = new TopologyTestDriver(topology, this.config);
        Assertions.assertNotNull((Object)this.testDriver.getKeyValueStore("keyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("keyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("keyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getWindowStore("keyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("keyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("keyValueStore"));
        Assertions.assertNotNull((Object)this.testDriver.getKeyValueStore("keyValueTimestampStore"));
        Assertions.assertNotNull((Object)this.testDriver.getTimestampedKeyValueStore("keyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("keyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getWindowStore("keyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("keyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("keyValueTimestampStore"));
        if (persistent) {
            Assertions.assertNull((Object)this.testDriver.getKeyValueStore("keyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("keyValueVersionedStore"));
            Assertions.assertNotNull((Object)this.testDriver.getVersionedKeyValueStore("keyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getWindowStore("keyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("keyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getSessionStore("keyValueVersionedStore"));
        }
        Assertions.assertNull((Object)this.testDriver.getKeyValueStore("windowStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("windowStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("windowStore"));
        Assertions.assertNotNull((Object)this.testDriver.getWindowStore("windowStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("windowStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("windowStore"));
        Assertions.assertNull((Object)this.testDriver.getKeyValueStore("windowTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("windowTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("windowTimestampStore"));
        Assertions.assertNotNull((Object)this.testDriver.getWindowStore("windowTimestampStore"));
        Assertions.assertNotNull((Object)this.testDriver.getTimestampedWindowStore("windowTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("windowTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getKeyValueStore("sessionStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("sessionStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("sessionStore"));
        Assertions.assertNull((Object)this.testDriver.getWindowStore("sessionStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("sessionStore"));
        Assertions.assertNotNull((Object)this.testDriver.getSessionStore("sessionStore"));
        Assertions.assertNotNull((Object)this.testDriver.getKeyValueStore("globalKeyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("globalKeyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("globalKeyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getWindowStore("globalKeyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("globalKeyValueStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("globalKeyValueStore"));
        Assertions.assertNotNull((Object)this.testDriver.getKeyValueStore("globalKeyValueTimestampStore"));
        Assertions.assertNotNull((Object)this.testDriver.getTimestampedKeyValueStore("globalKeyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getVersionedKeyValueStore("globalKeyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getWindowStore("globalKeyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("globalKeyValueTimestampStore"));
        Assertions.assertNull((Object)this.testDriver.getSessionStore("globalKeyValueTimestampStore"));
        if (persistent) {
            Assertions.assertNull((Object)this.testDriver.getKeyValueStore("globalKeyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getTimestampedKeyValueStore("globalKeyValueVersionedStore"));
            Assertions.assertNotNull((Object)this.testDriver.getVersionedKeyValueStore("globalKeyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getWindowStore("globalKeyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getTimestampedWindowStore("globalKeyValueVersionedStore"));
            Assertions.assertNull((Object)this.testDriver.getSessionStore("globalKeyValueVersionedStore"));
        }
    }

    @Test
    public void shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod() {
        this.shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(false);
    }

    @Test
    public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() {
        this.shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(true);
    }

    private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(boolean persistent) {
        String keyValueStoreName = "keyValueStore";
        String timestampedKeyValueStoreName = "keyValueTimestampStore";
        String versionedKeyValueStoreName = "keyValueVersionedStore";
        String windowStoreName = "windowStore";
        String timestampedWindowStoreName = "windowTimestampStore";
        String sessionStoreName = "sessionStore";
        String globalKeyValueStoreName = "globalKeyValueStore";
        String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
        String globalVersionedKeyValueStoreName = "globalKeyValueVersionedStore";
        Topology topology = this.setupSingleProcessorTopology();
        this.addStoresToTopology(topology, persistent, "keyValueStore", "keyValueTimestampStore", "keyValueVersionedStore", "windowStore", "windowTimestampStore", "sessionStore", "globalKeyValueStore", "globalKeyValueTimestampStore", "globalKeyValueVersionedStore");
        this.testDriver = new TopologyTestDriver(topology, this.config);
        IllegalArgumentException e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("keyValueStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store keyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"));
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("keyValueTimestampStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store keyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
        if (persistent) {
            e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("keyValueVersionedStore"));
            MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store keyValueVersionedStore is a versioned key-value store and should be accessed via `getVersionedKeyValueStore()`"));
        }
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("windowStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store windowStore is a window store and should be accessed via `getWindowStore()`"));
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("windowTimestampStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store windowTimestampStore is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"));
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("sessionStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store sessionStore is a session store and should be accessed via `getSessionStore()`"));
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("globalKeyValueStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store globalKeyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"));
        e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("globalKeyValueTimestampStore"));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store globalKeyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
        if (persistent) {
            e = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.testDriver.getStateStore("globalKeyValueVersionedStore"));
            MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Store globalKeyValueVersionedStore is a versioned key-value store and should be accessed via `getVersionedKeyValueStore()`"));
        }
    }

    private void addStoresToTopology(Topology topology, boolean persistent, String keyValueStoreName, String timestampedKeyValueStoreName, String versionedKeyValueStoreName, String windowStoreName, String timestampedWindowStoreName, String sessionStoreName, String globalKeyValueStoreName, String globalTimestampedKeyValueStoreName, String globalVersionedKeyValueStoreName) {
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)(persistent ? Stores.persistentKeyValueStore((String)keyValueStoreName) : Stores.inMemoryKeyValueStore((String)keyValueStoreName)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        topology.addStateStore(Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)(persistent ? Stores.persistentTimestampedKeyValueStore((String)timestampedKeyValueStoreName) : Stores.inMemoryKeyValueStore((String)timestampedKeyValueStoreName)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        if (persistent) {
            topology.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)versionedKeyValueStoreName, (Duration)Duration.ofMillis(1000L)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        }
        topology.addStateStore(Stores.windowStoreBuilder((WindowBytesStoreSupplier)(persistent ? Stores.persistentWindowStore((String)windowStoreName, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(100L), (boolean)false) : Stores.inMemoryWindowStore((String)windowStoreName, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(100L), (boolean)false)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        topology.addStateStore(Stores.timestampedWindowStoreBuilder((WindowBytesStoreSupplier)(persistent ? Stores.persistentTimestampedWindowStore((String)timestampedWindowStoreName, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(100L), (boolean)false) : Stores.inMemoryWindowStore((String)timestampedWindowStoreName, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(100L), (boolean)false)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        topology.addStateStore(persistent ? Stores.sessionStoreBuilder((SessionBytesStoreSupplier)Stores.persistentSessionStore((String)sessionStoreName, (Duration)Duration.ofMillis(1000L)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()) : Stores.sessionStoreBuilder((SessionBytesStoreSupplier)Stores.inMemorySessionStore((String)sessionStoreName, (Duration)Duration.ofMillis(1000L)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()), new String[]{"processor"});
        topology.addGlobalStore(persistent ? Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)globalKeyValueStoreName), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()).withLoggingDisabled() : Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)globalKeyValueStoreName), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()).withLoggingDisabled(), "sourceDummy1", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "topicDummy1", "processorDummy1", this.voidProcessorSupplier);
        topology.addGlobalStore(persistent ? Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentTimestampedKeyValueStore((String)globalTimestampedKeyValueStoreName), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()).withLoggingDisabled() : Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)globalTimestampedKeyValueStoreName), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()).withLoggingDisabled(), "sourceDummy2", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "topicDummy2", "processorDummy2", this.voidProcessorSupplier);
        if (persistent) {
            topology.addGlobalStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)globalVersionedKeyValueStoreName, (Duration)Duration.ofMillis(1000L)), (Serde)Serdes.ByteArray(), (Serde)Serdes.ByteArray()).withLoggingDisabled(), "sourceDummy3", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "topicDummy3", "processorDummy3", this.voidProcessorSupplier);
        }
    }

    @Test
    public void shouldReturnAllStoresNames() {
        Topology topology = this.setupSourceSinkTopology();
        topology.addStateStore((StoreBuilder)new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"store"), Serdes.ByteArray(), Serdes.ByteArray(), Time.SYSTEM), new String[0]);
        topology.addGlobalStore(new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"globalStore"), Serdes.ByteArray(), Serdes.ByteArray(), Time.SYSTEM).withLoggingDisabled(), "sourceProcessorName", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "globalTopicName", "globalProcessorName", this.voidProcessorSupplier);
        this.testDriver = new TopologyTestDriver(topology, this.config);
        HashSet<String> expectedStoreNames = new HashSet<String>();
        expectedStoreNames.add("store");
        expectedStoreNames.add("globalStore");
        MatcherAssert.assertThat(this.testDriver.getAllStateStores().keySet(), (Matcher)CoreMatchers.equalTo(expectedStoreNames));
    }

    private void setup() {
        this.setup(Stores.inMemoryKeyValueStore((String)"aggStore"));
    }

    private void setup(KeyValueBytesStoreSupplier storeSupplier) {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("aggregator", (ProcessorSupplier)new CustomMaxAggregatorSupplier(), new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)storeSupplier, (Serde)Serdes.String(), (Serde)Serdes.Long()), new String[]{"aggregator"});
        topology.addSink("sinkProcessor", "result-topic", new String[]{"aggregator"});
        this.config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        this.config.setProperty("default.value.serde", Serdes.Long().getClass().getName());
        this.testDriver = new TopologyTestDriver(topology, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put((Object)"a", (Object)21L);
    }

    private void pipeInput(String topic, String key, Long value, Long time) {
        this.testDriver.pipeRecord(topic, new TestRecord((Object)key, (Object)value, null, time), (Serializer)new StringSerializer(), (Serializer)new LongSerializer(), null);
    }

    private void compareKeyValue(TestRecord<String, Long> record, String key, Long value) {
        MatcherAssert.assertThat((Object)record.getKey(), (Matcher)CoreMatchers.equalTo((Object)key));
        MatcherAssert.assertThat((Object)record.getValue(), (Matcher)CoreMatchers.equalTo((Object)value));
    }

    @Test
    public void shouldFlushStoreForFirstInput() {
        this.setup();
        this.pipeInput("input-topic", "a", 1L, 9999L);
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        this.setup();
        this.pipeInput("input-topic", "a", 1L, 9999L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"a"), (Matcher)CoreMatchers.equalTo((Object)21L));
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        this.setup();
        this.pipeInput("input-topic", "a", 42L, 9999L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"a"), (Matcher)CoreMatchers.equalTo((Object)42L));
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 42L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldUpdateStoreForNewKey() {
        this.setup();
        this.pipeInput("input-topic", "b", 21L, 9999L);
        MatcherAssert.assertThat((Object)this.store.get((Object)"b"), (Matcher)CoreMatchers.equalTo((Object)21L));
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "b", 21L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        this.setup();
        this.pipeInput("input-topic", "a", 1L, 9999L);
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        this.pipeInput("input-topic", "a", 1L, 9999L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
        this.pipeInput("input-topic", "a", 1L, 10000L);
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        this.setup();
        this.testDriver.advanceWallClockTime(Duration.ofMillis(60000L));
        this.compareKeyValue((TestRecord<String, Long>)this.testDriver.readRecord("result-topic", (Deserializer)this.stringDeserializer, (Deserializer)this.longDeserializer), "a", 21L);
        Assertions.assertTrue((boolean)this.testDriver.isEmpty("result-topic"));
    }

    @Test
    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("aggregator", (ProcessorSupplier)new CustomMaxAggregatorSupplier(), new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"aggStore"), (Serde)Serdes.String(), (Serde)Serdes.Long()).withCachingEnabled(), new String[]{"aggregator"});
        this.testDriver = new TopologyTestDriver(topology, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put((Object)"a", (Object)21L);
    }

    @Test
    public void shouldCleanUpPersistentStateStoresOnClose() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("storeProcessor", (ProcessorSupplier)new ProcessorSupplier<String, Long, Void, Void>(){

            public Processor<String, Long, Void, Void> get() {
                return new Processor<String, Long, Void, Void>(){
                    private KeyValueStore<String, Long> store;

                    public void init(ProcessorContext<Void, Void> context) {
                        this.store = (KeyValueStore)context.getStateStore("storeProcessorStore");
                    }

                    public void process(Record<String, Long> record) {
                        this.store.put(record.key(), record.value());
                    }
                };
            }
        }, new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"storeProcessorStore"), (Serde)Serdes.String(), (Serde)Serdes.Long()), new String[]{"storeProcessor"});
        Properties config = new Properties();
        config.put("application.id", "test-TopologyTestDriver-cleanup");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        config.put("default.key.serde", Serdes.String().getClass().getName());
        config.put("default.value.serde", Serdes.Long().getClass().getName());
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);){
            Assertions.assertNull((Object)testDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"));
            testDriver.pipeRecord("input-topic", new TestRecord((Object)"a", (Object)1L), (Serializer)new StringSerializer(), (Serializer)new LongSerializer(), Instant.now());
            Assertions.assertEquals((Object)1L, (Object)testDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"));
        }
        testDriver = new TopologyTestDriver(topology, config);
        var4_4 = null;
        try {
            Assertions.assertNull((Object)testDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"), (String)"Closing the prior test driver should have cleaned up this store and value.");
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (testDriver != null) {
                if (var4_4 != null) {
                    try {
                        testDriver.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    testDriver.close();
                }
            }
        }
    }

    @Test
    public void shouldFeedStoreFromGlobalKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()), Materialized.as((String)"globalStore"));
        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), this.config);){
            KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore");
            Assertions.assertNotNull((Object)globalStore);
            Assertions.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
            testDriver.pipeRecord("topic", new TestRecord((Object)"k1", (Object)"value1"), (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.now());
            Assertions.assertEquals((Object)"value1", (Object)globalStore.get((Object)"k1"));
        }
    }

    private Topology setupMultipleSourcesPatternTopology(Pattern ... sourceTopicPatternNames) {
        Topology topology = new Topology();
        String[] processorNames = new String[sourceTopicPatternNames.length];
        int i = 0;
        for (Pattern sourceTopicPatternName : sourceTopicPatternNames) {
            String sourceName = sourceTopicPatternName + "-source";
            String processorName = sourceTopicPatternName + "-processor";
            topology.addSource(sourceName, sourceTopicPatternName);
            processorNames[i++] = processorName;
            topology.addProcessor(processorName, (ProcessorSupplier)new MockProcessorSupplier(), new String[]{sourceName});
        }
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
        Pattern pattern2Source2 = Pattern.compile("source-topic-[A-Z]");
        String consumerTopic2 = "source-topic-Z";
        TestRecord consumerRecord2 = new TestRecord((Object)this.key2, (Object)this.value2, null, Long.valueOf(43L));
        this.testDriver = new TopologyTestDriver(this.setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), this.config);
        List processedRecords1 = this.mockProcessors.get(0).processedRecords;
        List processedRecords2 = this.mockProcessors.get(1).processedRecords;
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        Assertions.assertEquals((int)1, (int)processedRecords1.size());
        Assertions.assertEquals((int)0, (int)processedRecords2.size());
        TTDTestRecord record1 = (TTDTestRecord)processedRecords1.get(0);
        TTDTestRecord expectedResult1 = new TTDTestRecord(SOURCE_TOPIC_1, this.testRecord1, 0L);
        MatcherAssert.assertThat((Object)record1, (Matcher)CoreMatchers.equalTo((Object)expectedResult1));
        this.pipeRecord("source-topic-Z", (TestRecord<byte[], byte[]>)consumerRecord2);
        Assertions.assertEquals((int)1, (int)processedRecords1.size());
        Assertions.assertEquals((int)1, (int)processedRecords2.size());
        TTDTestRecord record2 = (TTDTestRecord)processedRecords2.get(0);
        TTDTestRecord expectedResult2 = new TTDTestRecord("source-topic-Z", (TestRecord<byte[], byte[]>)consumerRecord2, 0L);
        MatcherAssert.assertThat((Object)record2, (Matcher)CoreMatchers.equalTo((Object)expectedResult2));
    }

    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        String sourceName = "source";
        Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
        Topology topology = new Topology();
        topology.addSource("source", pattern2Source1);
        topology.addSink("sink", SINK_TOPIC_1, new String[]{"source"});
        this.testDriver = new TopologyTestDriver(topology, this.config);
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        ProducerRecord outputRecord = this.testDriver.readRecord(SINK_TOPIC_1);
        Assertions.assertArrayEquals((byte[])this.key1, (byte[])((byte[])outputRecord.key()));
        Assertions.assertArrayEquals((byte[])this.value1, (byte[])((byte[])outputRecord.value()));
        Assertions.assertEquals((Object)SINK_TOPIC_1, (Object)outputRecord.topic());
    }

    @Test
    public void shouldThrowPatternNotValidForTopicNameException() {
        String sourceName = "source";
        String pattern2Source1 = "source-topic-\\d";
        Topology topology = new Topology();
        topology.addSource("source", new String[]{"source-topic-\\d"});
        topology.addSink("sink", SINK_TOPIC_1, new String[]{"source"});
        this.testDriver = new TopologyTestDriver(topology, this.config);
        try {
            this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        }
        catch (TopologyException exception) {
            String str = String.format("Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for input record topic: %s and hence cannot process the message.", "source-topic-\\d", SOURCE_TOPIC_1);
            Assertions.assertEquals((Object)str, (Object)exception.getMessage());
        }
    }

    @Test
    public void shouldNotCreateStateDirectoryForStatelessTopology() {
        this.setup();
        String stateDir = this.config.getProperty("state.dir");
        File appDir = new File(stateDir, this.config.getProperty("application.id"));
        Assertions.assertFalse((boolean)appDir.exists());
    }

    @Test
    public void shouldCreateStateDirectoryForStatefulTopology() {
        this.setup(Stores.persistentKeyValueStore((String)"aggStore"));
        String stateDir = this.config.getProperty("state.dir");
        File appDir = new File(stateDir, this.config.getProperty("application.id"));
        Assertions.assertTrue((boolean)appDir.exists());
        Assertions.assertTrue((boolean)appDir.isDirectory());
        TaskId taskId = new TaskId(0, 0);
        Assertions.assertTrue((boolean)new File(appDir, taskId.toString()).exists());
    }

    @Test
    public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
        Topology topology = new Topology();
        topology.addSource("source", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), new String[]{"input"});
        topology.addProcessor("recursiveProcessor", () -> new Processor<String, String, String, String>(){
            private ProcessorContext context;

            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }

            public void process(Record<String, String> record) {
                String value = (String)record.value();
                if (!value.startsWith("recurse-")) {
                    this.context.forward(record.withValue((Object)("recurse-" + value)), "recursiveSink");
                }
                this.context.forward(record, "sink");
            }
        }, new String[]{"source"});
        topology.addSink("recursiveSink", "input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"recursiveProcessor"});
        topology.addSink("sink", "output", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"recursiveProcessor"});
        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology);){
            TestInputTopic in = topologyTestDriver.createInputTopic("input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic out = topologyTestDriver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            in.pipeInput((Object)"B", (Object)"beta");
            List events = out.readKeyValuesToList();
            MatcherAssert.assertThat((Object)events, (Matcher)CoreMatchers.is(Arrays.asList(new KeyValue((Object)"B", (Object)"beta"), new KeyValue((Object)"B", (Object)"recurse-beta"))));
        }
    }

    @Test
    public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
        Topology topology = new Topology();
        topology.addSource("source", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), new String[]{"input"});
        topology.addGlobalStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"global-store"), (Serde)Serdes.String(), (Serde)Serdes.String()).withLoggingDisabled(), "globalSource", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), "global-topic", "globalProcessor", () -> new Processor<String, String, Void, Void>(){
            private KeyValueStore stateStore;

            public void init(ProcessorContext<Void, Void> context) {
                this.stateStore = (KeyValueStore)context.getStateStore("global-store");
            }

            public void process(Record<String, String> record) {
                this.stateStore.put(record.key(), record.value());
            }
        });
        topology.addProcessor("recursiveProcessor", () -> new Processor<String, String, String, String>(){
            private ProcessorContext context;

            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }

            public void process(Record<String, String> record) {
                String value = (String)record.value();
                if (!value.startsWith("recurse-")) {
                    this.context.forward(record.withValue((Object)("recurse-" + value)), "recursiveSink");
                }
                this.context.forward(record, "sink");
                this.context.forward(record, "globalSink");
            }
        }, new String[]{"source"});
        topology.addSink("recursiveSink", "input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"recursiveProcessor"});
        topology.addSink("sink", "output", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"recursiveProcessor"});
        topology.addSink("globalSink", "global-topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"recursiveProcessor"});
        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology);){
            TestInputTopic in = topologyTestDriver.createInputTopic("input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic globalTopic = topologyTestDriver.createOutputTopic("global-topic", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            in.pipeInput((Object)"A", (Object)"alpha");
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("global-store");
            MatcherAssert.assertThat((Object)keyValueStore, (Matcher)CoreMatchers.notNullValue());
            MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)"recurse-alpha"));
            List events = globalTopic.readKeyValuesToList();
            MatcherAssert.assertThat((Object)events, (Matcher)CoreMatchers.is(Arrays.asList(new KeyValue((Object)"A", (Object)"alpha"), new KeyValue((Object)"A", (Object)"recurse-alpha"))));
        }
    }

    @Test
    public void shouldRespectTaskIdling() {
        Properties properties = new Properties();
        properties.setProperty("max.task.idle.ms", "1000");
        Topology topology = new Topology();
        topology.addSource("source1", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), new String[]{"input1"});
        topology.addSource("source2", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), new String[]{"input2"});
        topology.addSink("sink", "output", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"source1", "source2"});
        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties);){
            TestInputTopic in1 = topologyTestDriver.createInputTopic("input1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic in2 = topologyTestDriver.createInputTopic("input2", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestOutputTopic out = topologyTestDriver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            in1.pipeInput((Object)"A", (Object)"alpha");
            topologyTestDriver.advanceWallClockTime(Duration.ofMillis(1L));
            MatcherAssert.assertThat((Object)out.readKeyValuesToList(), (Matcher)CoreMatchers.is(Collections.emptyList()));
            in2.pipeInput((Object)"B", (Object)"beta");
            MatcherAssert.assertThat((Object)out.readKeyValuesToList(), (Matcher)CoreMatchers.is(Collections.singletonList(new KeyValue((Object)"A", (Object)"alpha"))));
            topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(1L));
            MatcherAssert.assertThat((Object)out.readKeyValuesToList(), (Matcher)CoreMatchers.is(Collections.singletonList(new KeyValue((Object)"B", (Object)"beta"))));
        }
    }

    private static class CustomMaxAggregator
    implements Processor<String, Long, String, Long> {
        ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        private CustomMaxAggregator() {
        }

        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            context.schedule(Duration.ofMinutes(1L), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
            context.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, this::flushStore);
            this.store = (KeyValueStore)context.getStateStore("aggStore");
        }

        public void process(Record<String, Long> record) {
            Long oldValue = (Long)this.store.get(record.key());
            if (oldValue == null || (Long)record.value() > oldValue) {
                this.store.put(record.key(), record.value());
            }
        }

        private void flushStore(long timestamp) {
            try (KeyValueIterator it = this.store.all();){
                while (it.hasNext()) {
                    KeyValue next = (KeyValue)it.next();
                    this.context.forward(new Record(next.key, next.value, timestamp));
                }
            }
        }
    }

    private static class CustomMaxAggregatorSupplier
    implements ProcessorSupplier<String, Long, String, Long> {
        private CustomMaxAggregatorSupplier() {
        }

        public Processor<String, Long, String, Long> get() {
            return new CustomMaxAggregator();
        }
    }

    private final class MockProcessorSupplier
    implements ProcessorSupplier<Object, Object, Object, Object> {
        private final Collection<Punctuation> punctuations;

        private MockProcessorSupplier() {
            this(Collections.emptySet());
        }

        private MockProcessorSupplier(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public Processor<Object, Object, Object, Object> get() {
            MockProcessor mockProcessor = new MockProcessor(this.punctuations);
            if (!this.isCheckSupplierCall()) {
                TopologyTestDriverTest.this.mockProcessors.add(mockProcessor);
            }
            return mockProcessor;
        }

        public boolean isCheckSupplierCall() {
            return Arrays.stream(Thread.currentThread().getStackTrace()).anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
        }
    }

    private static final class MockProcessor
    implements Processor<Object, Object, Object, Object> {
        private final Collection<Punctuation> punctuations;
        private ProcessorContext<Object, Object> context;
        private boolean initialized = false;
        private boolean closed = false;
        private final List<TTDTestRecord> processedRecords = new ArrayList<TTDTestRecord>();

        MockProcessor(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public void init(ProcessorContext<Object, Object> context) {
            this.initialized = true;
            this.context = context;
            for (Punctuation punctuation : this.punctuations) {
                this.context.schedule(Duration.ofMillis(punctuation.intervalMs), punctuation.punctuationType, punctuation.callback);
            }
        }

        public void process(Record<Object, Object> record) {
            this.processedRecords.add(new TTDTestRecord(record.key(), record.value(), record.headers(), record.timestamp(), this.context.recordMetadata().map(RecordMetadata::offset).orElse(-1L), this.context.recordMetadata().map(RecordMetadata::topic).orElse(null)));
            this.context.forward(record);
        }

        public void close() {
            this.closed = true;
        }
    }

    private static final class MockPunctuator
    implements Punctuator {
        private final List<Long> punctuatedAt = new LinkedList<Long>();

        private MockPunctuator() {
        }

        public void punctuate(long timestamp) {
            this.punctuatedAt.add(timestamp);
        }
    }

    private static final class Punctuation {
        private final long intervalMs;
        private final PunctuationType punctuationType;
        private final Punctuator callback;

        Punctuation(long intervalMs, PunctuationType punctuationType, Punctuator callback) {
            this.intervalMs = intervalMs;
            this.punctuationType = punctuationType;
            this.callback = callback;
        }
    }

    private static final class TTDTestRecord {
        private final Object key;
        private final Object value;
        private final long timestamp;
        private final long offset;
        private final String topic;
        private final Headers headers;

        TTDTestRecord(String newTopic, TestRecord<byte[], byte[]> consumerRecord, long newOffset) {
            this.key = consumerRecord.key();
            this.value = consumerRecord.value();
            this.timestamp = consumerRecord.timestamp();
            this.offset = newOffset;
            this.topic = newTopic;
            this.headers = consumerRecord.headers();
        }

        TTDTestRecord(Object key, Object value, Headers headers, long timestamp, long offset, String topic) {
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestamp = timestamp;
            this.offset = offset;
            this.topic = topic;
        }

        public String toString() {
            return "key: " + this.key + ", value: " + this.value + ", timestamp: " + this.timestamp + ", offset: " + this.offset + ", topic: " + this.topic + ", num.headers: " + (this.headers == null ? "null" : Integer.valueOf(this.headers.toArray().length));
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TTDTestRecord record = (TTDTestRecord)o;
            return this.timestamp == record.timestamp && this.offset == record.offset && Objects.equals(this.key, record.key) && Objects.equals(this.value, record.value) && Objects.equals(this.topic, record.topic) && Objects.equals(this.headers, record.headers);
        }

        public int hashCode() {
            return Objects.hash(this.key, this.value, this.headers, this.timestamp, this.offset, this.topic);
        }
    }
}

