package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableFilterTest.class */
public class KTableFilterTest {
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    private KStreamTestDriver driver = null;
    private File stateDir = null;

    @After
    public void tearDown() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
    }

    @Test
    public void testKTable() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTable table = kStreamBuilder.table(this.stringSerde, this.intSerde, "topic1", "anyStoreName");
        KTable filter = table.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.1
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        KTable filterNot = table.filterNot(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.2
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        filter.toStream().process(mockProcessorSupplier, new String[0]);
        filterNot.toStream().process(mockProcessorSupplier2, new String[0]);
        this.driver = new KStreamTestDriver(kStreamBuilder, this.stateDir);
        this.driver.process("topic1", "A", 1);
        this.driver.process("topic1", "B", 2);
        this.driver.process("topic1", "C", 3);
        this.driver.process("topic1", "D", 4);
        this.driver.flushState();
        this.driver.process("topic1", "A", null);
        this.driver.process("topic1", "B", null);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
        mockProcessorSupplier2.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
    }

    @Test
    public void testValueGetter() throws IOException {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTableImpl table = kStreamBuilder.table(this.stringSerde, this.intSerde, "topic1", "anyStoreName");
        KTableImpl filter = table.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.3
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        KTableImpl filterNot = table.filterNot(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.4
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        KTableValueGetterSupplier valueGetterSupplier = filter.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier2 = filterNot.valueGetterSupplier();
        this.driver = new KStreamTestDriver(kStreamBuilder, this.stateDir, null, null);
        KTableValueGetter kTableValueGetter = valueGetterSupplier.get();
        KTableValueGetter kTableValueGetter2 = valueGetterSupplier2.get();
        kTableValueGetter.init(this.driver.context());
        kTableValueGetter2.init(this.driver.context());
        this.driver.process("topic1", "A", 1);
        this.driver.process("topic1", "B", 1);
        this.driver.process("topic1", "C", 1);
        Assert.assertNull(kTableValueGetter.get("A"));
        Assert.assertNull(kTableValueGetter.get("B"));
        Assert.assertNull(kTableValueGetter.get("C"));
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("A")).intValue());
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("B")).intValue());
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("C")).intValue());
        this.driver.process("topic1", "A", 2);
        this.driver.process("topic1", "B", 2);
        Assert.assertEquals(2L, ((Integer) kTableValueGetter.get("A")).intValue());
        Assert.assertEquals(2L, ((Integer) kTableValueGetter.get("B")).intValue());
        Assert.assertNull(kTableValueGetter.get("C"));
        Assert.assertNull(kTableValueGetter2.get("A"));
        Assert.assertNull(kTableValueGetter2.get("B"));
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("C")).intValue());
        this.driver.process("topic1", "A", 3);
        Assert.assertNull(kTableValueGetter.get("A"));
        Assert.assertEquals(2L, ((Integer) kTableValueGetter.get("B")).intValue());
        Assert.assertNull(kTableValueGetter.get("C"));
        Assert.assertEquals(3L, ((Integer) kTableValueGetter2.get("A")).intValue());
        Assert.assertNull(kTableValueGetter2.get("B"));
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("C")).intValue());
        this.driver.process("topic1", "A", null);
        this.driver.process("topic1", "B", null);
        Assert.assertNull(kTableValueGetter.get("A"));
        Assert.assertNull(kTableValueGetter.get("B"));
        Assert.assertNull(kTableValueGetter.get("C"));
        Assert.assertNull(kTableValueGetter2.get("A"));
        Assert.assertNull(kTableValueGetter2.get("B"));
        Assert.assertEquals(1L, ((Integer) kTableValueGetter2.get("C")).intValue());
    }

    @Test
    public void testNotSendingOldValue() throws IOException {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTableImpl table = kStreamBuilder.table(this.stringSerde, this.intSerde, "topic1", "anyStoreName");
        KTableImpl filter = table.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.5
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        kStreamBuilder.addProcessor("proc1", mockProcessorSupplier, new String[]{table.name});
        kStreamBuilder.addProcessor("proc2", mockProcessorSupplier2, new String[]{filter.name});
        this.driver = new KStreamTestDriver(kStreamBuilder, this.stateDir, null, null);
        this.driver.process("topic1", "A", 1);
        this.driver.process("topic1", "B", 1);
        this.driver.process("topic1", "C", 1);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
        this.driver.process("topic1", "A", 2);
        this.driver.process("topic1", "B", 2);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        this.driver.process("topic1", "A", 3);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(3<-null)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(null<-null)");
        this.driver.process("topic1", "A", null);
        this.driver.process("topic1", "B", null);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
    }

    @Test
    public void testSendingOldValue() throws IOException {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTableImpl table = kStreamBuilder.table(this.stringSerde, this.intSerde, "topic1", "anyStoreName");
        KTableImpl filter = table.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.6
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        filter.enableSendingOldValues();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        kStreamBuilder.addProcessor("proc1", mockProcessorSupplier, new String[]{table.name});
        kStreamBuilder.addProcessor("proc2", mockProcessorSupplier2, new String[]{filter.name});
        this.driver = new KStreamTestDriver(kStreamBuilder, this.stateDir, null, null);
        this.driver.process("topic1", "A", 1);
        this.driver.process("topic1", "B", 1);
        this.driver.process("topic1", "C", 1);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        mockProcessorSupplier2.checkEmptyAndClearProcessResult();
        this.driver.process("topic1", "A", 2);
        this.driver.process("topic1", "B", 2);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        this.driver.process("topic1", "A", 3);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(3<-2)");
        mockProcessorSupplier2.checkAndClearProcessResult("A:(null<-2)");
        this.driver.process("topic1", "A", null);
        this.driver.process("topic1", "B", null);
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
        mockProcessorSupplier2.checkAndClearProcessResult("B:(null<-2)");
    }

    @Test
    public void testSkipNullOnMaterialization() throws IOException {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTableImpl table = kStreamBuilder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTableImpl reduce = table.filter(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableFilterTest.7
            public boolean test(String str, String str2) {
                return str2.equalsIgnoreCase("accept");
            }
        }).groupBy(MockKeyValueMapper.NoOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        kStreamBuilder.addProcessor("proc1", mockProcessorSupplier, new String[]{table.name});
        kStreamBuilder.addProcessor("proc2", mockProcessorSupplier2, new String[]{reduce.name});
        this.driver = new KStreamTestDriver(kStreamBuilder, this.stateDir, this.stringSerde, this.stringSerde);
        this.driver.process("topic1", "A", "reject");
        this.driver.process("topic1", "B", "reject");
        this.driver.process("topic1", "C", "reject");
        this.driver.flushState();
        mockProcessorSupplier.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
        mockProcessorSupplier2.checkEmptyAndClearProcessResult();
    }
}
