package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.class */
public class KTableKTableJoinTest {
    private String topic1 = "topic1";
    private String topic2 = "topic2";
    private IntegerSerializer keySerializer = new IntegerSerializer();
    private StringSerializer valSerializer = new StringSerializer();
    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
    private StringDeserializer valDeserializer = new StringDeserializer();
    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableKTableJoinTest.1
        public String apply(String str, String str2) {
            return str + "+" + str2;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest$JoinedKeyValue.class */
    public static class JoinedKeyValue extends KeyValue<Integer, String> {
        public JoinedKeyValue(Integer num, String str) {
            super(num, str);
        }
    }

    @Test
    public void testJoin() throws Exception {
        File file = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            int[] iArr = {0, 1, 2, 3};
            MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
            KTableImpl join = kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic1).join(kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic2), this.joiner);
            join.toStream().process(mockProcessorSupplier, new String[0]);
            Collection copartitionGroups = kStreamBuilder.copartitionGroups();
            Assert.assertEquals(1L, copartitionGroups.size());
            Assert.assertEquals(new HashSet(Arrays.asList(this.topic1, this.topic2)), copartitionGroups.iterator().next());
            KTableValueGetterSupplier valueGetterSupplier = join.valueGetterSupplier();
            KStreamTestDriver kStreamTestDriver = new KStreamTestDriver(kStreamBuilder, file);
            kStreamTestDriver.setTime(0L);
            KTableValueGetter<Integer, String> kTableValueGetter = valueGetterSupplier.get();
            kTableValueGetter.init(kStreamTestDriver.context());
            for (int i = 0; i < 2; i++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i]), "X" + iArr[i]);
            }
            mockProcessorSupplier.checkAndClearResult("0:null", "1:null");
            checkJoinedValues(kTableValueGetter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
            for (int i2 = 0; i2 < 2; i2++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i2]), "Y" + iArr[i2]);
            }
            mockProcessorSupplier.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
            checkJoinedValues(kTableValueGetter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
            for (int i3 = 0; i3 < iArr.length; i3++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i3]), "X" + iArr[i3]);
            }
            mockProcessorSupplier.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
            checkJoinedValues(kTableValueGetter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
            for (int i4 = 0; i4 < iArr.length; i4++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i4]), "YY" + iArr[i4]);
            }
            mockProcessorSupplier.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
            checkJoinedValues(kTableValueGetter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
            for (int i5 = 0; i5 < iArr.length; i5++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i5]), "X" + iArr[i5]);
            }
            mockProcessorSupplier.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
            checkJoinedValues(kTableValueGetter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
            for (int i6 = 0; i6 < 2; i6++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i6]), null);
            }
            mockProcessorSupplier.checkAndClearResult("0:null", "1:null");
            checkJoinedValues(kTableValueGetter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
            for (int i7 = 0; i7 < iArr.length; i7++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i7]), "XX" + iArr[i7]);
            }
            mockProcessorSupplier.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
            checkJoinedValues(kTableValueGetter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
            Utils.delete(file);
        } catch (Throwable th) {
            Utils.delete(file);
            throw th;
        }
    }

    @Test
    public void testNotSendingOldValues() throws Exception {
        File file = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            int[] iArr = {0, 1, 2, 3};
            KTableImpl table = kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic1);
            KTableImpl table2 = kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic2);
            KTableImpl join = table.join(table2, this.joiner);
            MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
            kStreamBuilder.addProcessor("proc", mockProcessorSupplier, new String[]{join.name});
            KStreamTestDriver kStreamTestDriver = new KStreamTestDriver(kStreamBuilder, file);
            kStreamTestDriver.setTime(0L);
            Assert.assertFalse(table.sendingOldValueEnabled());
            Assert.assertFalse(table2.sendingOldValueEnabled());
            Assert.assertFalse(join.sendingOldValueEnabled());
            for (int i = 0; i < 2; i++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i]), "X" + iArr[i]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
            for (int i2 = 0; i2 < 2; i2++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i2]), "Y" + iArr[i2]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
            for (int i3 = 0; i3 < iArr.length; i3++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i3]), "X" + iArr[i3]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
            for (int i4 = 0; i4 < iArr.length; i4++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i4]), "YY" + iArr[i4]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
            for (int i5 = 0; i5 < iArr.length; i5++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i5]), "X" + iArr[i5]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
            for (int i6 = 0; i6 < 2; i6++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i6]), null);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
            for (int i7 = 0; i7 < iArr.length; i7++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i7]), "XX" + iArr[i7]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
            Utils.delete(file);
        } catch (Throwable th) {
            Utils.delete(file);
            throw th;
        }
    }

    @Test
    public void testSendingOldValues() throws Exception {
        File file = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            int[] iArr = {0, 1, 2, 3};
            KTableImpl table = kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic1);
            KTableImpl table2 = kStreamBuilder.table(this.keySerializer, this.valSerializer, this.keyDeserializer, this.valDeserializer, this.topic2);
            KTableImpl join = table.join(table2, this.joiner);
            join.enableSendingOldValues();
            MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
            kStreamBuilder.addProcessor("proc", mockProcessorSupplier, new String[]{join.name});
            KStreamTestDriver kStreamTestDriver = new KStreamTestDriver(kStreamBuilder, file);
            kStreamTestDriver.setTime(0L);
            Assert.assertTrue(table.sendingOldValueEnabled());
            Assert.assertTrue(table2.sendingOldValueEnabled());
            Assert.assertTrue(join.sendingOldValueEnabled());
            for (int i = 0; i < 2; i++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i]), "X" + iArr[i]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
            for (int i2 = 0; i2 < 2; i2++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i2]), "Y" + iArr[i2]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
            for (int i3 = 0; i3 < iArr.length; i3++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i3]), "X" + iArr[i3]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
            for (int i4 = 0; i4 < iArr.length; i4++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i4]), "YY" + iArr[i4]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
            for (int i5 = 0; i5 < iArr.length; i5++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i5]), "X" + iArr[i5]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
            for (int i6 = 0; i6 < 2; i6++) {
                kStreamTestDriver.process(this.topic2, Integer.valueOf(iArr[i6]), null);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
            for (int i7 = 0; i7 < iArr.length; i7++) {
                kStreamTestDriver.process(this.topic1, Integer.valueOf(iArr[i7]), "XX" + iArr[i7]);
            }
            mockProcessorSupplier.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
            Utils.delete(file);
        } catch (Throwable th) {
            Utils.delete(file);
            throw th;
        }
    }

    private JoinedKeyValue kv(Integer num, String str) {
        return new JoinedKeyValue(num, str);
    }

    private void checkJoinedValues(KTableValueGetter<Integer, String> kTableValueGetter, JoinedKeyValue... joinedKeyValueArr) {
        for (JoinedKeyValue joinedKeyValue : joinedKeyValueArr) {
            String str = (String) kTableValueGetter.get(joinedKeyValue.key);
            if (joinedKeyValue.value == null) {
                Assert.assertNull(str);
            } else {
                Assert.assertEquals(joinedKeyValue.value, str);
            }
        }
    }
}
