package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that chains two KTable-KTable foreign-key inner joins
 * ({@code table1 -> table2 -> table3}) and runs the resulting topology on three
 * {@link KafkaStreams} instances that share one {@code application.id}.
 *
 * <p>The input topics are created with different partition counts (3, 5, 7) and the
 * output topic with 11, and the input data is produced once for the whole class in
 * {@link #beforeTest()}.
 */
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
    private static final int NUM_BROKERS = 1;
    /** Replication factor used for every topic created by this test. */
    private static final int REPLICATION_FACTOR = 1;

    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";

    private static final String MISSING_STORE_MESSAGE =
        "Current implementation of joinOnForeignKey requires a materialized store";

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG_1 = new Properties();
    private static final Properties PRODUCER_CONFIG_2 = new Properties();
    private static final Properties PRODUCER_CONFIG_3 = new Properties();

    // One configuration per Streams instance; they differ only in "state.dir" (set in before()).
    private final Properties streamsConfig = getStreamsConfig();
    private final Properties streamsConfigTwo = getStreamsConfig();
    private final Properties streamsConfigThree = getStreamsConfig();
    private KafkaStreams streams;
    private KafkaStreams streamsTwo;
    private KafkaStreams streamsThree;

    /** Join flavours covered by this test; the value is also used as the store-name prefix. */
    private enum JoinType {
        INNER
    }

    /**
     * Creates the input/output topics, produces the fixed input records for all three
     * tables and prepares the consumer configuration used to read the join output.
     *
     * @throws Exception if topic creation or record production fails
     */
    @BeforeClass
    public static void beforeTest() throws Exception {
        CLUSTER.createTopic(TABLE_1, 3, REPLICATION_FACTOR);
        CLUSTER.createTopic(TABLE_2, 5, REPLICATION_FACTOR);
        CLUSTER.createTopic(TABLE_3, 7, REPLICATION_FACTOR);
        CLUSTER.createTopic(OUTPUT, 11, REPLICATION_FACTOR);

        configureProducer(PRODUCER_CONFIG_1, IntegerSerializer.class, FloatSerializer.class);
        configureProducer(PRODUCER_CONFIG_2, StringSerializer.class, LongSerializer.class);
        configureProducer(PRODUCER_CONFIG_3, IntegerSerializer.class, StringSerializer.class);

        final List<KeyValue<Integer, Float>> table1 = Arrays.asList(
            new KeyValue<>(1, 1.33f),
            new KeyValue<>(2, 2.22f),
            new KeyValue<>(3, -1.22f),
            new KeyValue<>(4, -2.22f)
        );
        final List<KeyValue<String, Long>> table2 = Arrays.asList(
            new KeyValue<>("0", 0L),
            new KeyValue<>("1", 10L),
            new KeyValue<>("2", 20L),
            new KeyValue<>("3", 30L),
            new KeyValue<>("4", 40L),
            new KeyValue<>("5", 50L),
            new KeyValue<>("6", 60L),
            new KeyValue<>("7", 70L),
            new KeyValue<>("8", 80L),
            new KeyValue<>("9", 90L)
        );
        final List<KeyValue<Integer, String>> table3 = Arrays.asList(
            new KeyValue<>(10, "waffle")
        );

        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, PRODUCER_CONFIG_3, MOCK_TIME);

        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-consumer");
        CONSUMER_CONFIG.put("key.deserializer", IntegerDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Gives each Streams instance its own state directory and purges any state left there.
     *
     * @throws IOException if purging local state fails
     */
    @Before
    public void before() throws IOException {
        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
        this.streamsConfig.put("state.dir", stateDirBasePath + "-1");
        this.streamsConfigTwo.put("state.dir", stateDirBasePath + "-2");
        this.streamsConfigThree.put("state.dir", stateDirBasePath + "-3");
        IntegrationTestUtils.purgeLocalStreamsState(allStreamsConfigs());
    }

    /**
     * Closes every Streams instance that was started and purges the local state of all
     * three instances (not only the first one), even if closing an instance fails.
     *
     * @throws IOException if purging local state fails
     */
    @After
    public void after() throws IOException {
        try {
            if (this.streams != null) {
                this.streams.close();
                this.streams = null;
            }
            if (this.streamsTwo != null) {
                this.streamsTwo.close();
                this.streamsTwo = null;
            }
            if (this.streamsThree != null) {
                this.streamsThree.close();
                this.streamsThree = null;
            }
        } finally {
            IntegrationTestUtils.purgeLocalStreamsState(allStreamsConfigs());
        }
    }

    /**
     * Verifies the chained inner join with queryable (named) stores.
     *
     * <p>Only key 1 survives both joins: its first-join result contains
     * {@code value2=10}, which maps to foreign key 10, the only key present in table3.
     */
    @Test
    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
        final Set<KeyValue<Integer, String>> expected = new HashSet<>();
        expected.add(new KeyValue<>(1, "value1=1.33,value2=10,value3=waffle"));
        verifyKTableKTableJoin(JoinType.INNER, expected, true);
    }

    /**
     * Starts three Streams instances running the same topology and asserts that the
     * records read from the output topic equal {@code set}.
     *
     * @param joinType join flavour; its name prefixes the store names
     * @param set      expected output records
     * @param z        whether the join results are materialized under named stores; when
     *                 {@code false} the store names are {@code null} and
     *                 {@link #prepareTopology} throws
     * @throws Exception if waiting for the output records fails
     */
    private void verifyKTableKTableJoin(JoinType joinType, Set<KeyValue<Integer, String>> set, boolean z) throws Exception {
        final String storeName = z ? joinType + "-store1" : null;
        final String storeNameTwo = z ? joinType + "-store2" : null;

        this.streams = prepareTopology(storeName, storeNameTwo, this.streamsConfig);
        this.streamsTwo = prepareTopology(storeName, storeNameTwo, this.streamsConfigTwo);
        this.streamsThree = prepareTopology(storeName, storeNameTwo, this.streamsConfigThree);
        this.streams.start();
        this.streamsTwo.start();
        this.streamsThree.start();

        final Set<KeyValue<Integer, String>> actual = new HashSet<>(
            IntegrationTestUtils.<Integer, String>waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, set.size()));
        Assert.assertEquals(set, actual);
    }

    /**
     * Builds the base Streams configuration shared by all instances: same application id,
     * earliest offset reset, record caching disabled and a 100 ms commit interval.
     */
    private static Properties getStreamsConfig() {
        final Properties config = new Properties();
        config.put("application.id", "KTable-FKJ-Multi");
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("cache.max.bytes.buffering", 0);
        config.put("commit.interval.ms", 100);
        return config;
    }

    /** Returns the configurations of all three Streams instances. */
    private List<Properties> allStreamsConfigs() {
        return Arrays.asList(this.streamsConfig, this.streamsConfigTwo, this.streamsConfigThree);
    }

    /** Fills {@code config} with the producer settings shared by all input topics plus the given serializers. */
    private static void configureProducer(final Properties config,
                                          final Class<?> keySerializer,
                                          final Class<?> valueSerializer) {
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("key.serializer", keySerializer);
        config.put("value.serializer", valueSerializer);
    }

    /**
     * Builds the two-stage foreign-key join topology and wraps it in a {@link KafkaStreams}
     * instance (not yet started).
     *
     * @param str        store name for the result of the table1-table2 join; must not be {@code null}
     * @param str2       store name for the result of the second join with table3; must not be {@code null}
     * @param properties Streams configuration for the instance
     * @return a new, unstarted {@link KafkaStreams} instance
     * @throws RuntimeException if either store name is {@code null}
     */
    private static KafkaStreams prepareTopology(String str, String str2, Properties properties) {
        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<Integer, Float> table1 = builder.table(
            TABLE_1,
            Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), properties, true),
                          serdeScope.decorateSerde(Serdes.Float(), properties, false)));
        final KTable<String, Long> table2 = builder.table(
            TABLE_2,
            Consumed.with(serdeScope.decorateSerde(Serdes.String(), properties, true),
                          serdeScope.decorateSerde(Serdes.Long(), properties, false)));
        final KTable<Integer, String> table3 = builder.table(
            TABLE_3,
            Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), properties, true),
                          serdeScope.decorateSerde(Serdes.String(), properties, false)));

        if (str == null) {
            throw new RuntimeException(MISSING_STORE_MESSAGE);
        }
        final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(str)
                .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), properties, true))
                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), properties, false))
                .withCachingDisabled();

        if (str2 == null) {
            throw new RuntimeException(MISSING_STORE_MESSAGE);
        }
        final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materializedTwo =
            Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(str2)
                .withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), properties, true))
                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), properties, false))
                .withCachingDisabled();

        // Foreign key into table2: the float value truncated to an int, as a string.
        final Function<Float, String> tableOneKeyExtractor =
            value -> Integer.toString((int) value.floatValue());
        // Foreign key into table3: hard-wired to 10 for the one first-join result that
        // contains "value2=10", and 0 (absent from table3) for everything else.
        final Function<String, Integer> joinedTableKeyExtractor =
            value -> value.contains("value2=10") ? 10 : 0;

        final ValueJoiner<Float, Long, String> joiner =
            (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
        final ValueJoiner<String, String, String> joinerTwo =
            (value1, value2) -> value1 + ",value3=" + value2;

        table1.join(table2, tableOneKeyExtractor, joiner, materialized)
            .join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo)
            .toStream()
            .to(OUTPUT, Produced.with(serdeScope.decorateSerde(Serdes.Integer(), properties, true),
                                      serdeScope.decorateSerde(Serdes.String(), properties, false)));

        return new KafkaStreams(builder.build(properties), properties);
    }
}
