package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.KStreamTestDriver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamPeekTest.class */
public class KStreamPeekTest {
    private final String topicName = "topic";
    private final Serde<Integer> intSerd = Serdes.Integer();
    private final Serde<String> stringSerd = Serdes.String();
    private KStreamTestDriver driver = null;

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
        }
    }

    @Test
    public void shouldObserveStreamElements() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(this.intSerd, this.stringSerd, new String[]{"topic"});
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        stream.peek(collect(arrayList)).foreach(collect(arrayList2));
        this.driver = new KStreamTestDriver(kStreamBuilder);
        ArrayList arrayList3 = new ArrayList();
        for (int i = 0; i < 32; i++) {
            String str = "V" + i;
            this.driver.process("topic", Integer.valueOf(i), str);
            arrayList3.add(new KeyValue(Integer.valueOf(i), str));
        }
        Assert.assertEquals(arrayList3, arrayList);
        Assert.assertEquals(arrayList3, arrayList2);
    }

    @Test
    public void shouldNotAllowNullAction() {
        try {
            new KStreamBuilder().stream(this.intSerd, this.stringSerd, new String[]{"topic"}).peek((ForeachAction) null);
            Assert.fail("expected null action to throw NPE");
        } catch (NullPointerException e) {
        }
    }

    private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> list) {
        return new ForeachAction<K, V>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamPeekTest.1
            public void apply(K k, V v) {
                list.add(new KeyValue(k, v));
            }
        };
    }
}
