package org.apache.paimon.flink.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogStoreRegister.class */
public class KafkaLogStoreRegister implements LogStoreRegister {
    private final String bootstrapServers;
    private final String topic;
    private final int partition;
    private final int replicationFactor;
    private final Duration timeout;
    private final Properties properties;
    private final Identifier identifier;

    public KafkaLogStoreRegister(LogStoreTableFactory.RegisterContext registerContext) {
        this.bootstrapServers = (String) registerContext.getOptions().get(KafkaLogOptions.BOOTSTRAP_SERVERS);
        this.identifier = registerContext.getIdentifier();
        this.topic = registerContext.getOptions().getOptional(KafkaLogOptions.TOPIC).isPresent() ? (String) registerContext.getOptions().get(KafkaLogOptions.TOPIC) : String.format("%s_%s_%s", this.identifier.getDatabaseName(), this.identifier.getObjectName(), UUID.randomUUID().toString().replace("-", ""));
        Preconditions.checkNotNull(registerContext.getOptions().get(KafkaLogOptions.BOOTSTRAP_SERVERS));
        Preconditions.checkNotNull(this.topic);
        Preconditions.checkNotNull(this.identifier);
        if (registerContext.getOptions().get(FlinkCatalogOptions.REGISTER_TIMEOUT.key()) == null) {
            this.timeout = FlinkCatalogOptions.REGISTER_TIMEOUT.defaultValue();
        } else {
            this.timeout = Duration.parse(registerContext.getOptions().get(FlinkCatalogOptions.REGISTER_TIMEOUT.key()));
        }
        this.partition = registerContext.getOptions().getOptional(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS).isPresent() ? ((Integer) registerContext.getOptions().get(FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS)).intValue() : ((Integer) registerContext.getOptions().get(CoreOptions.BUCKET)).intValue() == -1 ? 1 : ((Integer) registerContext.getOptions().get(CoreOptions.BUCKET)).intValue();
        this.replicationFactor = ((Integer) registerContext.getOptions().get(FlinkConnectorOptions.LOG_SYSTEM_REPLICATION)).intValue();
        this.properties = KafkaLogStoreFactory.toKafkaProperties(registerContext.getOptions());
    }

    @Override // org.apache.paimon.flink.log.LogStoreRegister
    public Map<String, String> registerTopic() {
        try {
            AdminClient create = AdminClient.create(this.properties);
            Throwable th = null;
            try {
                create.createTopics(Collections.singleton(new NewTopic(this.topic, this.partition, (short) this.replicationFactor))).all().get(this.timeout.getSeconds(), TimeUnit.SECONDS);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return ImmutableMap.of(KafkaLogOptions.TOPIC.key(), this.topic, FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS.key(), String.valueOf(this.partition), FlinkConnectorOptions.LOG_SYSTEM_REPLICATION.key(), String.valueOf(this.replicationFactor));
            } finally {
            }
        } catch (TimeoutException e) {
            throw new IllegalStateException(String.format("Register topic for table %s timeout with properties %s", this.identifier.getFullName(), this.properties), e);
        } catch (Exception e2) {
            throw new IllegalStateException(String.format("Register topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e2);
        }
    }

    @Override // org.apache.paimon.flink.log.LogStoreRegister
    public void unRegisterTopic() {
        try {
            AdminClient create = AdminClient.create(this.properties);
            Throwable th = null;
            try {
                create.deleteTopics(Collections.singleton(this.topic)).all().get(this.timeout.getSeconds(), TimeUnit.SECONDS);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new IllegalStateException(String.format("Unregister topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e);
            }
        } catch (TimeoutException e2) {
            throw new RuntimeException(String.format("Unregister topic for table %s timeout with properties %s", this.identifier.getFullName(), this.properties), e2);
        } catch (Exception e3) {
            throw new RuntimeException(String.format("Unregister topic for table %s failed with properties %s", this.identifier.getFullName(), this.properties), e3);
        }
    }
}
