package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

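/**
 * Tests for {@code KTable.mapValues(...)}: the records emitted downstream, the value getters of
 * materialized results, and whether old values are forwarded.
 *
 * <p>Loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.class
 */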
public class KTableMapValuesTest {
    /** String key/value serdes used whenever a test reads the source table from its input topic. */
    private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
    /** Streams configuration handed to every test driver; built with String serdes for keys and values. */
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    /**
     * Runs the topology built from {@code streamsBuilder}, pipes four records into topic
     * {@code str} and asserts that the captured processor observed them, in order, as
     * integer-valued records carrying the input timestamps.
     *
     * @param streamsBuilder builder whose topology is executed in a {@link TopologyTestDriver}
     * @param str name of the input topic the records are piped into
     * @param mockApiProcessorSupplier supplier whose captured processor holds the observed records
     */
    private void doTestKTable(StreamsBuilder streamsBuilder, String str, MockApiProcessorSupplier<String, Integer, Void, Void> mockApiProcessorSupplier) {
        // try-with-resources closes the driver on every path and attaches a failing close()
        // to the primary failure as a suppressed exception.
        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), this.props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                str, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

            // Timestamps are deliberately not monotonically increasing.
            input.pipeInput("A", "1", 5L);
            input.pipeInput("B", "2", 25L);
            input.pipeInput("C", "3", 20L);
            input.pipeInput("D", "4", 10L);

            Assert.assertEquals(
                Arrays.asList(
                    new KeyValueTimestamp<>("A", 1, 5L),
                    new KeyValueTimestamp<>("B", 2, 25L),
                    new KeyValueTimestamp<>("C", 3, 20L),
                    new KeyValueTimestamp<>("D", 4, 10L)),
                mockApiProcessorSupplier.theCapturedProcessor().processed());
        }
    }

    /**
     * Maps each table value to the digit encoded by its first character, without materializing
     * the result, and checks the records seen downstream via {@code doTestKTable}.
     */
    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        KTable<String, String> source = builder.table(topic, this.consumed);
        // The first character of the value is interpreted as a decimal digit.
        KTable<String, Integer> digits = source.mapValues(value -> Integer.valueOf(value.charAt(0) - '0'));

        MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        digits.toStream().process(supplier);

        doTestKTable(builder, topic, supplier);
    }

    /**
     * Same scenario as {@code testKTable}, but the mapped table is materialized under the
     * store name {@code "anyName"} with an Integer value serde.
     */
    @Test
    public void testQueryableKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        KTable<String, String> source = builder.table(topic, this.consumed);
        // The explicit type witness is required: without it the chained call infers
        // Materialized<Object, Object, StateStore> and rejects the Integer serde.
        KTable<String, Integer> digits = source.mapValues(
            value -> Integer.valueOf(value.charAt(0) - '0'),
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName")
                .withValueSerde(Serdes.Integer()));

        MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        digits.toStream().process(supplier);

        doTestKTable(builder, topic, supplier);
    }

    /**
     * Verifies the value getters of two mapped tables while records are piped into topic
     * {@code str}. After each batch of input, the getter of {@code kTableImpl} is expected to
     * return the parsed integer and the getter of {@code kTableImpl2} its negation, each paired
     * with the timestamp of the record that produced it; after a {@code null} value is piped
     * for a key, both getters are expected to return {@code null} for that key.
     *
     * @param streamsBuilder builder whose topology is executed
     * @param str name of the input topic
     * @param kTableImpl table whose getter is expected to hold the positive values
     * @param kTableImpl2 table whose getter is expected to hold the negated values
     */
    private void doTestValueGetter(StreamsBuilder streamsBuilder, String str, KTableImpl<String, String, Integer> kTableImpl, KTableImpl<String, String, Integer> kTableImpl2) {
        Topology topology = streamsBuilder.build();
        KTableValueGetterSupplier<String, Integer> positiveSupplier = kTableImpl.valueGetterSupplier();
        KTableValueGetterSupplier<String, Integer> negativeSupplier = kTableImpl2.valueGetterSupplier();

        // Connect each table's processor node to the stores its getter reads from.
        InternalTopologyBuilder internalBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        internalBuilder.connectProcessorAndStateStores(kTableImpl.name, positiveSupplier.storeNames());
        internalBuilder.connectProcessorAndStateStores(kTableImpl2.name, negativeSupplier.storeNames());

        // try-with-resources closes the driver on every path and attaches a failing close()
        // to the primary failure as a suppressed exception.
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(streamsBuilder.build(), this.props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                str, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

            KTableValueGetter<String, Integer> positive = positiveSupplier.get();
            KTableValueGetter<String, Integer> negative = negativeSupplier.get();
            positive.init(driver.setCurrentNodeForProcessorContext(kTableImpl.name));
            negative.init(driver.setCurrentNodeForProcessorContext(kTableImpl2.name));

            // Initial values for all three keys.
            input.pipeInput("A", "01", 50L);
            input.pipeInput("B", "01", 10L);
            input.pipeInput("C", "01", 30L);
            Assert.assertEquals(ValueAndTimestamp.make(1, 50L), positive.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 10L), positive.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 30L), positive.get("C"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 50L), negative.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 10L), negative.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 30L), negative.get("C"));

            // Update A and B; C keeps its previous value and timestamp.
            input.pipeInput("A", "02", 25L);
            input.pipeInput("B", "02", 20L);
            Assert.assertEquals(ValueAndTimestamp.make(2, 25L), positive.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(2, 20L), positive.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 30L), positive.get("C"));
            Assert.assertEquals(ValueAndTimestamp.make(-2, 25L), negative.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(-2, 20L), negative.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 30L), negative.get("C"));

            // Update A only.
            input.pipeInput("A", "03", 35L);
            Assert.assertEquals(ValueAndTimestamp.make(3, 35L), positive.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(2, 20L), positive.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 30L), positive.get("C"));
            Assert.assertEquals(ValueAndTimestamp.make(-3, 35L), negative.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(-2, 20L), negative.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 30L), negative.get("C"));

            // A null value for A: both getters must return null for that key afterwards.
            input.pipeInput("A", (String) null, 1L);
            Assert.assertNull(positive.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(2, 20L), positive.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 30L), positive.get("C"));
            Assert.assertNull(negative.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(-2, 20L), negative.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(-1, 30L), negative.get("C"));
        }
    }

    /**
     * Builds two materialized {@code mapValues} results (stores {@code "store2"} and
     * {@code "store3"}) and one non-materialized result from the same source table, checks the
     * queryable store name of each, then verifies the value getters of the two materialized
     * tables via {@code doTestValueGetter}.
     */
    @Test
    public void testQueryableValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        // StreamsBuilder#table is declared to return KTable; the test needs KTableImpl internals.
        KTableImpl<String, String, String> source =
            (KTableImpl<String, String, String>) builder.table(topic, this.consumed);

        // The explicit type witnesses are required: without them the chained call infers
        // Materialized<Object, Object, StateStore> and rejects the Integer serde.
        KTableImpl<String, String, Integer> positive = (KTableImpl<String, String, Integer>) source.mapValues(
            value -> Integer.valueOf(value),
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store2")
                .withValueSerde(Serdes.Integer()));
        KTableImpl<String, String, Integer> negative = (KTableImpl<String, String, Integer>) source.mapValues(
            value -> Integer.valueOf(value) * -1,
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store3")
                .withValueSerde(Serdes.Integer()));
        KTableImpl<String, String, Integer> unmaterialized =
            (KTableImpl<String, String, Integer>) source.mapValues(value -> Integer.valueOf(value));

        Assert.assertEquals("store2", positive.queryableStoreName());
        Assert.assertEquals("store3", negative.queryableStoreName());
        Assert.assertNull(unmaterialized.queryableStoreName());

        doTestValueGetter(builder, topic, positive, negative);
    }

    /**
     * Checks that, when sending old values has not been enabled, every change forwarded by the
     * mapped table carries {@code null} as its old value, including the change produced by a
     * {@code null} input value.
     */
    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        // StreamsBuilder#table and mapValues are declared to return KTable; the test needs
        // KTableImpl internals (name, sendingOldValueEnabled).
        KTableImpl<String, String, String> source =
            (KTableImpl<String, String, String>) builder.table(topic, this.consumed);
        KTableImpl<String, String, Integer> mapped =
            (KTableImpl<String, String, Integer>) source.mapValues(value -> Integer.valueOf(value));

        MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        Topology topology = builder.build().addProcessor("proc", supplier, mapped.name);

        // try-with-resources closes the driver on every path; a failing close() is recorded as
        // suppressed instead of replacing the assertion failure that triggered it.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                topic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor<String, Integer, Void, Void> processor = supplier.theCapturedProcessor();

            Assert.assertFalse(source.sendingOldValueEnabled());
            Assert.assertFalse(mapped.sendingOldValueEnabled());

            input.pipeInput("A", "01", 5L);
            input.pipeInput("B", "01", 10L);
            input.pipeInput("C", "01", 15L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(1, null), 5L),
                new KeyValueTimestamp<>("B", new Change<>(1, null), 10L),
                new KeyValueTimestamp<>("C", new Change<>(1, null), 15L));

            input.pipeInput("A", "02", 10L);
            input.pipeInput("B", "02", 8L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(2, null), 10L),
                new KeyValueTimestamp<>("B", new Change<>(2, null), 8L));

            input.pipeInput("A", "03", 20L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(3, null), 20L));

            // A null input value yields a change with neither a new nor an old value.
            input.pipeInput("A", (String) null, 30L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(null, null), 30L));
        }
    }

    /**
     * Enabling old values on a non-materialized {@code mapValues} result is expected to enable
     * them on the parent table as well; the forwarded changes are then checked via
     * {@code testSendingOldValues}.
     */
    @Test
    public void shouldEnableSendingOldValuesOnParentIfMapValuesNotMaterialized() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        // StreamsBuilder#table and mapValues are declared to return KTable; the test needs
        // KTableImpl internals.
        KTableImpl<String, String, String> parent =
            (KTableImpl<String, String, String>) builder.table(topic, this.consumed);
        KTableImpl<String, String, Integer> mapped =
            (KTableImpl<String, String, Integer>) parent.mapValues(value -> Integer.valueOf(value));

        mapped.enableSendingOldValues(true);

        MatcherAssert.assertThat(parent.sendingOldValueEnabled(), Matchers.is(true));
        MatcherAssert.assertThat(mapped.sendingOldValueEnabled(), Matchers.is(true));

        testSendingOldValues(builder, topic, mapped);
    }

    /**
     * Enabling old values on a {@code mapValues} result materialized as store {@code "bob"} is
     * expected to leave the parent table untouched while enabling them on the mapped table; the
     * forwarded changes are then checked via {@code testSendingOldValues}.
     */
    @Test
    public void shouldNotEnableSendingOldValuesOnParentIfMapValuesMaterialized() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = AssignmentTestUtils.TP_1_NAME;

        // StreamsBuilder#table and mapValues are declared to return KTable; the test needs
        // KTableImpl internals.
        KTableImpl<String, String, String> parent =
            (KTableImpl<String, String, String>) builder.table(topic, this.consumed);
        // The explicit type witness is required: without it the chained call infers
        // Materialized<Object, Object, StateStore> and rejects the Integer serde.
        KTableImpl<String, String, Integer> mapped = (KTableImpl<String, String, Integer>) parent.mapValues(
            value -> Integer.valueOf(value),
            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("bob")
                .withValueSerde(Serdes.Integer()));

        mapped.enableSendingOldValues(true);

        MatcherAssert.assertThat(parent.sendingOldValueEnabled(), Matchers.is(false));
        MatcherAssert.assertThat(mapped.sendingOldValueEnabled(), Matchers.is(true));

        testSendingOldValues(builder, topic, mapped);
    }

    /**
     * Attaches a capturing processor named {@code "proc"} below {@code kTableImpl}, pipes
     * records into topic {@code str} and asserts that each forwarded change carries the
     * previously mapped value of the key as its old value ({@code null} for the first record
     * of a key).
     *
     * @param streamsBuilder builder whose topology is extended and executed
     * @param str name of the input topic
     * @param kTableImpl mapped table whose output is observed
     */
    private void testSendingOldValues(StreamsBuilder streamsBuilder, String str, KTableImpl<String, String, Integer> kTableImpl) {
        MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        // NOTE(review): the processor is added to the result of one build() call and the driver
        // is created from a second build() call; this relies on both calls yielding a topology
        // that contains "proc" - confirm against StreamsBuilder#build.
        streamsBuilder.build().addProcessor("proc", supplier, kTableImpl.name);

        // try-with-resources closes the driver on every path and attaches a failing close()
        // to the primary failure as a suppressed exception.
        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), this.props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                str, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor<String, Integer, Void, Void> processor = supplier.theCapturedProcessor();

            // First record per key: there is no old value yet.
            input.pipeInput("A", "01", 5L);
            input.pipeInput("B", "01", 10L);
            input.pipeInput("C", "01", 15L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(1, null), 5L),
                new KeyValueTimestamp<>("B", new Change<>(1, null), 10L),
                new KeyValueTimestamp<>("C", new Change<>(1, null), 15L));

            input.pipeInput("A", "02", 10L);
            input.pipeInput("B", "02", 8L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(2, 1), 10L),
                new KeyValueTimestamp<>("B", new Change<>(2, 1), 8L));

            input.pipeInput("A", "03", 20L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(3, 2), 20L));

            // A null input value: no new value, the old value is the last mapped one.
            input.pipeInput("A", (String) null, 30L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(null, 3), 30L));
        }
    }
}
