package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.class */
public class KTableMapValuesTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(this.stringSerde, this.stringSerde);

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    private File stateDir = null;

    @Before
    public void setUp() {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
    }

    private void doTestKTable(StreamsBuilder streamsBuilder, String str, MockProcessorSupplier<String, Integer> mockProcessorSupplier) {
        this.driver.setUp(streamsBuilder, this.stateDir, Serdes.String(), Serdes.String());
        this.driver.process(str, "A", "1");
        this.driver.process(str, "B", "2");
        this.driver.process(str, "C", "3");
        this.driver.process(str, "D", "4");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"A:1", "B:2", "C:3", "D:4"}), mockProcessorSupplier.processed);
    }

    @Test
    public void testKTable() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable mapValues = streamsBuilder.table("topic1", this.consumed).mapValues(new ValueMapper<CharSequence, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.1
            public Integer apply(CharSequence charSequence) {
                return Integer.valueOf(charSequence.charAt(0) - '0');
            }
        });
        MockProcessorSupplier<String, Integer> mockProcessorSupplier = new MockProcessorSupplier<>();
        mapValues.toStream().process(mockProcessorSupplier, new String[0]);
        doTestKTable(streamsBuilder, "topic1", mockProcessorSupplier);
    }

    @Test
    public void testQueryableKTable() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable mapValues = streamsBuilder.table("topic1", this.consumed).mapValues(new ValueMapper<CharSequence, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.2
            public Integer apply(CharSequence charSequence) {
                return Integer.valueOf(charSequence.charAt(0) - '0');
            }
        }, Materialized.as("anyName").withValueSerde(Serdes.Integer()));
        MockProcessorSupplier<String, Integer> mockProcessorSupplier = new MockProcessorSupplier<>();
        mapValues.toStream().process(mockProcessorSupplier, new String[0]);
        doTestKTable(streamsBuilder, "topic1", mockProcessorSupplier);
    }

    private void doTestValueGetter(StreamsBuilder streamsBuilder, String str, KTableImpl<String, String, String> kTableImpl, KTableImpl<String, String, Integer> kTableImpl2, KTableImpl<String, Integer, Integer> kTableImpl3, KTableImpl<String, String, String> kTableImpl4) {
        KTableValueGetterSupplier valueGetterSupplier = kTableImpl.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier2 = kTableImpl2.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier3 = kTableImpl3.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier4 = kTableImpl4.valueGetterSupplier();
        this.driver.setUp(streamsBuilder, this.stateDir, Serdes.String(), Serdes.String());
        KTableValueGetter kTableValueGetter = valueGetterSupplier.get();
        kTableValueGetter.init(this.driver.context());
        KTableValueGetter kTableValueGetter2 = valueGetterSupplier2.get();
        kTableValueGetter2.init(this.driver.context());
        KTableValueGetter kTableValueGetter3 = valueGetterSupplier3.get();
        kTableValueGetter3.init(this.driver.context());
        KTableValueGetter kTableValueGetter4 = valueGetterSupplier4.get();
        kTableValueGetter4.init(this.driver.context());
        this.driver.process(str, "A", "01");
        this.driver.process(str, "B", "01");
        this.driver.process(str, "C", "01");
        this.driver.flushState();
        Assert.assertEquals("01", kTableValueGetter.get("A"));
        Assert.assertEquals("01", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertNull(kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("01", kTableValueGetter4.get("A"));
        Assert.assertEquals("01", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process(str, "A", "02");
        this.driver.process(str, "B", "02");
        this.driver.flushState();
        Assert.assertEquals("02", kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("02", kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process(str, "A", "03");
        this.driver.flushState();
        Assert.assertEquals("03", kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(3), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("03", kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process(str, "A", null);
        this.driver.flushState();
        Assert.assertNull(kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertNull(kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertNull(kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        KTableImpl<String, String, Integer> kTableImpl2 = (KTableImpl) kTableImpl.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.3
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        doTestValueGetter(streamsBuilder, "topic1", kTableImpl, kTableImpl2, (KTableImpl) kTableImpl2.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.4
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        }), (KTableImpl) kTableImpl.through(this.stringSerde, this.stringSerde, "topic2", "storeName2"));
    }

    @Test
    public void testQueryableValueGetter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        KTableImpl<String, String, Integer> kTableImpl2 = (KTableImpl) kTableImpl.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.5
            public Integer apply(String str) {
                return new Integer(str);
            }
        }, Materialized.as("anyMapName").withValueSerde(Serdes.Integer()));
        doTestValueGetter(streamsBuilder, "topic1", kTableImpl, kTableImpl2, (KTableImpl) kTableImpl2.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.6
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        }, Materialized.as("anyFilterName").withValueSerde(Serdes.Integer())), (KTableImpl) kTableImpl.through(this.stringSerde, this.stringSerde, "topic2", "storeName2"));
    }

    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed);
        KTableImpl mapValues = table.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.7
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.build().addProcessor("proc", mockProcessorSupplier, new String[]{mapValues.name});
        this.driver.setUp(streamsBuilder, this.stateDir);
        Assert.assertFalse(table.sendingOldValueEnabled());
        Assert.assertFalse(mapValues.sendingOldValueEnabled());
        this.driver.process("topic1", "A", "01");
        this.driver.process("topic1", "B", "01");
        this.driver.process("topic1", "C", "01");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        this.driver.process("topic1", "A", "02");
        this.driver.process("topic1", "B", "02");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        this.driver.process("topic1", "A", "03");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(3<-null)");
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(null<-null)");
    }

    @Test
    public void testSendingOldValue() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed);
        KTableImpl mapValues = table.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableMapValuesTest.8
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        mapValues.enableSendingOldValues();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.build().addProcessor("proc", mockProcessorSupplier, new String[]{mapValues.name});
        this.driver.setUp(streamsBuilder, this.stateDir);
        Assert.assertTrue(table.sendingOldValueEnabled());
        Assert.assertTrue(mapValues.sendingOldValueEnabled());
        this.driver.process("topic1", "A", "01");
        this.driver.process("topic1", "B", "01");
        this.driver.process("topic1", "C", "01");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        this.driver.process("topic1", "A", "02");
        this.driver.process("topic1", "B", "02");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
        this.driver.process("topic1", "A", "03");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(3<-2)");
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(null<-3)");
    }
}
