package org.apache.flink.statefun.flink.io.kinesis.polyglot;

import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.flink.io.kinesis.KinesisSourceProvider;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilderApiExtension;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kinesis/polyglot/RoutableProtobufKinesisSourceProvider.class */
/**
 * A {@link SourceProvider} that accepts a {@link JsonIngressSpec}, translates it into a
 * {@link KinesisIngressSpec} and hands that to a {@link KinesisSourceProvider} to create the actual
 * source function.
 *
 * <p>The ingress must produce a type that derives from {@link Message}; records are deserialized
 * with a {@link RoutableProtobufKinesisIngressDeserializer} built from the routable streams declared
 * in the JSON spec.
 */
public final class RoutableProtobufKinesisSourceProvider implements SourceProvider {
    private final KinesisSourceProvider delegateProvider = new KinesisSourceProvider();

    /**
     * Creates the source function for the given ingress spec.
     *
     * @param ingressSpec the ingress spec; must be a {@link JsonIngressSpec}
     * @return the source function created by the delegate {@link KinesisSourceProvider}
     * @throws IllegalArgumentException if the spec is not a {@link JsonIngressSpec}, or if its
     *     produced type does not derive from {@link Message}
     */
    @Override
    public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
        KinesisIngressSpec<T> kinesisIngressSpec = asKinesisIngressSpec(ingressSpec);
        return delegateProvider.forSpec(kinesisIngressSpec);
    }

    /**
     * Builds a {@link KinesisIngressSpec} out of the JSON definition carried by the given spec.
     *
     * @param ingressSpec the ingress spec to convert
     * @return the equivalent Kinesis ingress spec
     * @throws IllegalArgumentException if the spec is not a {@link JsonIngressSpec}, or if its
     *     produced type does not derive from {@link Message}
     */
    private static <T> KinesisIngressSpec<T> asKinesisIngressSpec(IngressSpec<T> ingressSpec) {
        if (!(ingressSpec instanceof JsonIngressSpec)) {
            throw new IllegalArgumentException("Wrong type " + ingressSpec.type());
        }
        JsonIngressSpec<T> jsonIngressSpec = (JsonIngressSpec<T>) ingressSpec;

        IngressIdentifier<T> id = jsonIngressSpec.id();
        Class<T> producedType = id.producedType();
        if (!Message.class.isAssignableFrom(producedType)) {
            throw new IllegalArgumentException("ProtocolBuffer based Kinesis ingress is only able to produce types that derive from " + Message.class.getName() + " but " + producedType.getName() + " is provided.");
        }

        JsonNode specJson = jsonIngressSpec.specJson();
        KinesisIngressBuilder<T> builder = KinesisIngressBuilder.forIdentifier(id);

        // Optional settings: applied only when the JSON spec provides them.
        AwsAuthSpecJsonParser.optionalAwsRegion(specJson).ifPresent(builder::withAwsRegion);
        AwsAuthSpecJsonParser.optionalAwsCredentials(specJson).ifPresent(builder::withAwsCredentials);
        KinesisIngressSpecJsonParser.optionalStartupPosition(specJson).ifPresent(builder::withStartupPosition);

        // NOTE(review): the String casts are kept because the declared key/value types returned by
        // clientConfigProperties are not visible from this file; drop them if it is Map<String, String>.
        KinesisIngressSpecJsonParser.clientConfigProperties(specJson)
                .entrySet()
                .forEach(entry -> builder.withClientConfigurationProperty((String) entry.getKey(), (String) entry.getValue()));

        // The routable streams supply both the deserializer's routing config and the stream names.
        Map<String, RoutingConfig> routableStreams = KinesisIngressSpecJsonParser.routableStreams(specJson);
        KinesisIngressBuilderApiExtension.withDeserializer(builder, deserializer(routableStreams));
        builder.withStreams(new ArrayList<>(routableStreams.keySet()));

        return builder.build();
    }

    /**
     * Creates the routing deserializer for the given per-stream routing configuration.
     *
     * @param map routing configuration keyed by stream name
     * @return a deserializer viewed as producing {@code T}
     */
    @SuppressWarnings("unchecked")
    private static <T> KinesisIngressDeserializer<T> deserializer(Map<String, RoutingConfig> map) {
        // Unchecked cast: the only caller, asKinesisIngressSpec, has already verified that the
        // produced type T derives from Message before requesting a deserializer.
        return (KinesisIngressDeserializer<T>) new RoutableProtobufKinesisIngressDeserializer(map);
    }
}
