package org.apache.flink.statefun.flink.io.kafka;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/GenericKafkaSinkProvider.class */
final class GenericKafkaSinkProvider implements SinkProvider {
    private final KafkaSinkProvider delegateProvider = new KafkaSinkProvider();

    @Override // org.apache.flink.statefun.flink.io.spi.SinkProvider
    public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
        return this.delegateProvider.forSpec(asKafkaEgressSpec(egressSpec));
    }

    private static <T> KafkaEgressSpec<T> asKafkaEgressSpec(EgressSpec<T> egressSpec) {
        if (!(egressSpec instanceof JsonEgressSpec)) {
            throw new IllegalArgumentException("Wrong type " + egressSpec.type());
        }
        JsonEgressSpec jsonEgressSpec = (JsonEgressSpec) egressSpec;
        EgressIdentifier<T> id = jsonEgressSpec.id();
        validateConsumedType(id);
        JsonNode json = jsonEgressSpec.json();
        KafkaEgressBuilder forIdentifier = KafkaEgressBuilder.forIdentifier(id);
        forIdentifier.withKafkaAddress(KafkaEgressSpecJsonParser.kafkaAddress(json)).withProperties(KafkaEgressSpecJsonParser.kafkaClientProperties(json)).withSerializer(serializerClass());
        KafkaEgressSpecJsonParser.optionalDeliverySemantic(json).ifPresent(kafkaProducerSemantic -> {
            switch (kafkaProducerSemantic) {
                case AT_LEAST_ONCE:
                    forIdentifier.withAtLeastOnceProducerSemantics();
                    return;
                case EXACTLY_ONCE:
                    forIdentifier.withExactlyOnceProducerSemantics(KafkaEgressSpecJsonParser.exactlyOnceDeliveryTxnTimeout(json));
                    return;
                case NONE:
                    forIdentifier.withNoProducerSemantics();
                    return;
                default:
                    throw new IllegalStateException("Unrecognized producer semantic: " + kafkaProducerSemantic);
            }
        });
        return forIdentifier.build();
    }

    private static void validateConsumedType(EgressIdentifier<?> egressIdentifier) {
        Class<?> consumedType = egressIdentifier.consumedType();
        if (TypedValue.class != consumedType) {
            throw new IllegalArgumentException("Generic Kafka egress is only able to consume messages types of " + TypedValue.class.getName() + " but " + consumedType.getName() + " is provided.");
        }
    }

    private static <T> Class<T> serializerClass() {
        return GenericKafkaEgressSerializer.class;
    }
}
