package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.class */
/**
 * Integration tests for {@link KafkaLogStoreRegister}: registering and unregistering the Kafka
 * topic that backs a table's log store.
 *
 * <p>Every test runs against the brokers reported by {@code getBootstrapServers()} and uses the
 * fixed table identifier {@code mock_db.mock_table}. All topics are removed after each test so
 * that tests do not observe each other's topics.
 */
public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
    private static final String DATABASE = "mock_db";
    private static final String TABLE = "mock_table";

    /**
     * Deletes every topic visible to the admin client.
     *
     * <p>Clean-up is best effort: a failure here must not mask the outcome of the test that just
     * ran, so exceptions are deliberately not propagated.
     */
    @AfterEach
    public void tearDown() {
        try (AdminClient admin = createAdminClient()) {
            Set<String> topics = admin.listTopics().names().get();
            admin.deleteTopics(topics).all().get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner instead of silently clearing it.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // Intentionally ignored: clean-up is best effort.
        }
    }

    /** Registering with an explicit topic name and partition count creates exactly that topic. */
    @Test
    public void testRegisterTopic() {
        Map<String, String> result =
                createKafkaLogStoreRegister(getBootstrapServers(), "register-topic", 2)
                        .registerTopic();
        // Arguments are presumably (topic, partitions, replication factor) - see the base class.
        checkTopicExists("register-topic", 2, 1);
        Assertions.assertThat(result.get(KafkaLogOptions.TOPIC.key())).isEqualTo("register-topic");
    }

    /**
     * Registering without a topic name generates one of the form
     * {@code <database>_<table>_<32 hex characters>} and reports a partition count of 1.
     */
    @Test
    public void testRegisterTopicAuto() {
        Map<String, String> result =
                createKafkaLogStoreRegister(getBootstrapServers()).registerTopic();

        try (AdminClient admin = createAdminClient()) {
            Set<String> topics = admin.listTopics().names().get(5L, TimeUnit.SECONDS);
            Assertions.assertThat(topics.size()).isEqualTo(1);

            String actualTopic = topics.stream().findFirst().get();
            Assertions.assertThat(result.get(KafkaLogOptions.TOPIC.key())).isEqualTo(actualTopic);

            String expectedPrefix = String.format("%s_%s_", DATABASE, TABLE);
            Assertions.assertThat(actualTopic).startsWith(expectedPrefix);
            Assertions.assertThat(actualTopic.substring(expectedPrefix.length()))
                    .matches("[0-9a-fA-F]{32}");

            Assertions.assertThat(result.get(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS.key()))
                    .isEqualTo("1");
        } catch (Exception e) {
            // Pass the cause along so the original stack trace is not lost.
            Fail.fail(e.getMessage(), e);
        }
    }

    /** An unusable bootstrap address surfaces as an IllegalStateException caused by ConfigException. */
    @Test
    public void testRegisterTopicException() {
        KafkaLogStoreRegister register =
                createKafkaLogStoreRegister("invalid-bootstrap-servers:9092", "register-topic");

        Assertions.assertThatThrownBy(register::registerTopic)
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Register topic for table mock_db.mock_table failed")
                .hasRootCauseInstanceOf(ConfigException.class);
    }

    /** Registering a topic that already exists fails with a TopicExistsException root cause. */
    @Test
    public void testRegisterTopicExist() {
        String topic = "topic-exist";
        createTopic(topic, 1, 1);

        Assertions.assertThatThrownBy(
                        () -> createKafkaLogStoreRegister(getBootstrapServers(), topic)
                                .registerTopic())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Register topic for table mock_db.mock_table failed")
                .hasRootCauseInstanceOf(TopicExistsException.class);
    }

    /** Unregistering removes a previously created topic. */
    @Test
    public void testUnregisterTopic() {
        createTopic("unregister-topic", 2, 1);
        createKafkaLogStoreRegister(getBootstrapServers(), "unregister-topic", 2).unRegisterTopic();
        checkTopicNotExist("unregister-topic");
    }

    /** Unregistering a topic that was never created must not throw. */
    @Test
    public void testUnregisterTopicException() {
        String topic = "not_exist_topic";

        Assertions.assertThatCode(
                        () -> createKafkaLogStoreRegister(getBootstrapServers(), topic)
                                .unRegisterTopic())
                .doesNotThrowAnyException();
    }

    /** Creates a register with neither a topic name nor a partition count configured. */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(String bootstrapServers) {
        return createKafkaLogStoreRegister(bootstrapServers, null, null);
    }

    /** Creates a register with a topic name but no partition count configured. */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(
            String bootstrapServers, String topic) {
        return createKafkaLogStoreRegister(bootstrapServers, topic, null);
    }

    /**
     * Creates a register for {@code mock_db.mock_table} with a 20 second register timeout.
     *
     * @param bootstrapServers value for the Kafka bootstrap servers option
     * @param topic topic name to configure, or {@code null} to leave the option unset
     * @param partitions partition count to configure, or {@code null} to leave the option unset
     * @return a register backed by the assembled options
     */
    private KafkaLogStoreRegister createKafkaLogStoreRegister(
            String bootstrapServers, String topic, Integer partitions) {
        final Options options = new Options();
        options.set(KafkaLogOptions.BOOTSTRAP_SERVERS, bootstrapServers);
        if (topic != null) {
            options.set(KafkaLogOptions.TOPIC, topic);
        }
        if (partitions != null) {
            options.set(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS, partitions);
        }
        options.set(FlinkCatalogOptions.REGISTER_TIMEOUT.key(), Duration.ofSeconds(20L).toString());

        return new KafkaLogStoreRegister(
                new LogStoreTableFactory.RegisterContext() {
                    @Override
                    public Options getOptions() {
                        return options;
                    }

                    @Override
                    public Identifier getIdentifier() {
                        return Identifier.create(DATABASE, TABLE);
                    }
                });
    }

    /**
     * Creates a topic and waits for the broker to confirm it, failing the test on any error.
     *
     * @param topic name of the topic to create
     * @param partitions number of partitions
     * @param replicationFactor replication factor, narrowed to {@code short} for Kafka
     */
    private void createTopic(String topic, int partitions, int replicationFactor) {
        try (AdminClient admin = createAdminClient()) {
            NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
            // Await the result so a failed creation fails here, not in a later assertion.
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (Exception e) {
            Fail.fail(e.getMessage(), e);
        }
    }
}
