/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TopologyTestDriverTest {
    // Topic names shared by the topology builders and the assertions in this class.
    private static final String SOURCE_TOPIC_1 = "source-topic-1";
    private static final String SOURCE_TOPIC_2 = "source-topic-2";
    private static final String SINK_TOPIC_1 = "sink-topic-1";
    private static final String SINK_TOPIC_2 = "sink-topic-2";
    // A single "key" -> "value" header; attached to testRecord1 and consumerRecord1 below.
    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
    // First fixture: empty key and value arrays with timestamp 42.
    private final byte[] key1 = new byte[0];
    private final byte[] value1 = new byte[0];
    private final long timestamp1 = 42L;
    private final TestRecord<byte[], byte[]> testRecord1 = new TestRecord((Object)this.key1, (Object)this.value1, this.headers, Long.valueOf(42L));
    // Second fixture: separate (also empty) key and value arrays with timestamp 43.
    private final byte[] key2 = new byte[0];
    private final byte[] value2 = new byte[0];
    private final long timestamp2 = 43L;
    // ConsumerRecord versions of the two fixtures, consumed by the tests that call pipeInput(...).
    // consumerRecord1 targets source-topic-1 and carries the headers; consumerRecord2 targets
    // source-topic-2 and is created without headers.
    private final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory((Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = this.consumerRecordFactory.create("source-topic-1", (Object)this.key1, (Object)this.value1, this.headers, 42L);
    private final ConsumerRecord<byte[], byte[]> consumerRecord2 = this.consumerRecordFactory.create("source-topic-2", (Object)this.key2, (Object)this.value2, 43L);
    // Driver under test; assigned by each test and closed in tearDown() when non-null.
    private TopologyTestDriver testDriver;
    // Driver configuration: application id, a dummy bootstrap server and a temporary state directory.
    // The constructor additionally sets "processing.guarantee" when EOS is enabled.
    private final Properties config = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"test-TopologyTestDriver"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getAbsolutePath())}));
    private KeyValueStore<String, Long> store;
    private final StringDeserializer stringDeserializer = new StringDeserializer();
    private final LongDeserializer longDeserializer = new LongDeserializer();
    // Tests read processors from this list by index (get(0), get(1)).
    // NOTE(review): presumably filled by MockProcessorSupplier when the driver builds the topology - confirm.
    private final List<MockProcessor> mockProcessors = new ArrayList<MockProcessor>();

    /**
     * Supplies the parameter sets for the parameterized runner: one run with the
     * EOS flag set to {@code true}, followed by one run with it set to {@code false}.
     *
     * @return two single-element argument arrays, {@code true} first and {@code false} second
     */
    @Parameterized.Parameters(name="Eos enabled = {0}")
    public static Collection<Object[]> data() {
        final List<Object[]> parameterSets = new ArrayList<>(2);
        parameterSets.add(new Object[]{Boolean.TRUE});
        parameterSets.add(new Object[]{Boolean.FALSE});
        return parameterSets;
    }

    /**
     * Creates the test instance for one parameter set.
     *
     * @param eosEnabled when {@code true}, {@code processing.guarantee} is set to
     *                   {@code exactly_once} in the driver configuration; otherwise the
     *                   configuration is left as initialized
     */
    public TopologyTestDriverTest(boolean eosEnabled) {
        if (!eosEnabled) {
            return;
        }
        config.put("processing.guarantee", "exactly_once");
    }

    /**
     * Closes the driver after each test, if the test left one assigned.
     */
    @After
    public void tearDown() {
        final TopologyTestDriver driver = testDriver;
        if (driver == null) {
            // The test either never created a driver or already closed and cleared it.
            return;
        }
        driver.close();
    }

    /**
     * Builds a topology with one source node reading {@code SOURCE_TOPIC_1} that is
     * connected directly to one sink node writing {@code SINK_TOPIC_1}.
     *
     * @return the new topology
     */
    private Topology setupSourceSinkTopology() {
        final String sourceName = "source";
        final Topology sourceToSink = new Topology();
        sourceToSink.addSource(sourceName, SOURCE_TOPIC_1);
        sourceToSink.addSink("sink", SINK_TOPIC_1, sourceName);
        return sourceToSink;
    }

    /**
     * Builds a topology made of two source/sink pairs chained through a topic:
     * the first pair copies {@code SOURCE_TOPIC_1} to {@code SINK_TOPIC_1}, and the
     * second pair reads {@code SINK_TOPIC_1} and writes {@code SINK_TOPIC_2}.
     *
     * @return the new topology
     */
    private Topology setupTopologyWithTwoSubtopologies() {
        final String firstSource = "source-1";
        final String secondSource = "source-2";
        final Topology chained = new Topology();

        // First pair: SOURCE_TOPIC_1 -> SINK_TOPIC_1.
        chained.addSource(firstSource, SOURCE_TOPIC_1);
        chained.addSink("sink-1", SINK_TOPIC_1, firstSource);

        // Second pair consumes the first pair's output topic.
        chained.addSource(secondSource, SINK_TOPIC_1);
        chained.addSink("sink-2", SINK_TOPIC_2, secondSource);
        return chained;
    }

    /**
     * Builds the single-processor topology without requesting any punctuation.
     *
     * @return the new topology
     */
    private Topology setupSingleProcessorTopology() {
        // A non-positive interval together with null type/callback means "no punctuation".
        final long noPunctuationIntervalMs = -1L;
        return setupSingleProcessorTopology(noPunctuationIntervalMs, null, null);
    }

    /**
     * Builds a topology with one source reading {@code SOURCE_TOPIC_1} feeding one
     * mock processor.
     *
     * <p>A single {@code Punctuation} is handed to the processor supplier only when the
     * interval is positive and both the type and the callback are non-null; in every
     * other case the supplier receives an empty set.
     *
     * @param punctuationIntervalMs punctuation interval in milliseconds
     * @param punctuationType       punctuation type, may be {@code null}
     * @param callback              punctuator to register, may be {@code null}
     * @return the new topology
     */
    private Topology setupSingleProcessorTopology(long punctuationIntervalMs, PunctuationType punctuationType, Punctuator callback) {
        final boolean punctuationRequested = punctuationIntervalMs > 0L && punctuationType != null && callback != null;
        final Set<Object> punctuations;
        if (punctuationRequested) {
            punctuations = Collections.singleton(new Punctuation(punctuationIntervalMs, punctuationType, callback));
        } else {
            punctuations = Collections.emptySet();
        }

        final String sourceName = "source";
        final Topology singleProcessor = new Topology();
        singleProcessor.addSource(sourceName, SOURCE_TOPIC_1);
        singleProcessor.addProcessor("processor", (ProcessorSupplier)new MockProcessorSupplier(punctuations), sourceName);
        return singleProcessor;
    }

    /**
     * Builds a topology with one source node and one mock processor per given topic,
     * all processors feeding a single sink that writes {@code SINK_TOPIC_1}.
     *
     * <p>Node names are derived from the topic name: {@code <topic>-source} and
     * {@code <topic>-processor}.
     *
     * @param sourceTopicNames the topics to read from
     * @return the new topology
     */
    private Topology setupMultipleSourceTopology(String ... sourceTopicNames) {
        final Topology multiSource = new Topology();
        final String[] processorNames = new String[sourceTopicNames.length];
        for (int index = 0; index < sourceTopicNames.length; index++) {
            final String topicName = sourceTopicNames[index];
            final String sourceName = topicName + "-source";
            processorNames[index] = topicName + "-processor";
            multiSource.addSource(sourceName, topicName);
            multiSource.addProcessor(processorNames[index], (ProcessorSupplier)new MockProcessorSupplier(), sourceName);
        }
        // The sink has every processor as a parent.
        multiSource.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return multiSource;
    }

    /**
     * Builds a topology that registers one in-memory global key-value store per given
     * topic, with logging disabled and a mock processor as the state-update processor.
     *
     * <p>For a topic {@code t} the store is named {@code t-globalStore}, the source node
     * is named {@code t} and the processor node is named {@code t-processor}.
     *
     * @param sourceTopicNames the topics backing the global stores; must not be empty
     * @return the new topology
     * @throws IllegalArgumentException if no topic name is given
     */
    private Topology setupGlobalStoreTopology(String ... sourceTopicNames) {
        if (sourceTopicNames.length < 1) {
            throw new IllegalArgumentException("sourceTopicNames cannot be empty");
        }
        final Topology globalStores = new Topology();
        for (final String topicName : sourceTopicNames) {
            final String storeName = topicName + "-globalStore";
            final String processorName = topicName + "-processor";
            globalStores.addGlobalStore(
                Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore(storeName), null, null).withLoggingDisabled(),
                topicName,
                null,
                null,
                topicName,
                processorName,
                (ProcessorSupplier)new MockProcessorSupplier());
        }
        return globalStores;
    }

    /**
     * Constructing the driver must leave the first mock processor flagged as initialized.
     */
    @Test
    public void shouldInitProcessor() {
        final Topology topology = setupSingleProcessorTopology();
        testDriver = new TopologyTestDriver(topology, config);

        final MockProcessor processor = mockProcessors.get(0);
        Assert.assertTrue(processor.initialized);
    }

    /**
     * Closing the driver must leave the first mock processor flagged as closed.
     */
    @Test
    public void shouldCloseProcessor() {
        final TopologyTestDriver driver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
        testDriver = driver;

        driver.close();

        Assert.assertTrue(mockProcessors.get(0).closed);
        // Already closed here; clear the field so tearDown() does not close it again.
        testDriver = null;
    }

    /**
     * Piping a record into a topic that an empty topology does not know must raise
     * {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowForUnknownTopic() {
        testDriver = new TopologyTestDriver(new Topology(), config);

        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> testDriver.pipeRecord(
                "unknownTopic",
                new TestRecord<byte[], byte[]>((byte[]) null),
                new ByteArraySerializer(),
                new ByteArraySerializer(),
                Instant.now()));
    }

    /**
     * Piping a record that carries no timestamp while also passing {@code null} as the
     * time argument must raise {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowForMissingTime() {
        testDriver = new TopologyTestDriver(new Topology(), config);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> testDriver.pipeRecord(
                SINK_TOPIC_1,
                new TestRecord<String, String>("value"),
                new StringSerializer(),
                new StringSerializer(),
                null));
    }

    /**
     * Piping a {@code ConsumerRecord} for a topic that an empty topology does not know
     * must raise {@link IllegalArgumentException} with the message
     * {@code "Unknown topic: unknownTopic"}.
     *
     * <p>Uses {@code Assert.assertThrows} like the sibling tests instead of a manual
     * try/fail/catch, so a missing exception is reported by JUnit itself.
     */
    @Deprecated
    @Test
    public void shouldThrowForUnknownTopicDeprecated() {
        final String unknownTopic = "unknownTopic";
        final ConsumerRecordFactory<byte[], byte[]> unknownTopicFactory =
            new ConsumerRecordFactory<>(unknownTopic, new ByteArraySerializer(), new ByteArraySerializer());
        testDriver = new TopologyTestDriver(new Topology(), config);

        final IllegalArgumentException exception = Assert.assertThrows(
            IllegalArgumentException.class,
            // The byte[] cast selects the value-only create(...) overload.
            () -> testDriver.pipeInput(unknownTopicFactory.create((byte[]) null)));

        Assert.assertEquals("Unknown topic: " + unknownTopic, exception.getMessage());
    }

    /**
     * A record piped into the source topic of the source/sink topology must come out
     * on {@code SINK_TOPIC_1} with the same key and value.
     */
    @Test
    public void shouldProcessRecordForTopic() {
        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        final ProducerRecord<byte[], byte[]> forwarded = testDriver.readRecord(SINK_TOPIC_1);

        // assertEquals(Object, Object) on arrays falls back to Object.equals, i.e. it
        // passes only when the very same array instances come back.
        Assert.assertEquals((Object) key1, (Object) forwarded.key());
        Assert.assertEquals((Object) value1, (Object) forwarded.value());
        Assert.assertEquals(SINK_TOPIC_1, forwarded.topic());
    }

    /**
     * The single record seen by the mock processor must equal a {@code Record} built
     * from the source topic, the piped test record and offset {@code 0}.
     */
    @Test
    public void shouldSetRecordMetadata() {
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        final List seenByProcessor = mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1, seenByProcessor.size());

        final Record actual = (Record) seenByProcessor.get(0);
        final Record expected = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Pipes a byte-array record into the driver using byte-array serializers and
     * {@code null} as the time argument.
     *
     * @param topic  the topic to pipe the record into
     * @param record the record to pipe
     */
    private void pipeRecord(String topic, TestRecord<byte[], byte[]> record) {
        final ByteArraySerializer keySerializer = new ByteArraySerializer();
        final ByteArraySerializer valueSerializer = new ByteArraySerializer();
        testDriver.pipeRecord(topic, record, keySerializer, valueSerializer, null);
    }

    /**
     * With two source topics, each piped {@code ConsumerRecord} must reach only the
     * processor attached to its own topic.
     */
    @Deprecated
    @Test
    public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
        final List firstProcessorRecords = mockProcessors.get(0).processedRecords;
        final List secondProcessorRecords = mockProcessors.get(1).processedRecords;

        // Record for the first topic: only the first processor sees it.
        testDriver.pipeInput(consumerRecord1);
        Assert.assertEquals(1, firstProcessorRecords.size());
        Assert.assertEquals(0, secondProcessorRecords.size());
        final Record firstActual = (Record) firstProcessorRecords.get(0);
        final Record firstExpected = new Record(consumerRecord1, 0L);
        MatcherAssert.assertThat(firstActual, CoreMatchers.equalTo(firstExpected));

        // Record for the second topic: the first processor's count stays at one.
        testDriver.pipeInput(consumerRecord2);
        Assert.assertEquals(1, firstProcessorRecords.size());
        Assert.assertEquals(1, secondProcessorRecords.size());
        final Record secondActual = (Record) secondProcessorRecords.get(0);
        final Record secondExpected = new Record(consumerRecord2, 0L);
        MatcherAssert.assertThat(secondActual, CoreMatchers.equalTo(secondExpected));
    }

    /**
     * Two sources with different key/value deserializers (Long/String and
     * Integer/Double) feed one processor and one sink; records created through
     * {@code ConsumerRecordFactory} must be readable from the sink with the matching
     * deserializers.
     */
    @Deprecated
    @Test
    public void shouldUseSourceSpecificDeserializersDeprecated() {
        final String longStringSource = "source-1";
        final String intDoubleSource = "source-2";
        final String processorName = "processor";

        final Topology topology = new Topology();
        topology.addSource(longStringSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(intDoubleSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addProcessor(processorName, (ProcessorSupplier)new MockProcessorSupplier(), longStringSource, intDoubleSource);
        // The sink receives both record shapes, so its serializers dispatch on the runtime type.
        topology.addSink(
            "sink",
            SINK_TOPIC_1,
            (topic, key) -> key instanceof Long
                ? Serdes.Long().serializer().serialize(topic, (Long) key)
                : Serdes.Integer().serializer().serialize(topic, (Integer) key),
            (topic, value) -> value instanceof String
                ? Serdes.String().serializer().serialize(topic, (String) value)
                : Serdes.Double().serializer().serialize(topic, (Double) value),
            processorName);
        testDriver = new TopologyTestDriver(topology, config);

        final ConsumerRecordFactory<Long, String> longStringFactory =
            new ConsumerRecordFactory<>(SOURCE_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer());
        final ConsumerRecordFactory<Integer, Double> intDoubleFactory =
            new ConsumerRecordFactory<>(SOURCE_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer());

        final Long longKey = 42L;
        final String stringValue = "anyString";
        final Integer intKey = 73;
        final Double doubleValue = 3.14;
        final ConsumerRecord<byte[], byte[]> longStringInput = longStringFactory.create(longKey, stringValue);
        final ConsumerRecord<byte[], byte[]> intDoubleInput = intDoubleFactory.create(intKey, doubleValue);

        testDriver.pipeInput(longStringInput);
        final ProducerRecord<Long, String> longStringOutput =
            testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(longStringOutput.key(), CoreMatchers.equalTo(longKey));
        MatcherAssert.assertThat(longStringOutput.value(), CoreMatchers.equalTo(stringValue));

        testDriver.pipeInput(intDoubleInput);
        final ProducerRecord<Integer, Double> intDoubleOutput =
            testDriver.readOutput(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(intDoubleOutput.key(), CoreMatchers.equalTo(intKey));
        MatcherAssert.assertThat(intDoubleOutput.value(), CoreMatchers.equalTo(doubleValue));
    }

    /**
     * Two sources with different key/value deserializers (Long/String and
     * Integer/Double) feed one processor and one sink; records piped as
     * {@code TestRecord}s must be readable from the sink with the matching deserializers.
     */
    @Test
    public void shouldUseSourceSpecificDeserializers() {
        final String longStringSource = "source-1";
        final String intDoubleSource = "source-2";
        final String processorName = "processor";

        final Topology topology = new Topology();
        topology.addSource(longStringSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(intDoubleSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addProcessor(processorName, (ProcessorSupplier)new MockProcessorSupplier(), longStringSource, intDoubleSource);
        // The sink receives both record shapes, so its serializers dispatch on the runtime type.
        topology.addSink(
            "sink",
            SINK_TOPIC_1,
            (topic, key) -> key instanceof Long
                ? Serdes.Long().serializer().serialize(topic, (Long) key)
                : Serdes.Integer().serializer().serialize(topic, (Integer) key),
            (topic, value) -> value instanceof String
                ? Serdes.String().serializer().serialize(topic, (String) value)
                : Serdes.Double().serializer().serialize(topic, (Double) value),
            processorName);
        testDriver = new TopologyTestDriver(topology, config);

        final Long longKey = 42L;
        final String stringValue = "anyString";
        final Integer intKey = 73;
        final Double doubleValue = 3.14;
        final TestRecord<Long, String> longStringInput = new TestRecord<>(longKey, stringValue);
        final TestRecord<Integer, Double> intDoubleInput = new TestRecord<>(intKey, doubleValue);

        testDriver.pipeRecord(SOURCE_TOPIC_1, longStringInput, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        final TestRecord<Long, String> longStringOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(longStringOutput.getKey(), CoreMatchers.equalTo(longKey));
        MatcherAssert.assertThat(longStringOutput.getValue(), CoreMatchers.equalTo(stringValue));

        testDriver.pipeRecord(SOURCE_TOPIC_2, intDoubleInput, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        final TestRecord<Integer, Double> intDoubleOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(intDoubleOutput.getKey(), CoreMatchers.equalTo(intKey));
        MatcherAssert.assertThat(intDoubleOutput.getValue(), CoreMatchers.equalTo(doubleValue));
    }

    /**
     * Two independent source/sink pairs with their own serdes (Long/String to
     * {@code SINK_TOPIC_1}, Integer/Double to {@code SINK_TOPIC_2}) must each deliver
     * the piped key and value to their own sink topic.
     */
    @Test
    public void shouldUseSinkSpecificSerializers() {
        final String longStringSource = "source-1";
        final String intDoubleSource = "source-2";

        final Topology topology = new Topology();
        topology.addSource(longStringSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(intDoubleSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), longStringSource);
        topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), intDoubleSource);
        testDriver = new TopologyTestDriver(topology, config);

        final Long longKey = 42L;
        final String stringValue = "anyString";
        final Integer intKey = 73;
        final Double doubleValue = 3.14;
        final TestRecord<Long, String> longStringInput = new TestRecord<>(longKey, stringValue);
        final TestRecord<Integer, Double> intDoubleInput = new TestRecord<>(intKey, doubleValue);

        testDriver.pipeRecord(SOURCE_TOPIC_1, longStringInput, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        final TestRecord<Long, String> longStringOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(longStringOutput.getKey(), CoreMatchers.equalTo(longKey));
        MatcherAssert.assertThat(longStringOutput.getValue(), CoreMatchers.equalTo(stringValue));

        testDriver.pipeRecord(SOURCE_TOPIC_2, intDoubleInput, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        final TestRecord<Integer, Double> intDoubleOutput =
            testDriver.readRecord(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(intDoubleOutput.getKey(), CoreMatchers.equalTo(intKey));
        MatcherAssert.assertThat(intDoubleOutput.getValue(), CoreMatchers.equalTo(doubleValue));
    }

    /**
     * Piping a list holding one record per source topic must deliver exactly one
     * record to each of the two processors.
     */
    @Deprecated
    @Test
    public void shouldProcessConsumerRecordList() {
        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
        final List firstProcessorRecords = mockProcessors.get(0).processedRecords;
        final List secondProcessorRecords = mockProcessors.get(1).processedRecords;

        final List<ConsumerRecord<byte[], byte[]>> batch = Arrays.asList(consumerRecord1, consumerRecord2);
        testDriver.pipeInput(batch);

        Assert.assertEquals(1, firstProcessorRecords.size());
        Assert.assertEquals(1, secondProcessorRecords.size());

        final Record firstActual = (Record) firstProcessorRecords.get(0);
        final Record firstExpected = new Record(consumerRecord1, 0L);
        MatcherAssert.assertThat(firstActual, CoreMatchers.equalTo(firstExpected));

        final Record secondActual = (Record) secondProcessorRecords.get(0);
        final Record secondExpected = new Record(consumerRecord2, 0L);
        MatcherAssert.assertThat(secondActual, CoreMatchers.equalTo(secondExpected));
    }

    /**
     * In the two-pair topology, one record piped into {@code SOURCE_TOPIC_1} must be
     * readable first from {@code SINK_TOPIC_1} and then from {@code SINK_TOPIC_2},
     * with the same key and value each time.
     */
    @Test
    public void shouldForwardRecordsFromSubtopologyToSubtopology() {
        testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        for (final String sinkTopic : new String[]{SINK_TOPIC_1, SINK_TOPIC_2}) {
            final ProducerRecord<byte[], byte[]> forwarded = testDriver.readRecord(sinkTopic);
            // Arrays are compared through assertEquals(Object, Object), i.e. by reference.
            Assert.assertEquals((Object) key1, (Object) forwarded.key());
            Assert.assertEquals((Object) value1, (Object) forwarded.value());
            Assert.assertEquals(sinkTopic, forwarded.topic());
        }
    }

    /**
     * The global store registered for {@code SOURCE_TOPIC_1} must be retrievable from
     * the driver, and a record piped into that topic must reach the store's processor.
     */
    @Test
    public void shouldPopulateGlobalStore() {
        final String globalStoreName = SOURCE_TOPIC_1 + "-globalStore";
        testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);

        // The store must be visible through both lookup paths.
        Assert.assertNotNull((Object) testDriver.getKeyValueStore(globalStoreName));
        Assert.assertNotNull(testDriver.getAllStateStores().get(globalStoreName));

        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        final List seenByProcessor = mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1, seenByProcessor.size());
        final Record actual = (Record) seenByProcessor.get(0);
        final Record expected = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * With a 10 ms {@code STREAM_TIME} punctuation, pipes records with a fixed sequence
     * of timestamps and checks after every record which timestamps the punctuator has
     * recorded so far.
     *
     * <p>The two arrays below are parallel: for each piped timestamp, the flag says
     * whether that timestamp is expected to be appended to the punctuation list.
     */
    @Test
    public void shouldPunctuateOnStreamsTime() {
        final MockPunctuator mockPunctuator = new MockPunctuator();
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator), config);

        final long[] recordTimestamps = {42L, 42L, 51L, 52L, 61L, 65L, 71L, 72L, 95L, 101L, 102L};
        final boolean[] expectsPunctuation = {true, false, true, false, true, false, true, false, true, true, false};

        final List<Long> expectedPunctuations = new LinkedList<>();
        for (int step = 0; step < recordTimestamps.length; step++) {
            final long timestamp = recordTimestamps[step];
            if (expectsPunctuation[step]) {
                expectedPunctuations.add(timestamp);
            }
            pipeRecord(SOURCE_TOPIC_1, new TestRecord<byte[], byte[]>(key1, value1, null, Long.valueOf(timestamp)));
            MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * With a 10 ms {@code WALL_CLOCK_TIME} punctuation and the driver started at
     * wall-clock time 0, advances the clock through the {@code long}-based API and
     * checks after every step which times the punctuator has recorded so far.
     *
     * <p>The two arrays below are parallel: for each advance, the second array holds the
     * time expected to be appended to the punctuation list, or {@code null} when that
     * advance is expected to add nothing. The cumulative clock after the five steps is
     * 5, 14, 15, 35 and 40.
     */
    @Test
    public void shouldPunctuateOnWallClockTimeDeprecated() {
        final MockPunctuator mockPunctuator = new MockPunctuator();
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator), config, 0L);

        final long[] advancesMs = {5L, 9L, 1L, 20L, 5L};
        final Long[] newPunctuationAt = {null, 14L, null, 35L, 40L};

        final List<Long> expectedPunctuations = new LinkedList<>();
        for (int step = 0; step < advancesMs.length; step++) {
            if (newPunctuationAt[step] != null) {
                expectedPunctuations.add(newPunctuationAt[step]);
            }
            testDriver.advanceWallClockTime(advancesMs[step]);
            MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * With a 10 ms {@code WALL_CLOCK_TIME} punctuation and the driver started at epoch
     * millisecond 0, advances the clock through the {@link Duration}-based API and
     * checks after every step which times the punctuator has recorded so far.
     *
     * <p>The two arrays below are parallel: for each advance, the second array holds the
     * time expected to be appended to the punctuation list, or {@code null} when that
     * advance is expected to add nothing. The cumulative clock after the five steps is
     * 5, 14, 15, 35 and 40.
     */
    @Test
    public void shouldPunctuateOnWallClockTime() {
        final MockPunctuator mockPunctuator = new MockPunctuator();
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator), config, Instant.ofEpochMilli(0L));

        final long[] advancesMs = {5L, 9L, 1L, 20L, 5L};
        final Long[] newPunctuationAt = {null, 14L, null, 35L, 40L};

        final List<Long> expectedPunctuations = new LinkedList<>();
        for (int step = 0; step < advancesMs.length; step++) {
            if (newPunctuationAt[step] != null) {
                expectedPunctuations.add(newPunctuationAt[step]);
            }
            testDriver.advanceWallClockTime(Duration.ofMillis(advancesMs[step]));
            MatcherAssert.assertThat((Object)mockPunctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * {@code getAllStateStores()} must return exactly the processor-attached store and
     * the global store, each mapped to a non-null store instance.
     */
    @Test
    public void shouldReturnAllStores() {
        final Topology topology = setupSourceSinkTopology();
        topology.addProcessor("processor", () -> null, "source");

        final StoreBuilder<KeyValueStore<byte[], byte[]>> localStoreBuilder = new KeyValueStoreBuilder<byte[], byte[]>(
            Stores.inMemoryKeyValueStore("store"),
            Serdes.ByteArray(),
            Serdes.ByteArray(),
            new SystemTime());
        topology.addStateStore(localStoreBuilder, "processor");

        final StoreBuilder<KeyValueStore<byte[], byte[]>> globalStoreBuilder = new KeyValueStoreBuilder<byte[], byte[]>(
            Stores.inMemoryKeyValueStore("globalStore"),
            Serdes.ByteArray(),
            Serdes.ByteArray(),
            new SystemTime()).withLoggingDisabled();
        topology.addGlobalStore(
            globalStoreBuilder,
            "sourceProcessorName",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "globalTopicName",
            "globalProcessorName",
            () -> null);

        testDriver = new TopologyTestDriver(topology, config);

        final Set<String> expectedStoreNames = new HashSet<>(Arrays.asList("store", "globalStore"));
        final Map<String, StateStore> allStores = testDriver.getAllStateStores();
        MatcherAssert.assertThat(allStores.keySet(), CoreMatchers.equalTo(expectedStoreNames));
        for (final StateStore stateStore : allStores.values()) {
            Assert.assertNotNull(stateStore);
        }
    }

    /**
     * Runs the store-type lookup checks with the {@code persistent} flag set to {@code true}.
     */
    @Test
    public void shouldReturnCorrectPersistentStoreTypeOnly() {
        final boolean persistent = true;
        shouldReturnCorrectStoreTypeOnly(persistent);
    }

    /**
     * Runs the typed-accessor checks of {@code shouldReturnCorrectStoreTypeOnly} with the
     * {@code persistent} flag cleared.
     */
    @Test
    public void shouldReturnCorrectInMemoryStoreTypeOnly() {
        final boolean persistent = false;
        this.shouldReturnCorrectStoreTypeOnly(persistent);
    }

    /**
     * Registers one store of each built-in kind (plus two global key-value stores) and checks,
     * for every store name, which typed accessors of the test driver return a store and which
     * return {@code null}.
     *
     * @param persistent forwarded to {@code addStoresToTopology} to choose between persistent
     *                   and in-memory store suppliers
     */
    private void shouldReturnCorrectStoreTypeOnly(boolean persistent) {
        String keyValueStoreName = "keyValueStore";
        String timestampedKeyValueStoreName = "keyValueTimestampStore";
        String windowStoreName = "windowStore";
        String timestampedWindowStoreName = "windowTimestampStore";
        String sessionStoreName = "sessionStore";
        String globalKeyValueStoreName = "globalKeyValueStore";
        String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";

        Topology topology = this.setupSingleProcessorTopology();
        this.addStoresToTopology(
            topology,
            persistent,
            keyValueStoreName,
            timestampedKeyValueStoreName,
            windowStoreName,
            timestampedWindowStoreName,
            sessionStoreName,
            globalKeyValueStoreName,
            globalTimestampedKeyValueStoreName);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        // Plain key-value store: only the key-value accessor resolves it.
        Assert.assertNotNull(this.testDriver.getKeyValueStore(keyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedKeyValueStore(keyValueStoreName));
        Assert.assertNull(this.testDriver.getWindowStore(keyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(keyValueStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(keyValueStoreName));

        // Timestamped key-value store: reachable through both key-value accessors.
        Assert.assertNotNull(this.testDriver.getKeyValueStore(timestampedKeyValueStoreName));
        Assert.assertNotNull(this.testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getWindowStore(timestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(timestampedKeyValueStoreName));

        // Plain window store: only the window accessor resolves it.
        Assert.assertNull(this.testDriver.getKeyValueStore(windowStoreName));
        Assert.assertNull(this.testDriver.getTimestampedKeyValueStore(windowStoreName));
        Assert.assertNotNull(this.testDriver.getWindowStore(windowStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(windowStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(windowStoreName));

        // Timestamped window store: reachable through both window accessors.
        Assert.assertNull(this.testDriver.getKeyValueStore(timestampedWindowStoreName));
        Assert.assertNull(this.testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
        Assert.assertNotNull(this.testDriver.getWindowStore(timestampedWindowStoreName));
        Assert.assertNotNull(this.testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(timestampedWindowStoreName));

        // Session store: only the session accessor resolves it.
        Assert.assertNull(this.testDriver.getKeyValueStore(sessionStoreName));
        Assert.assertNull(this.testDriver.getTimestampedKeyValueStore(sessionStoreName));
        Assert.assertNull(this.testDriver.getWindowStore(sessionStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(sessionStoreName));
        Assert.assertNotNull(this.testDriver.getSessionStore(sessionStoreName));

        // Global plain key-value store: same expectations as the non-global one.
        Assert.assertNotNull(this.testDriver.getKeyValueStore(globalKeyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
        Assert.assertNull(this.testDriver.getWindowStore(globalKeyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(globalKeyValueStoreName));

        // Global timestamped key-value store: same expectations as the non-global one.
        Assert.assertNotNull(this.testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
        Assert.assertNotNull(this.testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
        Assert.assertNull(this.testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
    }

    /**
     * Runs the untyped-accessor rejection checks with the {@code persistent} flag cleared.
     */
    @Test
    public void shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod() {
        final boolean persistent = false;
        this.shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(persistent);
    }

    /**
     * Runs the untyped-accessor rejection checks with the {@code persistent} flag set.
     */
    @Test
    public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() {
        final boolean persistent = true;
        this.shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(persistent);
    }

    /**
     * Registers one store of each built-in kind (plus two global key-value stores) and checks
     * that {@code getStateStore(name)} throws an {@link IllegalArgumentException} for each of
     * them, with a message pointing at the matching typed accessor.
     *
     * @param persistent forwarded to {@code addStoresToTopology} to choose between persistent
     *                   and in-memory store suppliers
     */
    private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(boolean persistent) {
        String keyValueStoreName = "keyValueStore";
        String timestampedKeyValueStoreName = "keyValueTimestampStore";
        String windowStoreName = "windowStore";
        String timestampedWindowStoreName = "windowTimestampStore";
        String sessionStoreName = "sessionStore";
        String globalKeyValueStoreName = "globalKeyValueStore";
        String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";

        Topology topology = this.setupSingleProcessorTopology();
        this.addStoresToTopology(
            topology,
            persistent,
            keyValueStoreName,
            timestampedKeyValueStoreName,
            windowStoreName,
            timestampedWindowStoreName,
            sessionStoreName,
            globalKeyValueStoreName,
            globalTimestampedKeyValueStoreName);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        IllegalArgumentException keyValueError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(keyValueStoreName));
        MatcherAssert.assertThat(
            keyValueError.getMessage(),
            CoreMatchers.equalTo("Store keyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"));

        IllegalArgumentException timestampedKeyValueError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(timestampedKeyValueStoreName));
        MatcherAssert.assertThat(
            timestampedKeyValueError.getMessage(),
            CoreMatchers.equalTo("Store keyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));

        IllegalArgumentException windowError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(windowStoreName));
        MatcherAssert.assertThat(
            windowError.getMessage(),
            CoreMatchers.equalTo("Store windowStore is a window store and should be accessed via `getWindowStore()`"));

        IllegalArgumentException timestampedWindowError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(timestampedWindowStoreName));
        MatcherAssert.assertThat(
            timestampedWindowError.getMessage(),
            CoreMatchers.equalTo("Store windowTimestampStore is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"));

        IllegalArgumentException sessionError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(sessionStoreName));
        MatcherAssert.assertThat(
            sessionError.getMessage(),
            CoreMatchers.equalTo("Store sessionStore is a session store and should be accessed via `getSessionStore()`"));

        IllegalArgumentException globalKeyValueError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(globalKeyValueStoreName));
        MatcherAssert.assertThat(
            globalKeyValueError.getMessage(),
            CoreMatchers.equalTo("Store globalKeyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"));

        IllegalArgumentException globalTimestampedKeyValueError = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.testDriver.getStateStore(globalTimestampedKeyValueStoreName));
        MatcherAssert.assertThat(
            globalTimestampedKeyValueError.getMessage(),
            CoreMatchers.equalTo("Store globalKeyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
    }

    /**
     * Adds five stores connected to the node named {@code "processor"} (key-value, timestamped
     * key-value, window, timestamped window, session) and two global key-value stores (plain and
     * timestamped) to the given topology. All stores use byte-array serdes.
     *
     * @param topology                           topology to extend
     * @param persistent                         {@code true} to use the persistent suppliers from
     *                                           {@code Stores}, {@code false} for the in-memory ones
     * @param keyValueStoreName                  name of the key-value store
     * @param timestampedKeyValueStoreName       name of the timestamped key-value store
     * @param windowStoreName                    name of the window store
     * @param timestampedWindowStoreName         name of the timestamped window store
     * @param sessionStoreName                   name of the session store
     * @param globalKeyValueStoreName            name of the global key-value store
     * @param globalTimestampedKeyValueStoreName name of the global timestamped key-value store
     */
    private void addStoresToTopology(Topology topology, boolean persistent, String keyValueStoreName, String timestampedKeyValueStoreName, String windowStoreName, String timestampedWindowStoreName, String sessionStoreName, String globalKeyValueStoreName, String globalTimestampedKeyValueStoreName) {
        Duration retentionPeriod = Duration.ofMillis(1000L);
        Duration windowSize = Duration.ofMillis(100L);

        KeyValueBytesStoreSupplier keyValueSupplier = persistent
            ? Stores.persistentKeyValueStore(keyValueStoreName)
            : Stores.inMemoryKeyValueStore(keyValueStoreName);
        topology.addStateStore(
            Stores.keyValueStoreBuilder(keyValueSupplier, Serdes.ByteArray(), Serdes.ByteArray()),
            "processor");

        // The in-memory variant of the timestamped store is built on the plain in-memory supplier.
        KeyValueBytesStoreSupplier timestampedKeyValueSupplier = persistent
            ? Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName)
            : Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName);
        topology.addStateStore(
            Stores.timestampedKeyValueStoreBuilder(timestampedKeyValueSupplier, Serdes.ByteArray(), Serdes.ByteArray()),
            "processor");

        WindowBytesStoreSupplier windowSupplier = persistent
            ? Stores.persistentWindowStore(windowStoreName, retentionPeriod, windowSize, false)
            : Stores.inMemoryWindowStore(windowStoreName, retentionPeriod, windowSize, false);
        topology.addStateStore(
            Stores.windowStoreBuilder(windowSupplier, Serdes.ByteArray(), Serdes.ByteArray()),
            "processor");

        WindowBytesStoreSupplier timestampedWindowSupplier = persistent
            ? Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, retentionPeriod, windowSize, false)
            : Stores.inMemoryWindowStore(timestampedWindowStoreName, retentionPeriod, windowSize, false);
        topology.addStateStore(
            Stores.timestampedWindowStoreBuilder(timestampedWindowSupplier, Serdes.ByteArray(), Serdes.ByteArray()),
            "processor");

        SessionBytesStoreSupplier sessionSupplier = persistent
            ? Stores.persistentSessionStore(sessionStoreName, retentionPeriod)
            : Stores.inMemorySessionStore(sessionStoreName, retentionPeriod);
        topology.addStateStore(
            Stores.sessionStoreBuilder(sessionSupplier, Serdes.ByteArray(), Serdes.ByteArray()),
            "processor");

        KeyValueBytesStoreSupplier globalKeyValueSupplier = persistent
            ? Stores.persistentKeyValueStore(globalKeyValueStoreName)
            : Stores.inMemoryKeyValueStore(globalKeyValueStoreName);
        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(globalKeyValueSupplier, Serdes.ByteArray(), Serdes.ByteArray()).withLoggingDisabled(),
            "sourceDummy1",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "topicDummy1",
            "processorDummy1",
            () -> null);

        KeyValueBytesStoreSupplier globalTimestampedKeyValueSupplier = persistent
            ? Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName)
            : Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName);
        topology.addGlobalStore(
            Stores.timestampedKeyValueStoreBuilder(globalTimestampedKeyValueSupplier, Serdes.ByteArray(), Serdes.ByteArray()).withLoggingDisabled(),
            "sourceDummy2",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "topicDummy2",
            "processorDummy2",
            () -> null);
    }

    /**
     * Verifies that the key set of {@code getAllStateStores()} contains the name of a store that
     * is not connected to any processor as well as the name of a global store.
     */
    @Test
    public void shouldReturnAllStoresNames() {
        Topology topology = this.setupSourceSinkTopology();
        // The store is registered without any connected processor names.
        topology.addStateStore(
            new KeyValueStoreBuilder<>(
                Stores.inMemoryKeyValueStore("store"),
                Serdes.ByteArray(),
                Serdes.ByteArray(),
                new SystemTime()));
        topology.addGlobalStore(
            new KeyValueStoreBuilder<>(
                Stores.inMemoryKeyValueStore("globalStore"),
                Serdes.ByteArray(),
                Serdes.ByteArray(),
                new SystemTime()).withLoggingDisabled(),
            "sourceProcessorName",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "globalTopicName",
            "globalProcessorName",
            () -> null);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        Set<String> expectedStoreNames = new HashSet<>();
        expectedStoreNames.add("store");
        expectedStoreNames.add("globalStore");
        Set<String> actualStoreNames = this.testDriver.getAllStateStores().keySet();
        MatcherAssert.assertThat(actualStoreNames, CoreMatchers.equalTo(expectedStoreNames));
    }

    /**
     * Builds the aggregator test fixture backed by an in-memory key-value store named
     * {@code "aggStore"}.
     */
    private void setup() {
        KeyValueBytesStoreSupplier inMemoryAggStore = Stores.inMemoryKeyValueStore("aggStore");
        this.setup(inMemoryAggStore);
    }

    /**
     * Builds a source -> aggregator -> sink topology whose aggregator is connected to a
     * String/Long key-value store created from the given supplier, starts the test driver on it
     * and pre-populates the store with {@code "a" -> 21}.
     *
     * <p>The store is looked up under the fixed name {@code "aggStore"}, so the supplier is
     * expected to produce a store with that name.
     *
     * @param storeSupplier supplier of the bytes store backing the aggregator's state
     */
    private void setup(KeyValueBytesStoreSupplier storeSupplier) {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", "input-topic");
        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Long()),
            "aggregator");
        topology.addSink("sinkProcessor", "result-topic", "aggregator");

        // Default serdes are set on the shared config before the driver is created.
        String keySerdeClass = Serdes.String().getClass().getName();
        this.config.setProperty("default.key.serde", keySerdeClass);
        String valueSerdeClass = Serdes.Long().getClass().getName();
        this.config.setProperty("default.value.serde", valueSerdeClass);

        this.testDriver = new TopologyTestDriver(topology, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put("a", 21L);
    }

    /**
     * Pipes a single String/Long record into the test driver.
     *
     * @param topic name of the input topic
     * @param key   record key
     * @param value record value
     * @param time  value passed as the record's fourth constructor argument (no headers are set)
     */
    private void pipeInput(String topic, String key, Long value, Long time) {
        TestRecord<String, Long> record = new TestRecord<>(key, value, null, time);
        // The trailing null is the driver-level time argument of pipeRecord.
        this.testDriver.pipeRecord(topic, record, new StringSerializer(), new LongSerializer(), null);
    }

    /**
     * Asserts that the given record carries the expected key and value.
     *
     * @param record record to inspect
     * @param key    expected key
     * @param value  expected value
     */
    private void compareKeyValue(TestRecord<String, Long> record, String key, Long value) {
        String actualKey = record.getKey();
        MatcherAssert.assertThat(actualKey, CoreMatchers.equalTo(key));
        Long actualValue = record.getValue();
        MatcherAssert.assertThat(actualValue, CoreMatchers.equalTo(value));
    }

    /**
     * After the first input record, exactly one record {@code "a" -> 21} (the value
     * pre-populated by {@code setup()}) is expected on the result topic.
     */
    @Test
    public void shouldFlushStoreForFirstInput() {
        this.setup();
        this.pipeInput("input-topic", "a", 1L, 9999L);

        TestRecord<String, Long> flushed =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(flushed, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping a value (1) smaller than the pre-populated one (21) must leave the store at 21 and
     * emit a single {@code "a" -> 21} record.
     */
    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        this.setup();
        this.pipeInput("input-topic", "a", 1L, 9999L);

        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(21L));
        TestRecord<String, Long> emitted =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(emitted, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping a value (42) larger than the pre-populated one (21) is expected to replace the
     * stored value with 42 and emit a single {@code "a" -> 42} record.
     *
     * <p>NOTE(review): the method name says "should not update", but the assertions require the
     * store to be updated; the name is kept unchanged here.
     */
    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        this.setup();
        this.pipeInput("input-topic", "a", 42L, 9999L);

        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(42L));
        TestRecord<String, Long> emitted =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(emitted, "a", 42L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping a record for a key that is not yet in the store ({@code "b"}) must store its value
     * and emit two records, in order: {@code "a" -> 21} followed by {@code "b" -> 21}.
     */
    @Test
    public void shouldUpdateStoreForNewKey() {
        this.setup();
        this.pipeInput("input-topic", "b", 21L, 9999L);

        MatcherAssert.assertThat(this.store.get("b"), CoreMatchers.equalTo(21L));
        TestRecord<String, Long> first =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(first, "a", 21L);
        TestRecord<String, Long> second =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(second, "b", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Pipes three records with timestamps 9999, 9999 and 10000 and expects output after the
     * first record, none after the second (same timestamp), and one more after the third
     * (later timestamp).
     */
    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        this.setup();

        this.pipeInput("input-topic", "a", 1L, 9999L);
        TestRecord<String, Long> afterFirstRecord =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(afterFirstRecord, "a", 21L);

        // Same timestamp again: nothing new on the result topic.
        this.pipeInput("input-topic", "a", 1L, 9999L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));

        // Later timestamp: one more record is emitted.
        this.pipeInput("input-topic", "a", 1L, 10000L);
        TestRecord<String, Long> afterTimeAdvance =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(afterTimeAdvance, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Advancing the driver's wall-clock time by 60000 ms, without piping any input, is expected
     * to produce exactly one {@code "a" -> 21} record on the result topic.
     */
    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        this.setup();
        Duration advance = Duration.ofMillis(60000L);
        this.testDriver.advanceWallClockTime(advance);

        TestRecord<String, Long> emitted =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        this.compareKeyValue(emitted, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Checks that a store built with caching enabled can be fetched from the driver and written
     * to directly; the test passes if the {@code put} completes without throwing.
     */
    @Test
    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", "input-topic");
        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("aggStore"), Serdes.String(), Serdes.Long())
                .withCachingEnabled(),
            "aggregator");

        this.testDriver = new TopologyTestDriver(topology, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put("a", 21L);
    }

    /**
     * Verifies that closing a {@code TopologyTestDriver} removes the contents of its persistent
     * state store: a value written through a first driver must not be visible to a second
     * driver created afterwards with the same topology and configuration.
     */
    @Test
    public void shouldCleanUpPersistentStateStoresOnClose() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("storeProcessor", new ProcessorSupplier<String, Long>() {
            @Override
            public Processor<String, Long> get() {
                return new Processor<String, Long>() {
                    private KeyValueStore<String, Long> store;

                    @SuppressWarnings("unchecked")
                    @Override
                    public void init(ProcessorContext context) {
                        this.store = (KeyValueStore<String, Long>) context.getStateStore("storeProcessorStore");
                    }

                    @Override
                    public void process(String key, Long value) {
                        this.store.put(key, value);
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }, new String[]{"sourceProcessor"});
        topology.addStateStore(
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("storeProcessorStore"), Serdes.String(), Serdes.Long()),
            new String[]{"storeProcessor"});

        // Local configuration with its own temporary state directory (shadows the test fixture's config).
        Properties config = new Properties();
        config.put("application.id", "test-TopologyTestDriver-cleanup");
        config.put("bootstrap.servers", "dummy:1234");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        config.put("default.key.serde", Serdes.String().getClass().getName());
        config.put("default.value.serde", Serdes.Long().getClass().getName());

        // First driver: the store starts empty and receives "a" -> 1 through the topology.
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
            testDriver.pipeRecord("input-topic", new TestRecord<>("a", 1L), new StringSerializer(), new LongSerializer(), Instant.now());
            Assert.assertEquals(Long.valueOf(1L), testDriver.getKeyValueStore("storeProcessorStore").get("a"));
        }

        // Second driver: declared as its own resource so it is in scope and always closed.
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            Assert.assertNull(
                "Closing the prior test driver should have cleaned up this store and value.",
                testDriver.getKeyValueStore("storeProcessorStore").get("a"));
        }
    }

    /**
     * Verifies that a record piped into the topic of a global table ends up in the table's
     * materialized store {@code "globalStore"}, and that this store is reachable both through
     * {@code getKeyValueStore} and {@code getAllStateStores}.
     */
    @Test
    public void shouldFeedStoreFromGlobalKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(
            "topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("globalStore"));

        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), this.config)) {
            KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
            Assert.assertNotNull(globalStore);
            Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));

            TestRecord<String, String> record = new TestRecord<>("k1", "value1");
            testDriver.pipeRecord("topic", record, new StringSerializer(), new StringSerializer(), Instant.now());
            Assert.assertEquals("value1", globalStore.get("k1"));
        }
    }

    /**
     * Builds a topology with one pattern-subscribed source and one mock processor per given
     * pattern, all processors feeding a single sink that writes to {@code SINK_TOPIC_1}.
     *
     * <p>Node names are derived from the pattern's string form with the suffixes
     * {@code "-source"} and {@code "-processor"}.
     *
     * @param sourceTopicPatternNames topic patterns, one source/processor pair is created for each
     * @return the assembled topology
     */
    private Topology setupMultipleSourcesPatternTopology(Pattern ... sourceTopicPatternNames) {
        Topology topology = new Topology();
        String[] processorNames = new String[sourceTopicPatternNames.length];
        for (int index = 0; index < sourceTopicPatternNames.length; index++) {
            Pattern topicPattern = sourceTopicPatternNames[index];
            String sourceName = topicPattern + "-source";
            String processorName = topicPattern + "-processor";
            topology.addSource(sourceName, topicPattern);
            processorNames[index] = processorName;
            topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
        }
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    /**
     * With two pattern-subscribed sources (digit-suffixed and upper-case-letter-suffixed topic
     * names), a record piped to {@code SOURCE_TOPIC_1} must reach only the first mock processor
     * and a record piped to {@code "source-topic-Z"} must reach only the second.
     */
    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");
        Pattern pattern2Source2 = Pattern.compile("source-topic-[A-Z]");
        String consumerTopic2 = "source-topic-Z";
        TestRecord<byte[], byte[]> consumerRecord2 =
            new TestRecord<>(this.key2, this.value2, null, Long.valueOf(43L));

        Topology topology = this.setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2);
        this.testDriver = new TopologyTestDriver(topology, this.config);
        List<?> processedRecords1 = this.mockProcessors.get(0).processedRecords;
        List<?> processedRecords2 = this.mockProcessors.get(1).processedRecords;

        // First record matches only the digit pattern.
        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        Assert.assertEquals(1L, processedRecords1.size());
        Assert.assertEquals(0L, processedRecords2.size());
        Record record1 = (Record) processedRecords1.get(0);
        Record expectedResult1 = new Record(SOURCE_TOPIC_1, this.testRecord1, 0L);
        MatcherAssert.assertThat(record1, CoreMatchers.equalTo(expectedResult1));

        // Second record matches only the upper-case-letter pattern.
        this.pipeRecord(consumerTopic2, consumerRecord2);
        Assert.assertEquals(1L, processedRecords1.size());
        Assert.assertEquals(1L, processedRecords2.size());
        Record record2 = (Record) processedRecords2.get(0);
        Record expectedResult2 = new Record(consumerTopic2, consumerRecord2, 0L);
        MatcherAssert.assertThat(record2, CoreMatchers.equalTo(expectedResult2));
    }

    /**
     * A source subscribed with the pattern {@code source-topic-\d} must accept a record piped
     * to {@code SOURCE_TOPIC_1} and forward it unchanged to the sink topic.
     */
    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        String sourceName = "source";
        Pattern pattern2Source1 = Pattern.compile("source-topic-\\d");

        Topology topology = new Topology();
        topology.addSource(sourceName, pattern2Source1);
        topology.addSink("sink", SINK_TOPIC_1, sourceName);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1);

        ProducerRecord<byte[], byte[]> outputRecord = this.testDriver.readRecord(SINK_TOPIC_1);
        Assert.assertEquals(this.key1, outputRecord.key());
        Assert.assertEquals(this.value1, outputRecord.value());
        Assert.assertEquals(SINK_TOPIC_1, outputRecord.topic());
    }

    /**
     * A source registered with a topic <em>name</em> (a String) that happens to be a regex
     * matching the input record's topic must be rejected with a {@link TopologyException}
     * carrying the expected message.
     *
     * <p>The exception is required: unlike a try/catch without a failure path, this test fails
     * if piping the record completes without throwing.
     */
    @Test
    public void shouldThrowPatternNotValidForTopicNameException() {
        String sourceName = "source";
        String pattern2Source1 = "source-topic-\\d";

        Topology topology = new Topology();
        topology.addSource(sourceName, pattern2Source1);
        topology.addSink("sink", SINK_TOPIC_1, sourceName);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        TopologyException exception = Assert.assertThrows(
            TopologyException.class,
            () -> this.pipeRecord(SOURCE_TOPIC_1, this.testRecord1));

        String expectedMessage = String.format(
            "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for input record topic: %s and hence cannot process the message.",
            pattern2Source1,
            SOURCE_TOPIC_1);
        Assert.assertEquals(expectedMessage, exception.getMessage());
    }

    /**
     * After building the fixture through {@code setup()} (which uses an in-memory store), no
     * application directory may exist under the configured {@code state.dir}.
     */
    @Test
    public void shouldNotCreateStateDirectoryForStatelessTopology() {
        this.setup();

        String applicationId = this.config.getProperty("application.id");
        File appDir = new File(this.config.getProperty("state.dir"), applicationId);
        Assert.assertFalse(appDir.exists());
    }

    /**
     * After building the fixture with a persistent store, the application directory under the
     * configured {@code state.dir} must exist, be a directory, and contain an entry named after
     * task id {@code new TaskId(0, 0)}.
     */
    @Test
    public void shouldCreateStateDirectoryForStatefulTopology() {
        this.setup(Stores.persistentKeyValueStore("aggStore"));

        String applicationId = this.config.getProperty("application.id");
        File appDir = new File(this.config.getProperty("state.dir"), applicationId);
        Assert.assertTrue(appDir.exists());
        Assert.assertTrue(appDir.isDirectory());

        TaskId taskId = new TaskId(0, 0);
        File taskDir = new File(appDir, taskId.toString());
        Assert.assertTrue(taskDir.exists());
    }

    /**
     * Uses a topology whose processor writes a {@code "recurse-"}-prefixed copy of each
     * non-prefixed value back to its own input topic, and checks that the output topic receives
     * the original record first and the record resulting from the fed-back copy second.
     */
    @Test
    public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "dummy");
        properties.setProperty("bootstrap.servers", "dummy");

        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
        topology.addProcessor("recursiveProcessor", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                // Only values that were not already fed back are sent around the loop again.
                if (!value.startsWith("recurse-")) {
                    this.context().forward(key, "recurse-" + value, To.child("recursiveSink"));
                }
                this.context().forward(key, value, To.child("sink"));
            }
        }, "source");
        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
            TestInputTopic<String, String> in =
                topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("B", "beta");

            List<KeyValue<String, String>> events = out.readKeyValuesToList();
            MatcherAssert.assertThat(
                events,
                CoreMatchers.is(Arrays.asList(
                    new KeyValue<>("B", "beta"),
                    new KeyValue<>("B", "recurse-beta"))));
        }
    }

    /**
     * Uses a recursive topology that also forwards every record to the topic feeding a global
     * store, and checks that after piping {@code "A" -> "alpha"} the global store holds
     * {@code "recurse-alpha"} for key {@code "A"} and the global topic saw {@code "alpha"}
     * followed by {@code "recurse-alpha"}.
     */
    @Test
    public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "dummy");
        properties.setProperty("bootstrap.servers", "dummy");

        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String())
                .withLoggingDisabled(),
            "globuleSource",
            new StringDeserializer(),
            new StringDeserializer(),
            "globule-topic",
            "globuleProcessor",
            () -> new Processor<String, String>() {
                private KeyValueStore<String, String> stateStore;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                    this.stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
                }

                @Override
                public void process(String key, String value) {
                    this.stateStore.put(key, value);
                }

                @Override
                public void close() {
                }
            });
        topology.addProcessor("recursiveProcessor", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                // Only values that were not already fed back are sent around the loop again.
                if (!value.startsWith("recurse-")) {
                    this.context().forward(key, "recurse-" + value, To.child("recursiveSink"));
                }
                this.context().forward(key, value, To.child("sink"));
                this.context().forward(key, value, To.child("globuleSink"));
            }
        }, "source");
        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
            TestInputTopic<String, String> in =
                topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> globalTopic =
                topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("A", "alpha");

            KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
            MatcherAssert.assertThat(keyValueStore, CoreMatchers.notNullValue());
            MatcherAssert.assertThat(keyValueStore.get("A"), CoreMatchers.is("recurse-alpha"));

            List<KeyValue<String, String>> events = globalTopic.readKeyValuesToList();
            MatcherAssert.assertThat(
                events,
                CoreMatchers.is(Arrays.asList(
                    new KeyValue<>("A", "alpha"),
                    new KeyValue<>("A", "recurse-alpha"))));
        }
    }

    /**
     * Checks how the test driver behaves when {@code max.task.idle.ms} is set to 1000 and the
     * topology reads from two input topics that both feed a single sink.
     *
     * <p>The expectations, in order: nothing is emitted while only {@code input1} holds a record
     * and one millisecond of wall-clock time has passed; once {@code input2} also receives a
     * record, only the {@code input1} record is emitted; after advancing wall-clock time by a
     * further second, the {@code input2} record is emitted as well.
     */
    @Test
    public void shouldRespectTaskIdling() {
        Properties config = new Properties();
        config.setProperty("application.id", "dummy");
        config.setProperty("bootstrap.servers", "dummy");
        // The idle time is the setting under test; the two advances below (1 ms, then 1 s) straddle it.
        config.setProperty("max.task.idle.ms", "1000");

        Topology twoSourceTopology = new Topology();
        twoSourceTopology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1");
        twoSourceTopology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
        twoSourceTopology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");

        try (TopologyTestDriver driver = new TopologyTestDriver(twoSourceTopology, config)) {
            TestInputTopic<String, String> firstInput = driver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> secondInput = driver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            // Only input1 has data and just 1 ms has elapsed: no output is expected yet.
            firstInput.pipeInput("A", "alpha");
            driver.advanceWallClockTime(Duration.ofMillis(1L));
            List<KeyValue<String, String>> whileIdling = output.readKeyValuesToList();
            MatcherAssert.assertThat(whileIdling, CoreMatchers.is(Collections.emptyList()));

            // Both inputs now hold a record: A is expected on the output, B is not.
            secondInput.pipeInput("B", "beta");
            List<KeyValue<String, String>> afterSecondInput = output.readKeyValuesToList();
            MatcherAssert.assertThat(afterSecondInput, CoreMatchers.is(Collections.singletonList(new KeyValue<>("A", "alpha"))));

            // One more second of wall-clock time: B is expected on the output.
            driver.advanceWallClockTime(Duration.ofSeconds(1L));
            List<KeyValue<String, String>> afterIdleTime = output.readKeyValuesToList();
            MatcherAssert.assertThat(afterIdleTime, CoreMatchers.is(Collections.singletonList(new KeyValue<>("B", "beta"))));
        }
    }

    private static class CustomMaxAggregator
    implements Processor<String, Long> {
        ProcessorContext context;
        private KeyValueStore<String, Long> store;

        private CustomMaxAggregator() {
        }

        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(Duration.ofMinutes(1L), PunctuationType.WALL_CLOCK_TIME, timestamp -> this.flushStore());
            context.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, timestamp -> this.flushStore());
            this.store = (KeyValueStore)context.getStateStore("aggStore");
        }

        public void process(String key, Long value) {
            Long oldValue = (Long)this.store.get((Object)key);
            if (oldValue == null || value > oldValue) {
                this.store.put((Object)key, (Object)value);
            }
        }

        private void flushStore() {
            try (KeyValueIterator it = this.store.all();){
                while (it.hasNext()) {
                    KeyValue next = (KeyValue)it.next();
                    this.context.forward(next.key, next.value);
                }
            }
        }

        public void close() {
        }
    }

    private static class CustomMaxAggregatorSupplier
    implements ProcessorSupplier<String, Long> {
        private CustomMaxAggregatorSupplier() {
        }

        public Processor<String, Long> get() {
            return new CustomMaxAggregator();
        }
    }

    private final class MockProcessorSupplier
    implements ProcessorSupplier<Object, Object> {
        private final Collection<Punctuation> punctuations;

        private MockProcessorSupplier() {
            this(Collections.emptySet());
        }

        private MockProcessorSupplier(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public Processor<Object, Object> get() {
            MockProcessor mockProcessor = new MockProcessor(this.punctuations);
            TopologyTestDriverTest.this.mockProcessors.add(mockProcessor);
            return mockProcessor;
        }
    }

    private static final class MockProcessor
    implements Processor<Object, Object> {
        private final Collection<Punctuation> punctuations;
        private ProcessorContext context;
        private boolean initialized = false;
        private boolean closed = false;
        private final List<Record> processedRecords = new ArrayList<Record>();

        MockProcessor(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public void init(ProcessorContext context) {
            this.initialized = true;
            this.context = context;
            for (Punctuation punctuation : this.punctuations) {
                this.context.schedule(Duration.ofMillis(punctuation.intervalMs), punctuation.punctuationType, punctuation.callback);
            }
        }

        public void process(Object key, Object value) {
            this.processedRecords.add(new Record(key, value, this.context.headers(), this.context.timestamp(), this.context.offset(), this.context.topic()));
            this.context.forward(key, value);
        }

        public void close() {
            this.closed = true;
        }
    }

    private static final class MockPunctuator
    implements Punctuator {
        private final List<Long> punctuatedAt = new LinkedList<Long>();

        private MockPunctuator() {
        }

        public void punctuate(long timestamp) {
            this.punctuatedAt.add(timestamp);
        }
    }

    /**
     * Immutable description of one punctuation to schedule: its interval in milliseconds, its
     * type, and the callback to invoke.
     */
    private static final class Punctuation {
        private final long intervalMs;
        private final PunctuationType punctuationType;
        private final Punctuator callback;

        /**
         * @param intervalMs      interval in milliseconds
         * @param punctuationType type of the punctuation
         * @param callback        punctuator to invoke
         */
        Punctuation(long intervalMs, PunctuationType punctuationType, Punctuator callback) {
            this.callback = callback;
            this.punctuationType = punctuationType;
            this.intervalMs = intervalMs;
        }
    }

    /**
     * Immutable snapshot of a record (key, value, headers, timestamp, offset and topic) used to
     * compare expected records with the ones captured by {@link MockProcessor}.
     *
     * <p>All six fields take part in {@link #equals(Object)} and {@link #hashCode()}, and all six
     * are rendered by {@link #toString()} so that a mismatch in any compared field is visible in
     * an assertion failure message.
     */
    private static final class Record {
        private final Object key;
        private final Object value;
        private final long timestamp;
        private final long offset;
        private final String topic;
        private final Headers headers;

        /**
         * Copies key, value, timestamp, topic and headers from {@code consumerRecord} and uses
         * {@code newOffset} as the offset.
         *
         * @param consumerRecord record to copy from
         * @param newOffset      offset to store instead of the record's own
         */
        Record(ConsumerRecord<byte[], byte[]> consumerRecord, long newOffset) {
            this.key = consumerRecord.key();
            this.value = consumerRecord.value();
            this.timestamp = consumerRecord.timestamp();
            this.offset = newOffset;
            this.topic = consumerRecord.topic();
            this.headers = consumerRecord.headers();
        }

        /**
         * Copies key, value, timestamp and headers from {@code consumerRecord} and uses the
         * given topic and offset.
         *
         * @param newTopic       topic to store
         * @param consumerRecord test record to copy from
         * @param newOffset      offset to store
         */
        Record(String newTopic, TestRecord<byte[], byte[]> consumerRecord, long newOffset) {
            this.key = consumerRecord.key();
            this.value = consumerRecord.value();
            this.timestamp = consumerRecord.timestamp();
            this.offset = newOffset;
            this.topic = newTopic;
            this.headers = consumerRecord.headers();
        }

        /**
         * Stores the given values as they are.
         *
         * @param key       record key
         * @param value     record value
         * @param headers   record headers
         * @param timestamp record timestamp
         * @param offset    record offset
         * @param topic     record topic
         */
        Record(Object key, Object value, Headers headers, long timestamp, long offset, String topic) {
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestamp = timestamp;
            this.offset = offset;
            this.topic = topic;
        }

        /**
         * Renders every field that {@link #equals(Object)} compares, headers included, so two
         * unequal records do not print identically just because they differ only in headers.
         */
        @Override
        public String toString() {
            return "key: " + this.key
                + ", value: " + this.value
                + ", timestamp: " + this.timestamp
                + ", offset: " + this.offset
                + ", topic: " + this.topic
                + ", headers: " + this.headers;
        }

        /**
         * Two records are equal when they are of the same class and their timestamp, offset,
         * key, value, topic and headers are all equal (via {@link Objects#equals(Object, Object)}
         * for the reference fields).
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Record record = (Record)o;
            return this.timestamp == record.timestamp && this.offset == record.offset && Objects.equals(this.key, record.key) && Objects.equals(this.value, record.value) && Objects.equals(this.topic, record.topic) && Objects.equals(this.headers, record.headers);
        }

        /** Hash over the same six fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.value, this.headers, this.timestamp, this.offset, this.topic);
        }
    }
}

