/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class StreamTableJoinTopologyOptimizationIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private String tableTopic;
    private String inputTopic;
    private String outputTopic;
    private String applicationId;
    private KafkaStreams kafkaStreams;
    private Properties streamsConfiguration;
    @Rule
    public TestName testName = new TestName();
    @Parameterized.Parameter
    public String topologyOptimization;

    @Parameterized.Parameters(name="Optimization = {0}")
    public static Collection<?> topologyOptimization() {
        return Arrays.asList({"all"}, {"none"});
    }

    @Before
    public void before() throws InterruptedException {
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.tableTopic = "table-topic" + safeTestName;
        this.inputTopic = "stream-topic-" + safeTestName;
        this.outputTopic = "output-topic-" + safeTestName;
        this.applicationId = "app-" + safeTestName;
        CLUSTER.createTopic(this.inputTopic, 4, 1);
        CLUSTER.createTopic(this.tableTopic, 2, 1);
        CLUSTER.createTopic(this.outputTopic, 4, 1);
        this.streamsConfiguration.put("application.id", this.applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100);
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("topology.optimization", this.topologyOptimization);
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldDoStreamTableJoinWithDifferentNumberOfPartitions() throws Exception {
        String storeName = "store";
        String selectKeyName = "selectKey";
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(this.inputTopic);
        KTable table = streamsBuilder.table(this.tableTopic, Materialized.as((String)"store"));
        stream.selectKey((key, value) -> key, Named.as((String)"selectKey")).join(table, (value1, value2) -> value2).to(this.outputTopic);
        this.kafkaStreams = this.startStreams(streamsBuilder);
        long timestamp = System.currentTimeMillis();
        List expectedRecords = Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)2, (Object)"B"));
        this.sendEvents(this.inputTopic, timestamp, expectedRecords);
        this.sendEvents(this.outputTopic, timestamp, expectedRecords);
        this.validateReceivedMessages(this.outputTopic, (Deserializer)new IntegerDeserializer(), (Deserializer)new StringDeserializer(), expectedRecords);
        Set<String> allTopicsInCluster = CLUSTER.getAllTopicsInCluster();
        String repartitionTopicName = this.applicationId + "-" + "selectKey" + "-repartition";
        String tableChangelogStoreName = this.applicationId + "-" + "store" + "-changelog";
        Assert.assertTrue((boolean)this.topicExists(repartitionTopicName));
        Assert.assertEquals((long)2L, (long)this.getNumberOfPartitionsForTopic(repartitionTopicName));
        if ("all".equals(this.topologyOptimization)) {
            Assert.assertFalse((boolean)allTopicsInCluster.contains(tableChangelogStoreName));
        } else if ("none".equals(this.topologyOptimization)) {
            Assert.assertTrue((boolean)allTopicsInCluster.contains(tableChangelogStoreName));
        }
    }

    private KafkaStreams startStreams(StreamsBuilder builder) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(this.streamsConfiguration), this.streamsConfiguration);
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (KafkaStreams.State.REBALANCING == oldState && KafkaStreams.State.RUNNING == newState) {
                latch.countDown();
            }
        });
        kafkaStreams.start();
        latch.await(60000L, TimeUnit.MILLISECONDS);
        return kafkaStreams;
    }

    private int getNumberOfPartitionsForTopic(String topic) throws Exception {
        try (AdminClient adminClient = StreamTableJoinTopologyOptimizationIntegrationTest.createAdminClient();){
            TopicDescription topicDescription = (TopicDescription)((KafkaFuture)adminClient.describeTopics(Collections.singleton(topic)).values().get(topic)).get(60000L, TimeUnit.MILLISECONDS);
            int n = topicDescription.partitions().size();
            return n;
        }
    }

    private boolean topicExists(String topic) {
        return CLUSTER.getAllTopicsInCluster().contains(topic);
    }

    private <K, V> void sendEvents(String topic, long timestamp, List<KeyValue<K, V>> events) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, events, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private <K, V> void validateReceivedMessages(String topic, Deserializer<K> keySerializer, Deserializer<V> valueSerializer, List<KeyValue<K, V>> expectedRecords) throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + safeTestName);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keySerializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueSerializer.getClass().getName());
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerProperties, topic, expectedRecords);
    }

    private static AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return AdminClient.create((Properties)properties);
    }
}

