package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.class */
public class KTableKTableJoinIntegrationTest {
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";
    private static Properties streamsConfig;
    private KafkaStreams streams;

    @Parameterized.Parameter(0)
    public JoinType joinType1;

    @Parameterized.Parameter(NUM_BROKERS)
    public JoinType joinType2;

    @Parameterized.Parameter(2)
    public List<KeyValue<String, String>> expectedResult;
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final Properties CONSUMER_CONFIG = new Properties();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$streams$integration$KTableKTableJoinIntegrationTest$JoinType = new int[JoinType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$streams$integration$KTableKTableJoinIntegrationTest$JoinType[JoinType.INNER.ordinal()] = KTableKTableJoinIntegrationTest.NUM_BROKERS;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$integration$KTableKTableJoinIntegrationTest$JoinType[JoinType.LEFT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$integration$KTableKTableJoinIntegrationTest$JoinType[JoinType.OUTER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest$JoinType.class */
    public enum JoinType {
        INNER,
        LEFT,
        OUTER
    }

    @Parameterized.Parameters
    public static Object[] parameters() {
        return new Object[]{new Object[]{JoinType.INNER, JoinType.INNER, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", (Object) null))}, new Object[]{JoinType.INNER, JoinType.LEFT, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", (Object) null))}, new Object[]{JoinType.INNER, JoinType.OUTER, Arrays.asList(new KeyValue("a", "null-A3"), new KeyValue("b", "null-B3"), new KeyValue("c", "null-C3"), new KeyValue("a", "null-A3"), new KeyValue("b", "null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", "null-C3"))}, new Object[]{JoinType.LEFT, JoinType.INNER, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", (Object) null))}, new Object[]{JoinType.LEFT, JoinType.LEFT, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", (Object) null))}, new Object[]{JoinType.LEFT, JoinType.OUTER, Arrays.asList(new KeyValue("a", "null-A3"), new KeyValue("b", "null-B3"), new KeyValue("c", "null-C3"), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", "null-C3"))}, new Object[]{JoinType.OUTER, JoinType.INNER, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", "null-C2-C3"))}, new Object[]{JoinType.OUTER, JoinType.LEFT, Arrays.asList(new KeyValue("a", (Object) null), new KeyValue("b", (Object) null), new KeyValue("c", (Object) null), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", "null-C2-C3"))}, new Object[]{JoinType.OUTER, JoinType.OUTER, Arrays.asList(new KeyValue("a", "null-A3"), new KeyValue("b", "null-B3"), new KeyValue("c", "null-C3"), new KeyValue("a", "A1-null-A3"), new KeyValue("b", "B1-null-B3"), new KeyValue("b", "B1-B2-B3"), new KeyValue("c", "null-C2-C3"))}};
    }

    public static Object[] data() {
        return new Object[]{0, 10485760L};
    }

    @BeforeClass
    public static void beforeTest() throws Exception {
        CLUSTER.createTopic(TABLE_1);
        CLUSTER.createTopic(TABLE_2);
        CLUSTER.createTopic(TABLE_3);
        CLUSTER.createTopic(OUTPUT);
        streamsConfig = new Properties();
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfig.put("zookeeper.connect", CLUSTER.zKConnectString());
        streamsConfig.put("key.serde", Serdes.String().getClass().getName());
        streamsConfig.put("value.serde", Serdes.String().getClass().getName());
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfig.put("commit.interval.ms", Integer.valueOf(NUM_BROKERS));
        streamsConfig.put("cache.max.bytes.buffering", 0);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        List asList = Arrays.asList(new KeyValue("a", "A1"), new KeyValue("b", "B1"));
        List asList2 = Arrays.asList(new KeyValue("b", "B2"), new KeyValue("c", "C2"));
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, Arrays.asList(new KeyValue("a", "A3"), new KeyValue("b", "B3"), new KeyValue("c", "C3")), properties, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, asList, properties, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, asList2, properties, MOCK_TIME);
        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    @Before
    public void before() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    @After
    public void after() throws Exception {
        if (this.streams != null) {
            this.streams.close();
            this.streams = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    private KafkaStreams prepareTopology() {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTable<String, String> table = kStreamBuilder.table(TABLE_1, TABLE_1);
        KTable<String, String> table2 = kStreamBuilder.table(TABLE_2, TABLE_2);
        join(join(table, table2, this.joinType1), kStreamBuilder.table(TABLE_3, TABLE_3), this.joinType2).to(OUTPUT);
        return new KafkaStreams(kStreamBuilder, new StreamsConfig(streamsConfig));
    }

    private KTable<String, String> join(KTable<String, String> kTable, KTable<String, String> kTable2, JoinType joinType) {
        ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() { // from class: org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest.1
            public String apply(String str, String str2) {
                return str + "-" + str2;
            }
        };
        switch (AnonymousClass2.$SwitchMap$org$apache$kafka$streams$integration$KTableKTableJoinIntegrationTest$JoinType[joinType.ordinal()]) {
            case NUM_BROKERS /* 1 */:
                return kTable.join(kTable2, valueJoiner);
            case 2:
                return kTable.leftJoin(kTable2, valueJoiner);
            case 3:
                return kTable.outerJoin(kTable2, valueJoiner);
            default:
                throw new RuntimeException("Unknown join type.");
        }
    }

    @Test
    public void KTableKTableJoin() throws Exception {
        System.out.println("join: " + this.joinType1 + "-" + this.joinType2);
        streamsConfig.put("application.id", this.joinType1 + "-" + this.joinType2 + "-ktable-ktable-join");
        this.streams = prepareTopology();
        this.streams.start();
        Assert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, this.expectedResult.size()), CoreMatchers.equalTo(this.expectedResult));
    }
}
