package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

/**
 * Integration tests that run small Kafka Streams topologies against an embedded cluster and
 * inspect the topic-level configuration of the internal (changelog and repartition) topics
 * that result.
 */
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String APP_ID = "internal-topics-integration-test";
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
    private Properties streamsProp;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final MockTime mockTime = CLUSTER.time;

    /** Starts the shared cluster and creates the two input topics used by every test. */
    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
    }

    /** Stops the shared cluster once all tests in this class have run. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Builds a fresh Streams configuration for each test. The {@code application.id} is left
     * unset here; every test supplies its own.
     */
    @Before
    public void before() {
        streamsProp = new Properties();
        streamsProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsProp.put("default.key.serde", Serdes.String().getClass().getName());
        streamsProp.put("default.value.serde", Serdes.String().getClass().getName());
        streamsProp.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsProp.put("commit.interval.ms", 100L);
        streamsProp.put("cache.max.bytes.buffering", 0);
        streamsProp.put("auto.offset.reset", "earliest");
    }

    /** Purges the local Streams state belonging to the configuration used by the test. */
    @After
    public void after() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
    }

    /**
     * Synchronously produces the given values to the default input topic.
     *
     * @param inputValues record values to send
     */
    private void produceData(final List<String> inputValues) {
        final Properties producerProp = new Properties();
        producerProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProp.put("acks", "all");
        producerProp.put("key.serializer", StringSerializer.class);
        producerProp.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime);
    }

    /**
     * Fetches the configuration of a topic and keeps only the entries that were set
     * dynamically on the topic itself.
     *
     * @param topic name of the topic to describe
     * @return the entries whose source is {@code DYNAMIC_TOPIC_CONFIG}, as name/value pairs
     * @throws RuntimeException if the describe call fails or the calling thread is interrupted
     */
    private Properties getTopicProperties(final String topic) {
        // try-with-resources closes the admin client on every path, including failures.
        try (Admin adminClient = createAdminClient()) {
            final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            final Config config =
                adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
            final Properties properties = new Properties();
            for (final ConfigEntry configEntry : config.entries()) {
                if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) {
                    properties.put(configEntry.name(), configEntry.value());
                }
            }
            return properties;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (final ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates an admin client connected to the embedded cluster; the caller must close it. */
    private Admin createAdminClient() {
        final Properties adminClientConfig = new Properties();
        adminClientConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return Admin.create(adminClientConfig);
    }

    /**
     * Starts the application, produces the fixed test input, waits for completion and then
     * closes the application, also when one of the earlier steps throws.
     *
     * @param streams the not-yet-started application to run
     */
    private void runUntilInputProcessed(final KafkaStreams streams) {
        try {
            streams.start();
            produceData(Arrays.asList("hello", "world", "world", "hello world"));
            IntegrationTestUtils.waitForCompletion(streams, 2, 30000L);
        } finally {
            streams.close();
        }
    }

    /**
     * A windowed aggregation result used as the left side of a foreign-key left join must
     * allow the application to reach the running state within 60 seconds.
     */
    @Test
    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
        streamsProp.put("application.id", APP_ID + "-windowed-FKJ");
        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit type arguments: with raw types the lambdas below would see Object parameters.
        final KStream<String, String> inputStream = builder.stream(DEFAULT_INPUT_TOPIC);
        inputStream
            .groupBy((key, value) -> key, Grouped.with("GroupName", Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(10L)))
            .aggregate(() -> "", (key, value, aggregate) -> aggregate + key)
            .leftJoin(
                builder.<String, String>table(DEFAULT_INPUT_TABLE_TOPIC),
                value -> value,
                (left, right) -> left + right);

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(
                Collections.singletonList(streams), Duration.ofSeconds(60L));
        } finally {
            // Do not leave the application running behind the test.
            streams.close();
        }
    }

    /**
     * For a key-value store named "Counts", the changelog topic must use the compact cleanup
     * policy and the repartition topic the delete policy with four topic-level overrides.
     */
    @Test
    public void shouldCompactTopicsForKeyValueStoreChangelogs() {
        final String appId = APP_ID + "-compact";
        streamsProp.put("application.id", appId);

        // Word-count style topology: split each value into words and count per word.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy(MockMapper.selectValueMapper())
            .count(Materialized.as("Counts"));

        runUntilInputProcessed(new KafkaStreams(builder.build(), streamsProp));

        final Properties changelogProps =
            getTopicProperties(ProcessorStateManager.storeChangelogTopic(appId, "Counts", (String) null));
        Assert.assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp()));

        final Properties repartitionProps = getTopicProperties(appId + "-Counts-repartition");
        Assert.assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals(4, repartitionProps.size());
    }

    /**
     * For a window store named "CountWindows", the changelog topic must use both the compact
     * and delete cleanup policies with the expected retention, and the repartition topic the
     * delete policy with four topic-level overrides.
     */
    @Test
    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() {
        final String appId = APP_ID + "-compact-delete";
        streamsProp.put("application.id", appId);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy(MockMapper.selectValueMapper())
            .windowedBy(TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofMillis(0L)))
            // The type witness is needed because withRetention() is chained on as(): without it
            // the Materialized type arguments cannot be inferred from count()'s parameter.
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows")
                .withRetention(Duration.ofSeconds(2L)));

        runUntilInputProcessed(new KafkaStreams(builder.build(), streamsProp));

        final Properties changelogProps =
            getTopicProperties(ProcessorStateManager.storeChangelogTopic(appId, "CountWindows", (String) null));
        final List<String> policies = Arrays.asList(changelogProps.getProperty(LogConfig.CleanupPolicyProp()).split(","));
        Assert.assertEquals(2, policies.size());
        Assert.assertTrue(policies.contains(LogConfig.Compact()));
        Assert.assertTrue(policies.contains(LogConfig.Delete()));

        // Expected retention is one day plus the 2000 ms store retention configured above.
        // NOTE(review): the one-day part presumably comes from a Streams default for additional
        // changelog retention - confirm against StreamsConfig.
        final long expectedRetentionMs = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS) + 2000;
        Assert.assertEquals(expectedRetentionMs, Long.parseLong(changelogProps.getProperty(LogConfig.RetentionMsProp())));

        final Properties repartitionProps = getTopicProperties(appId + "-CountWindows-repartition");
        Assert.assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals(4, repartitionProps.size());
    }
}
