package org.apache.flink.statefun.flink.io.kafka;

import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressAutoResetPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilderApiExtension;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProvider.class */
/**
 * A {@link SourceProvider} for Kafka ingresses that are described by a JSON document.
 *
 * <p>The JSON-backed {@link IngressSpec} is translated into a {@link KafkaIngressSpec} whose
 * deserializer is a {@code RoutableProtobufKafkaIngressDeserializer} built from the per-topic
 * routing configuration; creation of the actual {@link SourceFunction} is then delegated to a
 * {@link KafkaSourceProvider}.
 */
final class RoutableProtobufKafkaSourceProvider implements SourceProvider {
    private final KafkaSourceProvider delegateProvider = new KafkaSourceProvider();

    /**
     * Creates the source function for the given ingress spec.
     *
     * @param ingressSpec the ingress spec; must be a {@link JsonIngressSpec} whose identifier
     *     produces a type deriving from {@link Message}
     * @return the source function created by the delegate {@link KafkaSourceProvider}
     * @throws IllegalArgumentException if the spec is not a {@link JsonIngressSpec}, or if its
     *     produced type does not derive from {@link Message}
     */
    @Override
    public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
        KafkaIngressSpec<T> kafkaIngressSpec = asKafkaIngressSpec(ingressSpec);
        return delegateProvider.forSpec(kafkaIngressSpec);
    }

    /**
     * Translates a JSON-backed ingress spec into a {@link KafkaIngressSpec}.
     *
     * <p>The Kafka address, client properties and topics are always applied; the consumer group
     * id, auto offset reset position and startup position are applied only when the JSON parser
     * reports them as present.
     *
     * @throws IllegalArgumentException if the spec is not a {@link JsonIngressSpec}, or if its
     *     produced type does not derive from {@link Message}
     */
    private static <T> KafkaIngressSpec<T> asKafkaIngressSpec(IngressSpec<T> ingressSpec) {
        if (!(ingressSpec instanceof JsonIngressSpec)) {
            throw new IllegalArgumentException("Wrong type " + ingressSpec.type());
        }
        JsonIngressSpec<T> jsonIngressSpec = (JsonIngressSpec<T>) ingressSpec;

        IngressIdentifier<T> id = jsonIngressSpec.id();
        Class<T> producedType = id.producedType();
        if (!Message.class.isAssignableFrom(producedType)) {
            throw new IllegalArgumentException("ProtocolBuffer based ingress is only able to produce types that derive from " + Message.class.getName() + " but " + producedType.getName() + " is provided.");
        }

        JsonNode json = jsonIngressSpec.json();
        Map<String, RoutingConfig> routableTopics = KafkaIngressSpecJsonParser.routableTopics(json);

        // Mandatory settings. The topics to subscribe to are the keys of the routing table.
        KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(id);
        builder
                .withKafkaAddress(KafkaIngressSpecJsonParser.kafkaAddress(json))
                .withProperties(KafkaIngressSpecJsonParser.kafkaClientProperties(json))
                .addTopics(new ArrayList<>(routableTopics.keySet()));

        // Optional settings: only touch the builder when a value was actually found in the JSON.
        Optional<String> consumerGroupId = KafkaIngressSpecJsonParser.optionalConsumerGroupId(json);
        consumerGroupId.ifPresent(builder::withConsumerGroupId);

        Optional<KafkaIngressAutoResetPosition> autoResetPosition =
                KafkaIngressSpecJsonParser.optionalAutoOffsetResetPosition(json);
        autoResetPosition.ifPresent(builder::withAutoResetPosition);

        Optional<KafkaIngressStartupPosition> startupPosition =
                KafkaIngressSpecJsonParser.optionalStartupPosition(json);
        startupPosition.ifPresent(builder::withStartupPosition);

        KafkaIngressDeserializer<T> routingDeserializer = deserializer(routableTopics);
        KafkaIngressBuilderApiExtension.withDeserializer(builder, routingDeserializer);

        return builder.build();
    }

    /**
     * Creates the routing deserializer for the given per-topic routing configuration.
     *
     * @param routingConfig routing configuration keyed by topic name
     * @return a {@code RoutableProtobufKafkaIngressDeserializer} viewed as a deserializer of
     *     {@code T}
     */
    @SuppressWarnings("unchecked")
    private static <T> KafkaIngressDeserializer<T> deserializer(Map<String, RoutingConfig> routingConfig) {
        KafkaIngressDeserializer<?> routable = new RoutableProtobufKafkaIngressDeserializer(routingConfig);
        // Unchecked by construction: asKafkaIngressSpec only reaches this point after verifying
        // that T derives from Message. NOTE(review): this relies on the deserializer emitting
        // protobuf Message instances — confirm against RoutableProtobufKafkaIngressDeserializer.
        return (KafkaIngressDeserializer<T>) routable;
    }
}
