package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSourceTest.class */
/**
 * Tests for tables created with {@link StreamsBuilder#table}: record forwarding, emit-on-change,
 * warning logs for skipped and out-of-order records, value getters, and old-value propagation.
 *
 * <p>Every test drives its topology through a test driver that is opened in a try-with-resources
 * block, so the driver (and, where used, the log appender) is closed exactly once whether the
 * test body passes or fails.
 */
public class KTableSourceTest {
    /** Consumption settings with String serdes for key and value, shared by the String-valued tests. */
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());

    /** Configuration passed to every test driver, obtained from the test utilities with String serdes. */
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    /**
     * Pipes six records (two of them with null values) into the table's topic and checks that the
     * processor attached to the table's stream view captured all of them, in order, with their
     * keys, values, and timestamps unchanged.
     */
    @Test
    public void testKTable() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable<String, Integer> table =
            streamsBuilder.table("topic1", Consumed.with(Serdes.String(), Serdes.Integer()));
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), props)) {
            final TestInputTopic<String, Integer> inputTopic =
                driver.createInputTopic("topic1", new StringSerializer(), new IntegerSerializer());
            inputTopic.pipeInput("A", 1, 10L);
            inputTopic.pipeInput("B", 2, 11L);
            inputTopic.pipeInput("C", 3, 12L);
            inputTopic.pipeInput("D", 4, 13L);
            inputTopic.pipeInput("A", null, 14L);
            inputTopic.pipeInput("B", null, 15L);
        }

        // Verified after the driver is closed, so a failing assertion cannot trigger a second close().
        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>("A", 1, 10L),
                new KeyValueTimestamp<>("B", 2, 11L),
                new KeyValueTimestamp<>("C", 3, 12L),
                new KeyValueTimestamp<>("D", 4, 13L),
                new KeyValueTimestamp<>("A", null, 14L),
                new KeyValueTimestamp<>("B", null, 15L)),
            supplier.theCapturedProcessor().processed());
    }

    /**
     * Expects exactly one update to be counted by the {@code idempotent-update-skip-total} metric
     * and the output topic to contain every piped record except the repeated {@code A=1} at
     * timestamp 12.
     *
     * <p>The test is disabled with {@code @Ignore}; the reason is not recorded in this file.
     */
    @Test
    @Ignore
    public void testKTableSourceEmitOnChange() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .table("topic1", Consumed.with(Serdes.String(), Serdes.Integer()), Materialized.as("store"))
            .toStream()
            .to("output");

        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), props)) {
            final TestInputTopic<String, Integer> inputTopic =
                driver.createInputTopic("topic1", new StringSerializer(), new IntegerSerializer());
            final TestOutputTopic<String, Integer> outputTopic =
                driver.createOutputTopic("output", new StringDeserializer(), new IntegerDeserializer());

            inputTopic.pipeInput("A", 1, 10L);
            inputTopic.pipeInput("B", 2, 11L);
            inputTopic.pipeInput("A", 1, 12L);
            inputTopic.pipeInput("B", 3, 13L);
            inputTopic.pipeInput("A", 1, 9L);

            Assert.assertEquals(
                Double.valueOf(1.0d),
                StreamsTestUtils.getMetricByName(
                    driver.metrics(),
                    "idempotent-update-skip-total",
                    "stream-processor-node-metrics").metricValue());
            Assert.assertEquals(
                Arrays.asList(
                    new TestRecord<>("A", 1, Instant.ofEpochMilli(10L)),
                    new TestRecord<>("B", 2, Instant.ofEpochMilli(11L)),
                    new TestRecord<>("B", 3, Instant.ofEpochMilli(13L)),
                    new TestRecord<>("A", 1, Instant.ofEpochMilli(9L))),
                outputTopic.readRecordsToList());
        }
    }

    /**
     * Pipes a record with a null key and checks that a WARN-level event with the
     * "Skipping record due to null key" message was captured for {@link KTableSource}.
     */
    @Test
    public void kTableShouldLogAndMeterOnSkippedRecords() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic", stringConsumed);

        // Resources are closed in reverse order: the driver first, then the appender.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableSource.class);
             final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), props)) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput(null, "value");

            MatcherAssert.assertThat(
                appender.getEvents().stream()
                    .filter(event -> event.getLevel().equals("WARN"))
                    .map(event -> event.getMessage())
                    .collect(Collectors.toList()),
                CoreMatchers.hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    /**
     * Pipes two records for the same key, the second with an older timestamp, and checks that a
     * WARN-level event describing the out-of-order update was captured for {@link KTableSource}.
     */
    @Test
    public void kTableShouldLogOnOutOfOrder() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic", stringConsumed, Materialized.as("store"));

        // Resources are closed in reverse order: the driver first, then the appender.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableSource.class);
             final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), props)) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "topic", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput("key", "value", 10L);
            inputTopic.pipeInput("key", "value", 5L);

            // The offending record is the second one piped into the topic, hence offset 1; the
            // partition is 0, the same partition the null-key test sees for its first record.
            MatcherAssert.assertThat(
                appender.getEvents().stream()
                    .filter(event -> event.getLevel().equals("WARN"))
                    .map(event -> event.getMessage())
                    .collect(Collectors.toList()),
                CoreMatchers.hasItem("Detected out-of-order KTable update for store, old timestamp=[10] new timestamp=[5]. topic=[topic] partition=[0] offset=[1]."));
        }
    }

    /**
     * Checks that the table's value getter returns the most recently piped value and timestamp
     * for each key, including when the newer record carries an older timestamp, and returns
     * {@code null} for keys whose latest record had a null value.
     */
    @Test
    public void testValueGetter() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // The downcast gives access to the implementation-level name and value getter supplier.
        final KTableImpl<String, ?, String> table =
            (KTableImpl<String, ?, String>) streamsBuilder.table("topic1", stringConsumed, Materialized.as("store"));
        final Topology topology = streamsBuilder.build();
        final KTableValueGetterSupplier<String, String> getterSupplier = table.valueGetterSupplier();
        TopologyWrapper.getInternalTopologyBuilder(topology)
            .connectProcessorAndStateStores(table.name, getterSupplier.storeNames());

        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(streamsBuilder.build(), props)) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "topic1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final KTableValueGetter<String, String> getter = getterSupplier.get();
            getter.init(driver.setCurrentNodeForProcessorContext(table.name));

            inputTopic.pipeInput("A", "01", 10L);
            inputTopic.pipeInput("B", "01", 20L);
            inputTopic.pipeInput("C", "01", 15L);
            Assert.assertEquals(ValueAndTimestamp.make("01", 10L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 20L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            // B's new record is older (5 < 20) but is still expected to replace the stored value.
            inputTopic.pipeInput("A", "02", 30L);
            inputTopic.pipeInput("B", "02", 5L);
            Assert.assertEquals(ValueAndTimestamp.make("02", 30L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("02", 5L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            inputTopic.pipeInput("A", "03", 29L);
            Assert.assertEquals(ValueAndTimestamp.make("03", 29L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("02", 5L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            // Null values: afterwards the getter is expected to return null for A and B.
            inputTopic.pipeInput("A", null, 50L);
            inputTopic.pipeInput("B", null, 3L);
            Assert.assertNull(getter.get("A"));
            Assert.assertNull(getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));
        }
    }

    /**
     * Without old-value sending enabled, every {@link Change} forwarded to the downstream
     * processor is expected to carry {@code null} as its old value.
     */
    @Test
    public void testNotSendingOldValue() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // The downcast gives access to the implementation-level node name.
        final KTableImpl<String, ?, String> table =
            (KTableImpl<String, ?, String>) streamsBuilder.table("topic1", stringConsumed);
        final MockApiProcessorSupplier<String, Change<String>, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final Topology topology = streamsBuilder.build().addProcessor("proc1", supplier, table.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "topic1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<String, Change<String>, Void, Void> processor = supplier.theCapturedProcessor();

            inputTopic.pipeInput("A", "01", 10L);
            inputTopic.pipeInput("B", "01", 20L);
            inputTopic.pipeInput("C", "01", 15L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("01", null), 10L),
                new KeyValueTimestamp<>("B", new Change<>("01", null), 20L),
                new KeyValueTimestamp<>("C", new Change<>("01", null), 15L));

            inputTopic.pipeInput("A", "02", 8L);
            inputTopic.pipeInput("B", "02", 22L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("02", null), 8L),
                new KeyValueTimestamp<>("B", new Change<>("02", null), 22L));

            inputTopic.pipeInput("A", "03", 12L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("03", null), 12L));

            inputTopic.pipeInput("A", null, 15L);
            inputTopic.pipeInput("B", null, 20L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(null, null), 15L),
                new KeyValueTimestamp<>("B", new Change<>(null, null), 20L));
        }
    }

    /**
     * With old-value sending enabled, every {@link Change} forwarded to the downstream processor
     * is expected to carry the previously piped value for the key as its old value
     * ({@code null} for the first record of a key).
     */
    @Test
    public void testSendingOldValue() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // The downcast gives access to the implementation-level node name and old-value switches.
        final KTableImpl<String, ?, String> table =
            (KTableImpl<String, ?, String>) streamsBuilder.table("topic1", stringConsumed);
        table.enableSendingOldValues(true);
        Assert.assertTrue(table.sendingOldValueEnabled());

        final MockApiProcessorSupplier<String, Change<String>, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final Topology topology = streamsBuilder.build().addProcessor("proc1", supplier, table.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "topic1", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<String, Change<String>, Void, Void> processor = supplier.theCapturedProcessor();

            inputTopic.pipeInput("A", "01", 10L);
            inputTopic.pipeInput("B", "01", 20L);
            inputTopic.pipeInput("C", "01", 15L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("01", null), 10L),
                new KeyValueTimestamp<>("B", new Change<>("01", null), 20L),
                new KeyValueTimestamp<>("C", new Change<>("01", null), 15L));

            inputTopic.pipeInput("A", "02", 8L);
            inputTopic.pipeInput("B", "02", 22L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("02", "01"), 8L),
                new KeyValueTimestamp<>("B", new Change<>("02", "01"), 22L));

            inputTopic.pipeInput("A", "03", 12L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>("03", "02"), 12L));

            inputTopic.pipeInput("A", null, 15L);
            inputTopic.pipeInput("B", null, 20L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>("A", new Change<>(null, "03"), 15L),
                new KeyValueTimestamp<>("B", new Change<>(null, "02"), 20L));
        }
    }
}
