package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.class */
public class ProcessorTopologyTest {
    // Topic names used by the topologies built in this test.
    private static final String INPUT_TOPIC_1 = "input-topic-1";
    private static final String INPUT_TOPIC_2 = "input-topic-2";
    private static final String OUTPUT_TOPIC_1 = "output-topic-1";
    private static final String OUTPUT_TOPIC_2 = "output-topic-2";
    // Intermediate topic; the repartitioning topologies register it as an internal topic.
    private static final String THROUGH_TOPIC_1 = "through-topic-1";
    // Driver under test; assigned by individual tests and closed in cleanup().
    private TopologyTestDriver driver;
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    // Header added by AddHeaderProcessor and expected by the header tests.
    // NOTE(review): getBytes() without a charset uses the platform default encoding.
    private static final Header HEADER = new RecordHeader("key", "value".getBytes());
    private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER});
    // Topology that each test populates before building or driving it.
    private final TopologyWrapper topology = new TopologyWrapper();
    private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
    // Builds String/String input records; the third argument is presumably the start timestamp
    // (tests that pass no timestamp expect 0 on the output) - TODO confirm.
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0);
    // Driver configuration; filled in setup() and cleared in cleanup().
    private final Properties props = new Properties();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$AddHeaderProcessor.class */
    public static class AddHeaderProcessor extends AbstractProcessor<String, String> {
        protected AddHeaderProcessor() {
        }

        public void process(String str, String str2) {
            context().headers().add(ProcessorTopologyTest.HEADER);
            context().forward(str, str2);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$CustomTimestampExtractor.class */
    public static class CustomTimestampExtractor implements TimestampExtractor {
        private static final long DEFAULT_TIMESTAMP = 1000;

        public long extract(ConsumerRecord<Object, Object> consumerRecord, long j) {
            return consumerRecord.value().toString().matches(".*@[0-9]+") ? Long.parseLong(consumerRecord.value().toString().split("@")[1]) : consumerRecord.timestamp() >= 0 ? consumerRecord.timestamp() : DEFAULT_TIMESTAMP;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$ForwardingProcessor.class */
    public static class ForwardingProcessor extends AbstractProcessor<String, String> {
        protected ForwardingProcessor() {
        }

        public void process(String str, String str2) {
            context().forward(str, str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$MultiplexByNameProcessor.class */
    public static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        MultiplexByNameProcessor(int i) {
            this.numChildren = i;
        }

        public void process(String str, String str2) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(str, str2 + "(" + (i + 1) + ")", "sink" + i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$MultiplexingProcessor.class */
    public static class MultiplexingProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        MultiplexingProcessor(int i) {
            this.numChildren = i;
        }

        public void process(String str, String str2) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(str, str2 + "(" + (i + 1) + ")", i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$StatefulProcessor.class */
    public static class StatefulProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;
        private final String storeName;

        StatefulProcessor(String str) {
            this.storeName = str;
        }

        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.store = processorContext.getStateStore(this.storeName);
        }

        public void process(String str, String str2) {
            this.store.put(str, str2);
        }

        public void close() {
            this.store.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$TimestampProcessor.class */
    public static class TimestampProcessor extends AbstractProcessor<String, String> {
        protected TimestampProcessor() {
        }

        public void process(String str, String str2) {
            context().forward(str, str2, To.all().withTimestamp(context().timestamp() + 10));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$ValueTimestampProcessor.class */
    public static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
        protected ValueTimestampProcessor() {
        }

        public void process(String str, String str2) {
            context().forward(str, str2.split("@")[0]);
        }
    }

    /**
     * Populates the driver configuration before each test: application id, bootstrap servers,
     * a fresh temporary state directory, String serdes for keys and values, and
     * {@link CustomTimestampExtractor} as the default timestamp extractor.
     */
    @Before
    public void setup() {
        final File stateDir = TestUtils.tempDirectory();
        final String stringSerdeClass = Serdes.String().getClass().getName();

        props.setProperty("application.id", "processor-topology-test");
        props.setProperty("bootstrap.servers", "localhost:9091");
        props.setProperty("state.dir", stateDir.getAbsolutePath());
        props.setProperty("default.key.serde", stringSerdeClass);
        props.setProperty("default.value.serde", stringSerdeClass);
        props.setProperty("default.timestamp.extractor", CustomTimestampExtractor.class.getName());
    }

    /**
     * Clears the configuration after each test and, if a driver was created, closes it and
     * drops the reference.
     */
    @After
    public void cleanup() {
        props.clear();
        if (driver == null) {
            return;
        }
        driver.close();
        driver = null;
    }

    /**
     * Builds a topology with two sources, two processors and two sinks and checks the counts the
     * built {@code ProcessorTopology} reports, plus the topic-to-source lookup.
     */
    @Test
    public void testTopologyMetadata() {
        topology.setApplicationId("X");

        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2", "topic-3");
        topology.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        topology.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2");
        topology.addSink("sink-1", "topic-3", "processor-1");
        topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");

        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();

        Assert.assertEquals(6L, processorTopology.processors().size());
        Assert.assertEquals(2L, processorTopology.sources().size());
        Assert.assertEquals(3L, processorTopology.sourceTopics().size());

        // Every source topic must resolve to a source node.
        Assert.assertNotNull(processorTopology.source("topic-1"));
        Assert.assertNotNull(processorTopology.source("topic-2"));
        Assert.assertNotNull(processorTopology.source("topic-3"));
        // topic-2 and topic-3 were registered on the same source.
        Assert.assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3"));
    }

    /**
     * Drives the simple source/processor/sink topology and checks that every input record shows
     * up on output topic 1 in order, on partition 10, while output topic 2 stays empty.
     */
    @Test
    public void testDrivingSimpleTopology() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);

        // First two records: pipe one, read one.
        for (int i = 1; i <= 2; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, partition);
            assertNoOutputRecord(OUTPUT_TOPIC_2);
        }

        // Next three records: pipe them all, then read them back in order.
        for (int i = 3; i <= 5; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
        }
        assertNoOutputRecord(OUTPUT_TOPIC_2);
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, partition);
        }
    }

    /**
     * Drives the index-based multiplexing topology: each input record must appear on output
     * topic 1 with suffix "(1)" and on output topic 2 with suffix "(2)".
     */
    @Test
    public void testDrivingMultiplexingTopology() {
        driver = new TopologyTestDriver(createMultiplexingTopology(), props);

        // First two records: pipe one, read both copies.
        for (int i = 1; i <= 2; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i + "(1)");
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + i, "value" + i + "(2)");
        }

        // Next three records: pipe them all, then drain topic 1 followed by topic 2.
        for (int i = 3; i <= 5; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
        }
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i + "(1)");
        }
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + i, "value" + i + "(2)");
        }
    }

    /**
     * Drives the name-based multiplexing topology: each input record must appear on output
     * topic 1 with suffix "(1)" and on output topic 2 with suffix "(2)".
     */
    @Test
    public void testDrivingMultiplexByNameTopology() {
        driver = new TopologyTestDriver(createMultiplexByNameTopology(), props);

        // First two records: pipe one, read both copies.
        for (int i = 1; i <= 2; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i + "(1)");
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + i, "value" + i + "(2)");
        }

        // Next three records: pipe them all, then drain topic 1 followed by topic 2.
        for (int i = 3; i <= 5; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
        }
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i + "(1)");
        }
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + i, "value" + i + "(2)");
        }
    }

    /**
     * Drives the stateful topology and checks that records end up in the store (the latest value
     * wins per key) while nothing is written to the output topic.
     */
    @Test
    public void testDrivingStatefulTopology() {
        final String storeName = "entries";
        driver = new TopologyTestDriver(createStatefulTopology(storeName), props);

        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
        // Second write to key1 must overwrite the first.
        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
        assertNoOutputRecord(OUTPUT_TOPIC_1);

        // Typed handle instead of the raw KeyValueStore so lookups are checked by the compiler.
        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assert.assertEquals("value4", store.get("key1"));
        Assert.assertEquals("value2", store.get("key2"));
        Assert.assertEquals("value3", store.get("key3"));
        Assert.assertNull(store.get("key4"));
    }

    /**
     * Registers an in-memory global store (logging disabled) fed from topic "topic" through a
     * {@link StatefulProcessor}, pipes two records and checks that both are readable from the store.
     */
    @Test
    public void shouldDriveGlobalStore() {
        final String storeName = "my-store";
        final String globalTopic = "topic";

        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(),
            "global",
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            globalTopic,
            "processor",
            define(new StatefulProcessor(storeName)));

        driver = new TopologyTestDriver(topology, props);
        // Typed handle instead of the raw KeyValueStore so lookups are checked by the compiler.
        final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);

        driver.pipeInput(recordFactory.create(globalTopic, "key1", "value1"));
        driver.pipeInput(recordFactory.create(globalTopic, "key2", "value2"));

        Assert.assertEquals("value1", globalStore.get("key1"));
        Assert.assertEquals("value2", globalStore.get("key2"));
    }

    /**
     * Drives the two-pipeline topology and checks that a record from input topic 1 only reaches
     * output topic 1, and a record from input topic 2 only reaches output topic 2 (partition 10).
     */
    @Test
    public void testDrivingSimpleMultiSourceTopology() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleMultiSourceTopology(partition), props);

        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
        assertNoOutputRecord(OUTPUT_TOPIC_2);

        driver.pipeInput(recordFactory.create(INPUT_TOPIC_2, "key2", "value2"));
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition);
        assertNoOutputRecord(OUTPUT_TOPIC_1);
    }

    /**
     * Drives the topology whose first sink writes to a topic that a second source reads, and
     * checks that all three records arrive in order on output topic 2.
     */
    @Test
    public void testDrivingForwardToSourceTopology() {
        driver = new TopologyTestDriver(createForwardToSourceTopology(), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
        }
        for (int i = 1; i <= 3; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_2, "key" + i, "value" + i);
        }
    }

    /**
     * Drives the topology that routes records through the internal through-topic and checks that
     * all three records arrive in order on output topic 1.
     */
    @Test
    public void testDrivingInternalRepartitioningTopology() {
        driver = new TopologyTestDriver(createInternalRepartitioningTopology(), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i));
        }
        for (int i = 1; i <= 3; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i);
        }
    }

    /**
     * Pipes values of the form {@code valueN@<timestamp>} through the repartitioning topology and
     * checks that each output record carries the value without the suffix, the embedded
     * timestamp, and no partition.
     */
    @Test
    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
        driver = new TopologyTestDriver(createInternalRepartitioningWithValueTimestampTopology(), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i + "@" + (i * 1000)));
        }

        for (int i = 1; i <= 3; i++) {
            // Typed records instead of raw ProducerRecord so the matcher comparison is type-checked.
            final ProducerRecord<String, String> expected =
                new ProducerRecord<>(OUTPUT_TOPIC_1, (Integer) null, Long.valueOf(i * 1000L), "key" + i, "value" + i);
            final ProducerRecord<String, String> actual =
                driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
            MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));
        }
    }

    /**
     * Checks that the built topology's string form lists a source together with its topics.
     */
    @Test
    public void shouldCreateStringWithSourceAndTopics() {
        topology.addSource("source", "topic1", "topic2");

        final String description = topology.getInternalBuilder().build().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
    }

    /**
     * Checks that the built topology's string form lists each of two sources with its own topics.
     */
    @Test
    public void shouldCreateStringWithMultipleSourcesAndTopics() {
        topology.addSource("source", "topic1", "topic2");
        topology.addSource("source2", "t", "t1", "t2");

        final String description = topology.getInternalBuilder().build().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
    }

    /**
     * Checks that the built topology's string form lists a source's children and contains an
     * entry for each processor.
     */
    @Test
    public void shouldCreateStringWithProcessors() {
        topology.addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("other", mockProcessorSupplier, "source");

        final String description = topology.getInternalBuilder().build().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("\t\tchildren:\t[processor, other]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("processor:\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("other:\n"));
    }

    /**
     * Checks that the built topology's string form also lists the children of nested processors.
     */
    @Test
    public void shouldRecursivelyPrintChildren() {
        topology.addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("child-one", mockProcessorSupplier, "processor")
            .addProcessor("child-one-one", mockProcessorSupplier, "child-one")
            .addProcessor("child-two", mockProcessorSupplier, "processor")
            .addProcessor("child-two-one", mockProcessorSupplier, "child-two");

        final String description = topology.getInternalBuilder().build().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
    }

    /**
     * Pipes three records with explicit timestamps 10, 20 and 30 through the simple topology and
     * checks that each output record keeps its input timestamp (partition 10).
     */
    @Test
    public void shouldConsiderTimeStamps() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i, 10L * i));
        }
        for (int i = 1; i <= 3; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, Integer.valueOf(partition), Long.valueOf(10L * i));
        }
    }

    /**
     * Pipes three records with timestamps 10, 20 and 30 through the timestamp topology and checks
     * that each output record's timestamp is the input timestamp plus 10 (partition 10).
     */
    @Test
    public void shouldConsiderModifiedTimeStamps() {
        final int partition = 10;
        driver = new TopologyTestDriver(createTimestampTopology(partition), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i, 10L * i));
        }
        for (int i = 1; i <= 3; i++) {
            final long shifted = 10L * i + 10L;
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, Integer.valueOf(partition), Long.valueOf(shifted));
        }
    }

    /**
     * Pipes three records carrying {@code HEADERS} through the simple topology and checks that the
     * headers, timestamps and partition 10 are all present on the output records.
     */
    @Test
    public void shouldConsiderHeaders() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i, HEADERS, 10L * i));
        }
        for (int i = 1; i <= 3; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, HEADERS, partition, 10L * i);
        }
    }

    /**
     * Pipes three header-less records through the add-header topology and checks that each output
     * record carries {@code HEADERS} and keeps its input timestamp.
     */
    @Test
    public void shouldAddHeaders() {
        driver = new TopologyTestDriver(createAddHeaderTopology(), props);

        for (int i = 1; i <= 3; i++) {
            driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key" + i, "value" + i, 10L * i));
        }
        for (int i = 1; i <= 3; i++) {
            assertNextOutputRecord(OUTPUT_TOPIC_1, "key" + i, "value" + i, HEADERS, Long.valueOf(10L * i));
        }
    }

    /**
     * Asserts the next record on {@code topic}, expecting a null partition and timestamp 0.
     *
     * @param topic topic to read from
     * @param key   expected key
     * @param value expected value
     */
    private void assertNextOutputRecord(String topic, String key, String value) {
        // Typed local keeps the call bound to the (Integer, Long) overload.
        final Integer noPartition = null;
        assertNextOutputRecord(topic, key, value, noPartition, Long.valueOf(0L));
    }

    /**
     * Asserts the next record on {@code topic}, expecting the given partition and timestamp 0.
     *
     * @param topic     topic to read from
     * @param key       expected key
     * @param value     expected value
     * @param partition expected partition, may be null
     */
    private void assertNextOutputRecord(String topic, String key, String value, Integer partition) {
        final Long defaultTimestamp = 0L;
        assertNextOutputRecord(topic, key, value, partition, defaultTimestamp);
    }

    /**
     * Asserts the next record on {@code topic}, expecting the given headers and timestamp and a
     * null partition.
     *
     * @param topic     topic to read from
     * @param key       expected key
     * @param value     expected value
     * @param headers   expected headers
     * @param timestamp expected timestamp
     */
    private void assertNextOutputRecord(String topic, String key, String value, Headers headers, Long timestamp) {
        final Integer noPartition = null;
        assertNextOutputRecord(topic, key, value, headers, noPartition, timestamp);
    }

    /**
     * Asserts the next record on {@code topic}, expecting the given partition and timestamp and
     * an empty set of headers.
     *
     * @param topic     topic to read from
     * @param key       expected key
     * @param value     expected value
     * @param partition expected partition, may be null
     * @param timestamp expected timestamp
     */
    private void assertNextOutputRecord(String topic, String key, String value, Integer partition, Long timestamp) {
        final Headers noHeaders = new RecordHeaders();
        assertNextOutputRecord(topic, key, value, noHeaders, partition, timestamp);
    }

    /**
     * Reads the next record from {@code topic} through the driver and asserts each of its fields.
     *
     * @param topic     topic to read from; also the expected topic of the record
     * @param key       expected key
     * @param value     expected value
     * @param headers   expected headers
     * @param partition expected partition, may be null
     * @param timestamp expected timestamp
     */
    private void assertNextOutputRecord(String topic, String key, String value, Headers headers, Integer partition, Long timestamp) {
        final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
        // Fail with a readable message instead of a NullPointerException when nothing was produced.
        Assert.assertNotNull("expected a record on topic " + topic + " but none was available", record);
        Assert.assertEquals(topic, record.topic());
        Assert.assertEquals(key, record.key());
        Assert.assertEquals(value, record.value());
        Assert.assertEquals(partition, record.partition());
        Assert.assertEquals(timestamp, record.timestamp());
        Assert.assertEquals(headers, record.headers());
    }

    /**
     * Asserts that reading from {@code topic} through the driver yields {@code null}.
     *
     * @param topic topic expected to have no pending record
     */
    private void assertNoOutputRecord(String topic) {
        final Object next = driver.readOutput(topic);
        Assert.assertNull(next);
    }

    /**
     * Creates a partitioner that ignores the record and always answers with the same partition.
     *
     * @param partitionNumber partition to return for every record
     * @return partitioner returning {@code partitionNumber} regardless of topic, key or value
     */
    private StreamPartitioner<Object, Object> constantPartitioner(final Integer partitionNumber) {
        return new StreamPartitioner<Object, Object>() {
            @Override
            public Integer partition(String topic, Object key, Object value, int numPartitions) {
                return partitionNumber;
            }
        };
    }

    /**
     * Builds source (input topic 1) -> {@link ForwardingProcessor} -> sink (output topic 1), with
     * the sink pinned to a constant partition.
     *
     * @param partition partition every sink record is assigned to
     * @return the populated topology
     */
    private Topology createSimpleTopology(int partition) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new ForwardingProcessor()), "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(partition)), "processor");
    }

    /**
     * Builds source (input topic 1) -> {@link TimestampProcessor} -> sink (output topic 1), with
     * the sink pinned to a constant partition.
     *
     * @param partition partition every sink record is assigned to
     * @return the populated topology
     */
    private Topology createTimestampTopology(int partition) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new TimestampProcessor()), "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(partition)), "processor");
    }

    /**
     * Builds source (input topic 1) -> {@link MultiplexingProcessor} with two children:
     * "sink1" (output topic 1) and "sink2" (output topic 2).
     *
     * @return the populated topology
     */
    private Topology createMultiplexingTopology() {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
    }

    /**
     * Builds source (input topic 1) -> {@link MultiplexByNameProcessor} with two children named
     * "sink0" (output topic 1) and "sink1" (output topic 2), matching the names the processor
     * forwards to.
     *
     * @return the populated topology
     */
    private Topology createMultiplexByNameTopology() {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
            .addSink("sink0", OUTPUT_TOPIC_1, "processor")
            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
    }

    /**
     * Builds source (input topic 1) -> {@link StatefulProcessor} backed by an in-memory
     * String/String key-value store, plus a sink "counts" on output topic 1.
     *
     * @param storeName name of the state store attached to the processor
     * @return the populated topology
     */
    private Topology createStatefulTopology(String storeName) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
            .addStateStore(
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()),
                "processor")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Builds input topic 1 -> through-topic -> output topic 1 using two source/sink pairs and
     * registers the through-topic as an internal topic on the internal builder.
     *
     * @return the populated topology
     */
    private Topology createInternalRepartitioningTopology() {
        topology
            .addSource("source", INPUT_TOPIC_1)
            .addSink("sink0", THROUGH_TOPIC_1, "source")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");

        TopologyWrapper.getInternalTopologyBuilder(topology).addInternalTopic(THROUGH_TOPIC_1);
        return topology;
    }

    /**
     * Builds input topic 1 -> {@link ValueTimestampProcessor} -> through-topic -> output topic 1
     * and registers the through-topic as an internal topic on the internal builder.
     *
     * @return the populated topology
     */
    private Topology createInternalRepartitioningWithValueTimestampTopology() {
        topology
            .addSource("source", INPUT_TOPIC_1)
            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
            .addSink("sink0", THROUGH_TOPIC_1, "processor")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");

        TopologyWrapper.getInternalTopologyBuilder(topology).addInternalTopic(THROUGH_TOPIC_1);
        return topology;
    }

    /**
     * Builds input topic 1 -> output topic 1 -> output topic 2, where the second source reads the
     * topic that the first sink writes to.
     *
     * @return the populated topology
     */
    private Topology createForwardToSourceTopology() {
        return topology
            .addSource("source-1", INPUT_TOPIC_1)
            .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
            .addSource("source-2", OUTPUT_TOPIC_1)
            .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
    }

    /**
     * Builds two independent pipelines, input topic 1 -> output topic 1 and input topic 2 ->
     * output topic 2, each with a {@link ForwardingProcessor} and a constant-partition sink.
     *
     * @param partition partition every sink record is assigned to
     * @return the populated topology
     */
    private Topology createSimpleMultiSourceTopology(int partition) {
        return topology
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(partition)), "processor-1")
            .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
            .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(Integer.valueOf(partition)), "processor-2");
    }

    /**
     * Builds source (input topic 1) -> {@link AddHeaderProcessor} -> sink (output topic 1).
     *
     * @return the populated topology
     */
    private Topology createAddHeaderTopology() {
        return topology
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
    }

    /**
     * Wraps an existing processor instance in a supplier.
     *
     * <p>The supplier hands back the very same instance on every {@code get()} call.
     * NOTE(review): this presumably relies on each supplier being asked only once per test - confirm
     * before reusing this helper where several instances are needed.
     *
     * @param processor instance the supplier should return
     * @param <K>       key type of the processor
     * @param <V>       value type of the processor
     * @return supplier that always returns {@code processor}
     */
    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
        return new ProcessorSupplier<K, V>() {
            @Override
            public Processor<K, V> get() {
                return processor;
            }
        };
    }
}
