package org.apache.paimon.flink.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TopicExistsException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.sink.LogSinkFunction;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogSinkProvider.class */
public class KafkaLogSinkProvider implements LogSinkProvider {
    private static final long serialVersionUID = 1;
    private final String topic;
    private final Properties properties;

    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final CoreOptions.LogConsistency consistency;
    private final CoreOptions.LogChangelogMode changelogMode;
    private final Integer numBuckets;
    public static final int DEFAULT_REPLICATION_FACTOR = 2;

    public KafkaLogSinkProvider(String str, Properties properties, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, CoreOptions.LogConsistency logConsistency, CoreOptions.LogChangelogMode logChangelogMode, Integer num) {
        this.topic = str;
        this.properties = properties;
        this.primaryKeySerializer = serializationSchema;
        this.valueSerializer = serializationSchema2;
        this.consistency = logConsistency;
        this.changelogMode = logChangelogMode;
        this.numBuckets = num;
    }

    @Override // org.apache.paimon.flink.log.LogSinkProvider
    public LogSinkFunction createSink() {
        FlinkKafkaProducer.Semantic semantic;
        switch (this.consistency) {
            case TRANSACTIONAL:
                semantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
                break;
            case EVENTUAL:
                semantic = FlinkKafkaProducer.Semantic.AT_LEAST_ONCE;
                break;
            default:
                throw new IllegalArgumentException("Unsupported: " + this.consistency);
        }
        createTopicIfNotExists();
        return new KafkaSinkFunction(this.topic, createSerializationSchema(), this.properties, semantic);
    }

    @VisibleForTesting
    KafkaLogSerializationSchema createSerializationSchema() {
        return new KafkaLogSerializationSchema(this.topic, this.primaryKeySerializer, this.valueSerializer, this.changelogMode);
    }

    private void createTopicIfNotExists() {
        try {
            AdminClient create = AdminClient.create(this.properties);
            Throwable th = null;
            try {
                if (!((Set) create.listTopics().names().get()).contains(this.topic)) {
                    int size = ((Collection) create.describeCluster().nodes().get()).size();
                    create.createTopics(Collections.singleton(new NewTopic(this.topic, this.numBuckets.intValue(), (short) (2 > size ? size : 2)))).all().get();
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new TableException("Error in createTopicIfNotExists", e);
            }
            throw new TableException(String.format("Failed to create kafka topic. Reason: topic %s exists. ", this.topic));
        }
    }
}
