package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.class */
public class KTableKTableInnerJoinTest {
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<Integer, String> consumed = Consumed.with(this.intSerde, this.stringSerde);
    private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.with(this.intSerde, this.stringSerde);
    private File stateDir = null;

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    @Before
    public void setUp() {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
    }

    @Test
    public void testJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<Integer, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable<Integer, String> join = streamsBuilder.table("topic1", this.consumed).join(streamsBuilder.table("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER);
        join.toStream().process(mockProcessorSupplier, new String[0]);
        doTestJoin(streamsBuilder, new int[]{0, 1, 2, 3}, mockProcessorSupplier, join);
    }

    @Test
    public void testQueryableJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<Integer, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable<Integer, String> join = streamsBuilder.table("topic1", this.consumed).join(streamsBuilder.table("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, this.materialized);
        join.toStream().process(mockProcessorSupplier, new String[0]);
        doTestJoin(streamsBuilder, new int[]{0, 1, 2, 3}, mockProcessorSupplier, join);
    }

    @Test
    public void testQueryableNotSendingOldValues() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<Integer, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable<Integer, String> table = streamsBuilder.table("topic1", this.consumed);
        KTable<Integer, String> table2 = streamsBuilder.table("topic2", this.consumed);
        KTableImpl join = table.join(table2, MockValueJoiner.TOSTRING_JOINER, this.materialized);
        streamsBuilder.build().addProcessor("proc", mockProcessorSupplier, new String[]{join.name});
        doTestNotSendingOldValues(streamsBuilder, new int[]{0, 1, 2, 3}, table, table2, mockProcessorSupplier, join);
    }

    @Test
    public void testNotSendingOldValues() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<Integer, String> mockProcessorSupplier = new MockProcessorSupplier<>();
        KTable<Integer, String> table = streamsBuilder.table("topic1", this.consumed);
        KTable<Integer, String> table2 = streamsBuilder.table("topic2", this.consumed);
        KTableImpl join = table.join(table2, MockValueJoiner.TOSTRING_JOINER);
        streamsBuilder.build().addProcessor("proc", mockProcessorSupplier, new String[]{join.name});
        doTestNotSendingOldValues(streamsBuilder, new int[]{0, 1, 2, 3}, table, table2, mockProcessorSupplier, join);
    }

    @Test
    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Processor processor = new KTableKTableInnerJoin(streamsBuilder.table("left", Consumed.with(this.stringSerde, this.stringSerde)), streamsBuilder.table("right", Consumed.with(this.stringSerde, this.stringSerde)), (ValueJoiner) null).get();
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        mockProcessorContext.setRecordMetadata("left", -1, -2L, (Headers) null, -3L);
        processor.init(mockProcessorContext);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
        processor.process((Object) null, new Change("new", "old"));
        LogCaptureAppender.unregister(createAndRegister);
        Assert.assertEquals(Double.valueOf(1.0d), StreamsTestUtils.getMetricByName(mockProcessorContext.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
        MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
    }

    private void doTestNotSendingOldValues(StreamsBuilder streamsBuilder, int[] iArr, KTable<Integer, String> kTable, KTable<Integer, String> kTable2, MockProcessorSupplier<Integer, String> mockProcessorSupplier, KTable<Integer, String> kTable3) {
        Assert.assertFalse(((KTableImpl) kTable).sendingOldValueEnabled());
        Assert.assertFalse(((KTableImpl) kTable2).sendingOldValueEnabled());
        Assert.assertFalse(((KTableImpl) kTable3).sendingOldValueEnabled());
        this.driver.setUp(streamsBuilder, this.stateDir, Serdes.Integer(), Serdes.String());
        this.driver.setTime(0L);
        MockProcessor<Integer, String> theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
        for (int i = 0; i < 2; i++) {
            this.driver.process("topic1", Integer.valueOf(iArr[i]), "X" + iArr[i]);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult(new String[0]);
        for (int i2 = 0; i2 < 2; i2++) {
            this.driver.process("topic2", Integer.valueOf(iArr[i2]), "Y" + iArr[i2]);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
        for (int i3 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i3), "XX" + i3);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)");
        for (int i4 : iArr) {
            this.driver.process("topic2", Integer.valueOf(i4), "YY" + i4);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
        for (int i5 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i5), "X" + i5);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
        for (int i6 = 0; i6 < 2; i6++) {
            this.driver.process("topic2", Integer.valueOf(iArr[i6]), null);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
        for (int i7 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i7), "XX" + i7);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
    }

    private void doTestJoin(StreamsBuilder streamsBuilder, int[] iArr, MockProcessorSupplier<Integer, String> mockProcessorSupplier, KTable<Integer, String> kTable) {
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        KTableValueGetterSupplier valueGetterSupplier = ((KTableImpl) kTable).valueGetterSupplier();
        this.driver.setUp(streamsBuilder, this.stateDir, Serdes.Integer(), Serdes.String());
        this.driver.setTime(0L);
        MockProcessor<Integer, String> theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
        KTableValueGetter<Integer, String> kTableValueGetter = valueGetterSupplier.get();
        kTableValueGetter.init(this.driver.context());
        for (int i = 0; i < 2; i++) {
            this.driver.process("topic1", Integer.valueOf(iArr[i]), "X" + iArr[i]);
        }
        this.driver.process("topic1", null, "SomeVal");
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult(new String[0]);
        for (int i2 = 0; i2 < 2; i2++) {
            this.driver.process("topic2", Integer.valueOf(iArr[i2]), "Y" + iArr[i2]);
        }
        this.driver.process("topic2", null, "AnotherVal");
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
        checkJoinedValues(kTableValueGetter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
        for (int i3 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i3), "XX" + i3);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
        checkJoinedValues(kTableValueGetter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
        for (int i4 : iArr) {
            this.driver.process("topic2", Integer.valueOf(i4), "YY" + i4);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
        checkJoinedValues(kTableValueGetter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
        for (int i5 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i5), "X" + i5);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
        checkJoinedValues(kTableValueGetter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
        for (int i6 = 0; i6 < 2; i6++) {
            this.driver.process("topic2", Integer.valueOf(iArr[i6]), null);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("0:null", "1:null");
        checkJoinedValues(kTableValueGetter, kv(0, null), kv(1, null));
        for (int i7 : iArr) {
            this.driver.process("topic1", Integer.valueOf(i7), "XX" + i7);
        }
        this.driver.flushState();
        theCapturedProcessor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
        checkJoinedValues(kTableValueGetter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
        this.driver.process("topic1", null, "XX1");
        checkJoinedValues(kTableValueGetter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
    }

    private KeyValue<Integer, String> kv(Integer num, String str) {
        return new KeyValue<>(num, str);
    }

    @SafeVarargs
    private final void checkJoinedValues(KTableValueGetter<Integer, String> kTableValueGetter, KeyValue<Integer, String>... keyValueArr) {
        for (KeyValue<Integer, String> keyValue : keyValueArr) {
            String str = (String) kTableValueGetter.get(keyValue.key);
            if (keyValue.value == null) {
                Assert.assertNull(str);
            } else {
                Assert.assertEquals(keyValue.value, str);
            }
        }
    }
}
