package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.class */
/**
 * Tests for the outer join between two {@code KTable}s.
 *
 * <p>The driver-based tests feed the same sequence of records into two input topics
 * ({@code topic1} as the left side, {@code topic2} as the right side) and check what the
 * join emits after every phase. As encoded by the expectations below:
 * <ul>
 *   <li>records piped with a {@code null} key produce no join output;</li>
 *   <li>a missing side is rendered as {@code "null"} by {@link MockValueJoiner#TOSTRING_JOINER};</li>
 *   <li>a {@code null} result is expected once both sides of a key have been deleted;</li>
 *   <li>every expected result timestamp equals the larger of the two input timestamps for
 *       that key (or the only available one).</li>
 * </ul>
 */
public class KTableKTableOuterJoinTest {
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final String output = "output";
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());

    /**
     * Verifies the records written to the output topic by the outer join, and that both
     * source topics end up in a single co-partition group.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};

        builder.table(topic1, consumed)
            .outerJoin(builder.table(topic2, consumed), MockValueJoiner.TOSTRING_JOINER)
            .toStream()
            .to(output);

        final Collection<?> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestOutputTopic<Integer, String> outputTopic = driver.createOutputTopic(
                output, Serdes.Integer().deserializer(), Serdes.String().deserializer());

            // Phase 1: two left records plus one null-key left record (expected to yield nothing).
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            assertOutputKeyValueTimestamp(outputTopic, 0, "X0+null", 5L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "X1+null", 6L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 2: matching right records plus one null-key right record.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            assertOutputKeyValueTimestamp(outputTopic, 0, "X0+Y0", 5L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "X1+Y1", 10L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 3: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            assertOutputKeyValueTimestamp(outputTopic, 0, "XX0+Y0", 7L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "XX1+Y1", 10L);
            assertOutputKeyValueTimestamp(outputTopic, 2, "XX2+null", 7L);
            assertOutputKeyValueTimestamp(outputTopic, 3, "XX3+null", 7L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 4: update every key on the right.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            assertOutputKeyValueTimestamp(outputTopic, 0, "XX0+YY0", 7L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "XX1+YY1", 7L);
            assertOutputKeyValueTimestamp(outputTopic, 2, "XX2+YY2", 10L);
            assertOutputKeyValueTimestamp(outputTopic, 3, "XX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 5: update every key on the left again, with an older timestamp.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            assertOutputKeyValueTimestamp(outputTopic, 0, "XXX0+YY0", 6L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "XXX1+YY1", 6L);
            assertOutputKeyValueTimestamp(outputTopic, 2, "XXX2+YY2", 10L);
            assertOutputKeyValueTimestamp(outputTopic, 3, "XXX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 6: delete keys 0 and 1 on the right.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            assertOutputKeyValueTimestamp(outputTopic, 0, "XXX0+null", 6L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "XXX1+null", 7L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 7: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            assertOutputKeyValueTimestamp(outputTopic, 0, "XXXX0+null", 13L);
            assertOutputKeyValueTimestamp(outputTopic, 1, "XXXX1+null", 13L);
            assertOutputKeyValueTimestamp(outputTopic, 2, "XXXX2+YY2", 13L);
            assertOutputKeyValueTimestamp(outputTopic, 3, "XXXX3+YY3", 15L);
            Assert.assertTrue(outputTopic.isEmpty());

            // Phase 8: delete every key on the left; keys 0 and 1 now have neither side.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            assertOutputKeyValueTimestamp(outputTopic, 0, null, 0L);
            assertOutputKeyValueTimestamp(outputTopic, 1, null, 42L);
            assertOutputKeyValueTimestamp(outputTopic, 2, "null+YY2", 10L);
            assertOutputKeyValueTimestamp(outputTopic, 3, "null+YY3", 20L);
            Assert.assertTrue(outputTopic.isEmpty());
        }
    }

    /**
     * Verifies the {@link Change} records forwarded by the join when sending old values is
     * not enabled on the joined table: every expected change carries a {@code null} old value.
     */
    @Test
    public void testNotSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};

        final KTableImpl<Integer, ?, String> table1 = (KTableImpl<Integer, ?, String>) builder.table(topic1, consumed);
        final KTableImpl<Integer, ?, String> table2 = (KTableImpl<Integer, ?, String>) builder.table(topic2, consumed);
        final KTableImpl<Integer, ?, String> joined =
            (KTableImpl<Integer, ?, String>) table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        // Attach a capturing processor directly below the join node to observe its raw output.
        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build().addProcessor("proc", supplier, joined.name), props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();

            Assert.assertTrue(table1.sendingOldValueEnabled());
            Assert.assertTrue(table2.sendingOldValueEnabled());
            Assert.assertFalse(joined.sendingOldValueEnabled());

            // Phase 1: two left records plus one null-key left record (expected to yield nothing).
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6L));

            // Phase 2: matching right records plus one null-key right record.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+Y0", null), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", null), 10L));

            // Phase 3: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", null), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", null), 10L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7L));

            // Phase 4: update every key on the right.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", null), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", null), 7L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", null), 15L));

            // Phase 5: update every key on the left again, with an older timestamp.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", null), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", null), 6L),
                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", null), 15L));

            // Phase 6: delete keys 0 and 1 on the right.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+null", null), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+null", null), 7L));

            // Phase 7: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", null), 13L),
                new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", null), 13L),
                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", null), 13L),
                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", null), 15L));

            // Phase 8: delete every key on the left; keys 0 and 1 now have neither side.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>(null, null), 0L),
                new KeyValueTimestamp<>(1, new Change<>(null, null), 42L),
                new KeyValueTimestamp<>(2, new Change<>("null+YY2", null), 10L),
                new KeyValueTimestamp<>(3, new Change<>("null+YY3", null), 20L));
        }
    }

    /**
     * Verifies the {@link Change} records forwarded by the join after
     * {@code enableSendingOldValues()} has been called on the joined table: every expected
     * change carries the previously emitted join result as its old value.
     */
    @Test
    public void testSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};

        final KTableImpl<Integer, ?, String> table1 = (KTableImpl<Integer, ?, String>) builder.table(topic1, consumed);
        final KTableImpl<Integer, ?, String> table2 = (KTableImpl<Integer, ?, String>) builder.table(topic2, consumed);
        final KTableImpl<Integer, ?, String> joined =
            (KTableImpl<Integer, ?, String>) table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
        joined.enableSendingOldValues();
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        // Attach a capturing processor directly below the join node to observe its raw output.
        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build().addProcessor("proc", supplier, joined.name), props)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                topic1, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                topic2, Serdes.Integer().serializer(), Serdes.String().serializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();

            Assert.assertTrue(table1.sendingOldValueEnabled());
            Assert.assertTrue(table2.sendingOldValueEnabled());
            Assert.assertTrue(joined.sendingOldValueEnabled());

            // Phase 1: two left records plus one null-key left record (expected to yield nothing).
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "X" + expectedKeys[i], 5L + i);
            }
            inputTopic1.pipeInput(null, "SomeVal", 42L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+null", null), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+null", null), 6L));

            // Phase 2: matching right records plus one null-key right record.
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "Y" + expectedKeys[i], 10L * i);
            }
            inputTopic2.pipeInput(null, "AnotherVal", 73L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("X0+Y0", "X0+null"), 5L),
                new KeyValueTimestamp<>(1, new Change<>("X1+Y1", "X1+null"), 10L));

            // Phase 3: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XX" + expectedKey, 7L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+Y0", "X0+Y0"), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+Y1", "X1+Y1"), 10L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+null", null), 7L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+null", null), 7L));

            // Phase 4: update every key on the right.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "YY" + expectedKey, expectedKey * 5L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XX0+YY0", "XX0+Y0"), 7L),
                new KeyValueTimestamp<>(1, new Change<>("XX1+YY1", "XX1+Y1"), 7L),
                new KeyValueTimestamp<>(2, new Change<>("XX2+YY2", "XX2+null"), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XX3+YY3", "XX3+null"), 15L));

            // Phase 5: update every key on the left again, with an older timestamp.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXX" + expectedKey, 6L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+YY0", "XX0+YY0"), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+YY1", "XX1+YY1"), 6L),
                new KeyValueTimestamp<>(2, new Change<>("XXX2+YY2", "XX2+YY2"), 10L),
                new KeyValueTimestamp<>(3, new Change<>("XXX3+YY3", "XX3+YY3"), 15L));

            // Phase 6: delete keys 0 and 1 on the right.
            inputTopic2.pipeInput(expectedKeys[0], null, 5L);
            inputTopic2.pipeInput(expectedKeys[1], null, 7L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXX0+null", "XXX0+YY0"), 6L),
                new KeyValueTimestamp<>(1, new Change<>("XXX1+null", "XXX1+YY1"), 7L));

            // Phase 7: update every key on the left.
            for (final int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "XXXX" + expectedKey, 13L);
            }
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>("XXXX0+null", "XXX0+null"), 13L),
                new KeyValueTimestamp<>(1, new Change<>("XXXX1+null", "XXX1+null"), 13L),
                new KeyValueTimestamp<>(2, new Change<>("XXXX2+YY2", "XXX2+YY2"), 13L),
                new KeyValueTimestamp<>(3, new Change<>("XXXX3+YY3", "XXX3+YY3"), 15L));

            // Phase 8: delete every key on the left; keys 0 and 1 now have neither side.
            inputTopic1.pipeInput(expectedKeys[0], null, 0L);
            inputTopic1.pipeInput(expectedKeys[1], null, 42L);
            inputTopic1.pipeInput(expectedKeys[2], null, 5L);
            inputTopic1.pipeInput(expectedKeys[3], null, 20L);
            proc.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, new Change<>(null, "XXXX0+null"), 0L),
                new KeyValueTimestamp<>(1, new Change<>(null, "XXXX1+null"), 42L),
                new KeyValueTimestamp<>(2, new Change<>("null+YY2", "XXXX2+YY2"), 10L),
                new KeyValueTimestamp<>(3, new Change<>("null+YY3", "XXXX3+YY3"), 20L));
        }
    }

    /**
     * Verifies that processing a record with a {@code null} key through the join processor
     * bumps the {@code skipped-records-total} metric to 1.0 and logs the expected message.
     */
    @Test
    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
        final StreamsBuilder builder = new StreamsBuilder();

        // The joiner is never needed on this path, so null is passed for it.
        final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
            (KTableImpl<String, ?, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
            (KTableImpl<String, ?, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
            null
        ).get();

        final MockProcessorContext context = new MockProcessorContext();
        context.setRecordMetadata("left", -1, -2L, null, -3L);
        join.init(context);

        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            join.process(null, new Change<>("new", "old"));
        } finally {
            // Always unregister so a failure here cannot leave the appender attached for later tests.
            LogCaptureAppender.unregister(appender);
        }

        Assert.assertEquals(
            1.0,
            StreamsTestUtils.getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
        MatcherAssert.assertThat(
            appender.getMessages(),
            CoreMatchers.hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts that it equals a record with
     * the given key, value and timestamp and with {@code null} headers.
     *
     * @param outputTopic       topic to read the next record from
     * @param expectedKey       expected record key
     * @param expectedValue     expected record value, may be {@code null}
     * @param expectedTimestamp expected record timestamp in milliseconds
     */
    private void assertOutputKeyValueTimestamp(final TestOutputTopic<Integer, String> outputTopic,
                                               final Integer expectedKey,
                                               final String expectedValue,
                                               final long expectedTimestamp) {
        MatcherAssert.assertThat(
            outputTopic.readRecord(),
            CoreMatchers.equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
    }
}
