package org.apache.kafka.streams.kstream.internals;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableFilterTest.class */
public class KTableFilterTest {
    // Key/value serdes (String/Integer) handed to StreamsBuilder.table(...) by the Integer-valued tests.
    private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
    // Creates the input records (String key, Integer value) that the tests pipe into the test driver.
    private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
    // Configuration passed to every test driver; obtained from StreamsTestUtils for String/Integer serdes
    // (presumably installs them as the default serdes -- confirm against StreamsTestUtils).
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());

    /**
     * Attaches one capturing processor to the stream view of each table, pipes a
     * fixed record sequence (four values followed by two tombstones) into the
     * source topic and checks what each table forwarded.
     *
     * <p>The expected results assume {@code kTable} keeps only even values and
     * {@code kTable2} keeps only odd values.
     *
     * @param streamsBuilder builder on which both tables were defined
     * @param kTable         table whose output is captured by the first processor
     * @param kTable2        table whose output is captured by the second processor
     * @param str            name of the source topic the records are piped into
     */
    private void doTestKTable(StreamsBuilder streamsBuilder, KTable<String, Integer> kTable, KTable<String, Integer> kTable2, String str) {
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        kTable.toStream().process(supplier);
        kTable2.toStream().process(supplier);

        // The driver is only needed while records are piped; try-with-resources closes it
        // exactly once, even if piping fails.
        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), this.props)) {
            driver.pipeInput(this.recordFactory.create(str, "A", 1));
            driver.pipeInput(this.recordFactory.create(str, "B", 2));
            driver.pipeInput(this.recordFactory.create(str, "C", 3));
            driver.pipeInput(this.recordFactory.create(str, "D", 4));
            driver.pipeInput(this.recordFactory.create(str, "A", (Integer) null));
            driver.pipeInput(this.recordFactory.create(str, "B", (Integer) null));
        }

        // Assertions run after the driver is closed so a failing check cannot trigger a second close().
        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
        processors.get(1).checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
    }

    /**
     * Runs the shared {@code doTestKTable} scenario against {@code filter} and
     * {@code filterNot} of the same "is even" predicate, without materialization.
     */
    @Test
    public void testKTable() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        // Parameterized (not raw) so the filtered tables keep their key/value types.
        final KTable<String, Integer> source = builder.table(topic, this.consumed);

        final KTable<String, Integer> evens = source.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        });
        final KTable<String, Integer> odds = source.filterNot(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        });

        doTestKTable(builder, evens, odds, topic);
    }

    /**
     * Same scenario as {@code testKTable}, but the {@code filter} result is
     * materialized under the store name "anyStoreNameFilter" while the
     * {@code filterNot} result is not; also checks the reported queryable store names.
     */
    @Test
    public void testQueryableKTable() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        // Parameterized (not raw) so the filtered tables keep their key/value types.
        final KTable<String, Integer> source = builder.table(topic, this.consumed);

        final KTable<String, Integer> evens = source.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        }, Materialized.as("anyStoreNameFilter"));
        final KTable<String, Integer> odds = source.filterNot(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        });

        Assert.assertEquals("anyStoreNameFilter", evens.queryableStoreName());
        Assert.assertNull(odds.queryableStoreName());

        doTestKTable(builder, evens, odds, topic);
    }

    /**
     * Checks the value getters of two filtered tables after each batch of input.
     *
     * <p>The store names reported by each getter supplier are connected to the
     * corresponding table's processor node before the driver is created, and each
     * getter is initialised with the driver's processor context set to that node.
     * The expected values assume {@code kTableImpl} keeps only even values and
     * {@code kTableImpl2} keeps only odd values.
     *
     * @param streamsBuilder builder on which both tables were defined
     * @param kTableImpl     first filtered table
     * @param kTableImpl2    second filtered table
     * @param str            name of the source topic the records are piped into
     */
    private void doTestValueGetter(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        final Topology topology = streamsBuilder.build();

        final KTableValueGetterSupplier<String, Integer> firstSupplier = kTableImpl.valueGetterSupplier();
        final KTableValueGetterSupplier<String, Integer> secondSupplier = kTableImpl2.valueGetterSupplier();

        final InternalTopologyBuilder internalBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        internalBuilder.connectProcessorAndStateStores(kTableImpl.name, firstSupplier.storeNames());
        internalBuilder.connectProcessorAndStateStores(kTableImpl2.name, secondSupplier.storeNames());

        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, this.props)) {
            final KTableValueGetter<String, Integer> firstGetter = firstSupplier.get();
            final KTableValueGetter<String, Integer> secondGetter = secondSupplier.get();

            firstGetter.init(driver.setCurrentNodeForProcessorContext(kTableImpl.name));
            secondGetter.init(driver.setCurrentNodeForProcessorContext(kTableImpl2.name));

            // All values are 1: nothing visible through the first getter, everything through the second.
            driver.pipeInput(this.recordFactory.create(str, "A", 1));
            driver.pipeInput(this.recordFactory.create(str, "B", 1));
            driver.pipeInput(this.recordFactory.create(str, "C", 1));

            Assert.assertNull(firstGetter.get("A"));
            Assert.assertNull(firstGetter.get("B"));
            Assert.assertNull(firstGetter.get("C"));

            Assert.assertEquals(1L, secondGetter.get("A").intValue());
            Assert.assertEquals(1L, secondGetter.get("B").intValue());
            Assert.assertEquals(1L, secondGetter.get("C").intValue());

            // A and B become 2: they move from the second getter's view to the first.
            driver.pipeInput(this.recordFactory.create(str, "A", 2));
            driver.pipeInput(this.recordFactory.create(str, "B", 2));

            Assert.assertEquals(2L, firstGetter.get("A").intValue());
            Assert.assertEquals(2L, firstGetter.get("B").intValue());
            Assert.assertNull(firstGetter.get("C"));

            Assert.assertNull(secondGetter.get("A"));
            Assert.assertNull(secondGetter.get("B"));
            Assert.assertEquals(1L, secondGetter.get("C").intValue());

            // A becomes 3: it moves back to the second getter's view.
            driver.pipeInput(this.recordFactory.create(str, "A", 3));

            Assert.assertNull(firstGetter.get("A"));
            Assert.assertEquals(2L, firstGetter.get("B").intValue());
            Assert.assertNull(firstGetter.get("C"));

            Assert.assertEquals(3L, secondGetter.get("A").intValue());
            Assert.assertNull(secondGetter.get("B"));
            Assert.assertEquals(1L, secondGetter.get("C").intValue());

            // Tombstones for A and B: only C remains, visible through the second getter.
            driver.pipeInput(this.recordFactory.create(str, "A", (Integer) null));
            driver.pipeInput(this.recordFactory.create(str, "B", (Integer) null));

            Assert.assertNull(firstGetter.get("A"));
            Assert.assertNull(firstGetter.get("B"));
            Assert.assertNull(firstGetter.get("C"));

            Assert.assertNull(secondGetter.get("A"));
            Assert.assertNull(secondGetter.get("B"));
            Assert.assertEquals(1L, secondGetter.get("C").intValue());
        }
    }

    /**
     * Runs the shared {@code doTestValueGetter} scenario against non-materialized
     * {@code filter} and {@code filterNot} results of the same "is even" predicate.
     */
    @Test
    public void testValueGetter() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        // StreamsBuilder.table(...) is declared to return KTable, so the downcast is required.
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final KTableImpl<String, Integer, Integer> evens =
            (KTableImpl<String, Integer, Integer>) source.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value.intValue() % 2 == 0;
                }
            });
        final KTableImpl<String, Integer, Integer> odds =
            (KTableImpl<String, Integer, Integer>) source.filterNot(new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value.intValue() % 2 == 0;
                }
            });

        doTestValueGetter(builder, evens, odds, topic);
    }

    /**
     * Same scenario as {@code testValueGetter}, but the {@code filter} result is
     * materialized under the store name "anyStoreNameFilter" while the
     * {@code filterNot} result is not; also checks the reported queryable store names.
     */
    @Test
    public void testQueryableValueGetter() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        // StreamsBuilder.table(...) is declared to return KTable, so the downcast is required.
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final KTableImpl<String, Integer, Integer> evens =
            (KTableImpl<String, Integer, Integer>) source.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value.intValue() % 2 == 0;
                }
            }, Materialized.as("anyStoreNameFilter"));
        final KTableImpl<String, Integer, Integer> odds =
            (KTableImpl<String, Integer, Integer>) source.filterNot(new Predicate<String, Integer>() {
                @Override
                public boolean test(String key, Integer value) {
                    return value.intValue() % 2 == 0;
                }
            });

        Assert.assertEquals("anyStoreNameFilter", evens.queryableStoreName());
        Assert.assertNull(odds.queryableStoreName());

        doTestValueGetter(builder, evens, odds, topic);
    }

    /**
     * Attaches capturing processors "proc1" and "proc2" directly below the two
     * tables and checks the change records they receive when sending of old values
     * has not been enabled: every expected record carries {@code null} as its old value.
     *
     * @param streamsBuilder builder on which both tables were defined
     * @param kTableImpl     table that "proc1" is attached to (the source table in the callers)
     * @param kTableImpl2    table that "proc2" is attached to (the filtered table in the callers)
     * @param str            name of the source topic the records are piped into
     */
    private void doTestNotSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();

        // Build once and reuse the topology, matching doTestSendingOldValue.
        final Topology topology = streamsBuilder.build();
        topology.addProcessor("proc1", supplier, kTableImpl.name);
        topology.addProcessor("proc2", supplier, kTableImpl2.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, this.props)) {
            driver.pipeInput(this.recordFactory.create(str, "A", 1));
            driver.pipeInput(this.recordFactory.create(str, "B", 1));
            driver.pipeInput(this.recordFactory.create(str, "C", 1));

            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);

            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");

            driver.pipeInput(this.recordFactory.create(str, "A", 2));
            driver.pipeInput(this.recordFactory.create(str, "B", 2));

            processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

            driver.pipeInput(this.recordFactory.create(str, "A", 3));

            processors.get(0).checkAndClearProcessResult("A:(3<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)");

            driver.pipeInput(this.recordFactory.create(str, "A", (Integer) null));
            driver.pipeInput(this.recordFactory.create(str, "B", (Integer) null));

            processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
        }
    }

    /**
     * Runs {@code doTestNotSendingOldValue} with the source table and a
     * non-materialized "is even" filter of it.
     */
    @Test
    public void testNotSendingOldValue() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        };
        final KTableImpl<String, Integer, Integer> filtered =
            (KTableImpl<String, Integer, Integer>) source.filter(isEven);

        doTestNotSendingOldValue(builder, source, filtered, topic);
    }

    /**
     * Runs {@code doTestNotSendingOldValue} with the source table and an
     * "is even" filter of it materialized as "anyStoreNameFilter".
     */
    @Test
    public void testQueryableNotSendingOldValue() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        };
        final KTableImpl<String, Integer, Integer> filtered =
            (KTableImpl<String, Integer, Integer>) source.filter(isEven, Materialized.as("anyStoreNameFilter"));

        doTestNotSendingOldValue(builder, source, filtered, topic);
    }

    /**
     * Enables sending of old values on the second table, attaches capturing
     * processors "proc1" and "proc2" directly below the two tables and checks the
     * change records (new value and old value) each of them receives.
     *
     * @param streamsBuilder builder on which both tables were defined
     * @param kTableImpl     table that "proc1" is attached to (the source table in the callers)
     * @param kTableImpl2    table that "proc2" is attached to and on which
     *                       {@code enableSendingOldValues()} is called
     * @param str            name of the source topic the records are piped into
     */
    private void doTestSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        kTableImpl2.enableSendingOldValues();

        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        final Topology topology = streamsBuilder.build();
        topology.addProcessor("proc1", supplier, kTableImpl.name);
        topology.addProcessor("proc2", supplier, kTableImpl2.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, this.props)) {
            driver.pipeInput(this.recordFactory.create(str, "A", 1));
            driver.pipeInput(this.recordFactory.create(str, "B", 1));
            driver.pipeInput(this.recordFactory.create(str, "C", 1));

            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);

            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
            // Nothing is expected downstream of the second table for the initial odd values.
            processors.get(1).checkEmptyAndClearProcessResult();

            driver.pipeInput(this.recordFactory.create(str, "A", 2));
            driver.pipeInput(this.recordFactory.create(str, "B", 2));

            processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

            driver.pipeInput(this.recordFactory.create(str, "A", 3));

            processors.get(0).checkAndClearProcessResult("A:(3<-2)");
            processors.get(1).checkAndClearProcessResult("A:(null<-2)");

            driver.pipeInput(this.recordFactory.create(str, "A", (Integer) null));
            driver.pipeInput(this.recordFactory.create(str, "B", (Integer) null));

            processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
            // Only B is expected here: A was already absent from the second table.
            processors.get(1).checkAndClearProcessResult("B:(null<-2)");
        }
    }

    /**
     * Runs {@code doTestSendingOldValue} with the source table and a
     * non-materialized "is even" filter of it.
     */
    @Test
    public void testSendingOldValue() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        };
        final KTableImpl<String, Integer, Integer> filtered =
            (KTableImpl<String, Integer, Integer>) source.filter(isEven);

        doTestSendingOldValue(builder, source, filtered, topic);
    }

    /**
     * Runs {@code doTestSendingOldValue} with the source table and an
     * "is even" filter of it materialized as "anyStoreNameFilter".
     */
    @Test
    public void testQueryableSendingOldValue() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, Integer, Integer> source =
            (KTableImpl<String, Integer, Integer>) builder.table(topic, this.consumed);

        final Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value.intValue() % 2 == 0;
            }
        };
        final KTableImpl<String, Integer, Integer> filtered =
            (KTableImpl<String, Integer, Integer>) source.filter(isEven, Materialized.as("anyStoreNameFilter"));

        doTestSendingOldValue(builder, source, filtered, topic);
    }

    /**
     * Attaches capturing processors "proc1" and "proc2" below the two tables, pipes
     * three records with the value "reject" and checks that the first processor
     * received all of them while the second received nothing.
     *
     * @param streamsBuilder builder on which both tables were defined
     * @param kTableImpl     table that "proc1" is attached to (the source table in the callers)
     * @param kTableImpl2    table that "proc2" is attached to (the filtered and re-aggregated table in the callers)
     * @param str            name of the source topic the records are piped into
     */
    private void doTestSkipNullOnMaterialization(StreamsBuilder streamsBuilder, KTableImpl<String, String, String> kTableImpl, KTableImpl<String, String, String> kTableImpl2, String str) {
        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
        final Topology topology = streamsBuilder.build();
        topology.addProcessor("proc1", supplier, kTableImpl.name);
        topology.addProcessor("proc2", supplier, kTableImpl2.name);

        // This scenario uses String values, so it needs its own factory rather than the Integer-valued field.
        final ConsumerRecordFactory<String, String> stringRecordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        // try-with-resources closes the driver exactly once, even if piping fails.
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, this.props)) {
            driver.pipeInput(stringRecordFactory.create(str, "A", "reject"));
            driver.pipeInput(stringRecordFactory.create(str, "B", "reject"));
            driver.pipeInput(stringRecordFactory.create(str, "C", "reject"));
        }

        // Assertions run after the driver is closed so a failing check cannot trigger a second close().
        final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
        processors.get(1).checkEmptyAndClearProcessResult();
    }

    /**
     * Runs {@code doTestSkipNullOnMaterialization} with a String-valued source
     * table and a table derived from it by filtering on "accept" (case-insensitive),
     * grouping with the no-op mapper and reducing with the mock string adder/remover.
     */
    @Test
    public void testSkipNullOnMaterialization() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, String, String> source =
            (KTableImpl<String, String, String>) builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));

        final Predicate<String, String> acceptsOnly = new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return value.equalsIgnoreCase("accept");
            }
        };
        final KTableImpl<String, String, String> reduced =
            (KTableImpl<String, String, String>) source
                .filter(acceptsOnly)
                .groupBy(MockMapper.noOpKeyValueMapper())
                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);

        doTestSkipNullOnMaterialization(builder, source, reduced, topic);
    }

    /**
     * Same scenario as {@code testSkipNullOnMaterialization}, but the filter
     * result is materialized as "anyStoreNameFilter" and the reduce result as
     * "mock-result".
     */
    @Test
    public void testQueryableSkipNullOnMaterialization() {
        final String topic = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, String, String> source =
            (KTableImpl<String, String, String>) builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));

        final Predicate<String, String> acceptsOnly = new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return value.equalsIgnoreCase("accept");
            }
        };
        final KTableImpl<String, String, String> reduced =
            (KTableImpl<String, String, String>) source
                .filter(acceptsOnly, Materialized.as("anyStoreNameFilter"))
                .groupBy(MockMapper.noOpKeyValueMapper())
                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));

        doTestSkipNullOnMaterialization(builder, source, reduced, topic);
    }

    /**
     * Compile-time check that {@code filter} and {@code filterNot} accept a
     * predicate declared on supertypes of the table's key and value types.
     * Nothing is executed against a driver; the test passes if the topology can be defined.
     */
    @Test
    public void testTypeVariance() {
        final Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
            @Override
            public boolean test(Number key, Object value) {
                return false;
            }
        };

        // The explicit <Integer, String> witness is required: without it K and V are inferred as
        // Object and a Predicate<Number, Object> is not accepted by filter/filterNot.
        new StreamsBuilder()
            .<Integer, String>table("empty")
            .filter(numberKeyPredicate)
            .filterNot(numberKeyPredicate)
            .toStream()
            .to("nirvana");
    }
}
