package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test that joins a stream with a table whose source topics have different
 * partition counts (4 for the stream, 2 for the table) and then inspects the internal topics
 * that exist in the cluster afterwards.
 *
 * <p>The test is parameterized over the value written to {@code topology.optimization}
 * ({@code "all"} and {@code "none"}): the repartition topic is asserted to exist with two
 * partitions in both cases, while the table store's changelog topic is asserted to be absent
 * for {@code "all"} and present for {@code "none"}.
 */
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class StreamTableJoinTopologyOptimizationIntegrationTest {
    private static final int NUM_BROKERS = 1;

    /** Values passed to the {@code topology.optimization} setting, one per parameterized run. */
    private static final String OPTIMIZE = "all";
    private static final String NO_OPTIMIZATION = "none";

    /** Explicit names used in the topology; the internal topic names asserted on are derived from them. */
    private static final String STORE_NAME = "store";
    private static final String SELECT_KEY_NAME = "selectKey";

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private String tableTopic;
    private String inputTopic;
    private String outputTopic;
    private String applicationId;
    private KafkaStreams kafkaStreams;
    private Properties streamsConfiguration;

    @Parameterized.Parameter
    public String topologyOptimization;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Supplies the parameter sets for the runner: one single-element array per optimization setting.
     */
    @Parameterized.Parameters(name = "Optimization = {0}")
    public static Collection<?> topologyOptimization() {
        return Arrays.asList(new String[]{OPTIMIZE}, new String[]{NO_OPTIMIZATION});
    }

    /**
     * Derives per-test topic names and the application id, creates the three topics and
     * populates the streams configuration used by the test.
     */
    @Before
    public void before() throws InterruptedException {
        this.streamsConfiguration = new Properties();

        String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.tableTopic = "table-topic" + safeTestName;
        this.inputTopic = "stream-topic-" + safeTestName;
        this.outputTopic = "output-topic-" + safeTestName;
        this.applicationId = "app-" + safeTestName;

        // The stream and table topics intentionally differ in partition count (4 vs. 2).
        CLUSTER.createTopic(this.inputTopic, 4, 1);
        CLUSTER.createTopic(this.tableTopic, 2, 1);
        CLUSTER.createTopic(this.outputTopic, 4, 1);

        this.streamsConfiguration.put("application.id", this.applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("statestore.cache.max.bytes", 0);
        this.streamsConfiguration.put("commit.interval.ms", 100L);
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("topology.optimization", this.topologyOptimization);
    }

    /**
     * Closes the streams instance if the test started one, then purges the local state
     * belonging to the test's streams configuration.
     */
    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldDoStreamTableJoinWithDifferentNumberOfPartitions() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Explicit type witnesses keep the chain fully typed without raw types.
        streamsBuilder.<Integer, String>stream(this.inputTopic)
            .selectKey((key, value) -> key, Named.as(SELECT_KEY_NAME))
            .join(
                streamsBuilder.<Integer, String>table(this.tableTopic, Materialized.as(STORE_NAME)),
                (streamValue, tableValue) -> tableValue)
            .to(this.outputTopic);

        this.kafkaStreams = startStreams(streamsBuilder);

        long timestamp = System.currentTimeMillis();
        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));

        sendEvents(this.inputTopic, timestamp, expectedRecords);
        // NOTE(review): the expected records are also produced directly to the output topic and
        // nothing is produced to the table topic, so the records validated below do not depend
        // on the join emitting anything. Confirm whether the table topic was intended here.
        sendEvents(this.outputTopic, timestamp, expectedRecords);

        validateReceivedMessages(this.outputTopic, new IntegerDeserializer(), new StringDeserializer(), expectedRecords);

        // Snapshot of the topic list taken before the individual assertions below.
        Set<String> allTopicsInCluster = CLUSTER.getAllTopicsInCluster();

        String repartitionTopicName = this.applicationId + "-" + SELECT_KEY_NAME + "-repartition";
        String tableChangelogTopicName = this.applicationId + "-" + STORE_NAME + "-changelog";

        Assert.assertTrue(topicExists(repartitionTopicName));
        Assert.assertEquals(2L, getNumberOfPartitionsForTopic(repartitionTopicName));

        if (OPTIMIZE.equals(this.topologyOptimization)) {
            Assert.assertFalse(allTopicsInCluster.contains(tableChangelogTopicName));
        } else if (NO_OPTIMIZATION.equals(this.topologyOptimization)) {
            Assert.assertTrue(allTopicsInCluster.contains(tableChangelogTopicName));
        }
    }

    /**
     * Builds and starts a {@link KafkaStreams} instance for the given builder and waits, up to
     * {@code IntegrationTestUtils.DEFAULT_TIMEOUT} milliseconds, for a REBALANCING to RUNNING
     * state transition.
     *
     * <p>The result of the wait is not checked: if the timeout elapses the instance is still
     * returned, and the caller's later assertions decide the test outcome.
     *
     * @param streamsBuilder builder holding the topology to run
     * @return the started streams instance
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private KafkaStreams startStreams(StreamsBuilder streamsBuilder) throws InterruptedException {
        CountDownLatch runningLatch = new CountDownLatch(1);
        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(this.streamsConfiguration), this.streamsConfiguration);

        streams.setStateListener((newState, oldState) -> {
            if (KafkaStreams.State.REBALANCING == oldState && KafkaStreams.State.RUNNING == newState) {
                runningLatch.countDown();
            }
        });

        streams.start();
        runningLatch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        return streams;
    }

    /**
     * Looks up the number of partitions of a topic through a short-lived admin client.
     *
     * @param topic name of the topic to describe
     * @return the size of the partition list in the topic's description
     * @throws Exception if describing the topic fails or does not complete within
     *                   {@code IntegrationTestUtils.DEFAULT_TIMEOUT} milliseconds
     */
    private int getNumberOfPartitionsForTopic(String topic) throws Exception {
        // try-with-resources closes the client on every path and keeps the primary exception
        // if close() fails as well.
        try (Admin adminClient = createAdminClient()) {
            TopicDescription description = adminClient
                .describeTopics(Collections.singleton(topic))
                .topicNameValues()
                .get(topic)
                .get(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
            return description.partitions().size();
        }
    }

    /**
     * Returns whether the cluster's current topic list contains the given topic name.
     */
    private boolean topicExists(String topic) {
        return CLUSTER.getAllTopicsInCluster().contains(topic);
    }

    /**
     * Synchronously produces the given records to a topic, all with the same timestamp, using
     * an integer key serializer and a string value serializer.
     *
     * @param topic     destination topic
     * @param timestamp timestamp applied to the produced records
     * @param records   key-value pairs to produce
     */
    private <K, V> void sendEvents(String topic, long timestamp, List<KeyValue<K, V>> records) throws Exception {
        Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, Long.valueOf(timestamp));
    }

    /**
     * Waits until the expected final key-value records have been received from a topic, reading
     * from the earliest offset with a consumer group named after the current test.
     *
     * @param topic             topic to consume from
     * @param keyDeserializer   instance whose class is configured as the key deserializer
     * @param valueDeserializer instance whose class is configured as the value deserializer
     * @param expectedRecords   records expected to be received
     */
    private <K, V> void validateReceivedMessages(String topic,
                                                 Deserializer<K> keyDeserializer,
                                                 Deserializer<V> valueDeserializer,
                                                 List<KeyValue<K, V>> expectedRecords) throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "group-" + safeTestName);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerConfig.setProperty("value.deserializer", valueDeserializer.getClass().getName());

        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords);
    }

    /**
     * Creates an admin client connected to the embedded cluster; the caller is responsible for
     * closing it.
     */
    private static Admin createAdminClient() {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return Admin.create(adminConfig);
    }
}
