package org.apache.flink.statefun.flink.io.kafka;

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.class */
public class KafkaSinkProvider implements SinkProvider {
    @Override // org.apache.flink.statefun.flink.io.spi.SinkProvider
    public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
        KafkaEgressSpec<T> asSpec = asSpec(egressSpec);
        Properties properties = new Properties();
        properties.putAll(asSpec.properties());
        properties.setProperty("bootstrap.servers", asSpec.kafkaAddress());
        FlinkKafkaProducer.Semantic semanticFromSpec = semanticFromSpec(asSpec);
        if (semanticFromSpec == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
            properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(asSpec.transactionTimeoutDuration().toMillis()));
        }
        return new FlinkKafkaProducer(randomKafkaTopic(), serializerFromSpec(asSpec), properties, semanticFromSpec, asSpec.kafkaProducerPoolSize());
    }

    private <T> KafkaSerializationSchema<T> serializerFromSpec(KafkaEgressSpec<T> kafkaEgressSpec) {
        return new KafkaSerializationSchemaDelegate((KafkaEgressSerializer) ReflectionUtil.instantiate(kafkaEgressSpec.serializerClass()));
    }

    private static <T> FlinkKafkaProducer.Semantic semanticFromSpec(KafkaEgressSpec<T> kafkaEgressSpec) {
        switch (kafkaEgressSpec.semantic()) {
            case EXACTLY_ONCE:
                return FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
            case AT_LEAST_ONCE:
                return FlinkKafkaProducer.Semantic.AT_LEAST_ONCE;
            case NONE:
                return FlinkKafkaProducer.Semantic.NONE;
            default:
                throw new IllegalArgumentException("Unknown producer semantic " + kafkaEgressSpec.semantic());
        }
    }

    private static <T> KafkaEgressSpec<T> asSpec(EgressSpec<T> egressSpec) {
        if (egressSpec instanceof KafkaEgressSpec) {
            return (KafkaEgressSpec) egressSpec;
        }
        if (egressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", egressSpec.type()));
    }

    private static String randomKafkaTopic() {
        return "__stateful_functions_random_topic_" + StringUtils.generateRandomAlphanumericString(ThreadLocalRandom.current(), 16);
    }
}
