package org.apache.flink.statefun.flink.io.kinesis;

import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.class */
public final class KinesisSourceProvider implements SourceProvider {
    @Override // org.apache.flink.statefun.flink.io.spi.SourceProvider
    public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
        KinesisIngressSpec asKinesisSpec = asKinesisSpec(ingressSpec);
        return new FlinkKinesisConsumer(asKinesisSpec.streams(), deserializationSchemaFromSpec(asKinesisSpec), propertiesFromSpec(asKinesisSpec));
    }

    private static <T> KinesisIngressSpec<T> asKinesisSpec(IngressSpec<T> ingressSpec) {
        if (ingressSpec instanceof KinesisIngressSpec) {
            return (KinesisIngressSpec) ingressSpec;
        }
        if (ingressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", ingressSpec.type()));
    }

    private static <T> KinesisDeserializationSchema<T> deserializationSchemaFromSpec(KinesisIngressSpec<T> kinesisIngressSpec) {
        return new KinesisDeserializationSchemaDelegate(kinesisIngressSpec.deserializer());
    }

    private static Properties propertiesFromSpec(KinesisIngressSpec<?> kinesisIngressSpec) {
        Properties properties = new Properties();
        properties.putAll(resolveClientProperties(kinesisIngressSpec.clientConfigurationProperties()));
        properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(kinesisIngressSpec.awsRegion()));
        properties.putAll(AwsAuthConfigProperties.forAwsCredentials(kinesisIngressSpec.awsCredentials()));
        setStartupPositionProperties(properties, kinesisIngressSpec.startupPosition());
        return properties;
    }

    private static Properties resolveClientProperties(Properties properties) {
        Properties properties2 = new Properties();
        for (String str : properties.stringPropertyNames()) {
            properties2.setProperty(asFlinkConsumerClientPropertyKey(str), properties.getProperty(str));
        }
        return properties2;
    }

    private static void setStartupPositionProperties(Properties properties, KinesisIngressStartupPosition kinesisIngressStartupPosition) {
        if (kinesisIngressStartupPosition.isEarliest()) {
            properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
            return;
        }
        if (kinesisIngressStartupPosition.isLatest()) {
            properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.LATEST.name());
        } else {
            if (!kinesisIngressStartupPosition.isDate()) {
                throw new IllegalStateException("Unrecognized ingress startup position type: " + kinesisIngressStartupPosition);
            }
            properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());
            properties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, kinesisIngressStartupPosition.asDate().date().format(DateTimeFormatter.ofPattern(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT)));
        }
    }

    private static String asFlinkConsumerClientPropertyKey(String str) {
        return AWSUtil.AWS_CLIENT_CONFIG_PREFIX + lowercaseFirstLetter(str);
    }

    private static String lowercaseFirstLetter(String str) {
        char[] charArray = str.toCharArray();
        charArray[0] = Character.toLowerCase(charArray[0]);
        return new String(charArray);
    }
}
