package org.apache.flink.statefun.flink.io.kinesis;

import java.util.Properties;
import org.apache.flink.statefun.flink.io.common.ReflectionUtil;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kinesis/KinesisSinkProvider.class */
/**
 * {@link SinkProvider} that translates a {@link KinesisEgressSpec} into a configured
 * {@link FlinkKinesisProducer}.
 */
public final class KinesisSinkProvider implements SinkProvider {

    /**
     * Builds the Kinesis sink described by the given egress spec.
     *
     * @param egressSpec the egress spec; must be a {@link KinesisEgressSpec}
     * @param <T> type of the records written to the egress
     * @return a {@link FlinkKinesisProducer} configured from the spec
     * @throws NullPointerException if {@code egressSpec} is {@code null}
     * @throws IllegalArgumentException if {@code egressSpec} is not a {@link KinesisEgressSpec}
     */
    @Override
    public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
        final KinesisEgressSpec<T> kinesisSpec = asKinesisSpec(egressSpec);

        // The same delegate instance is handed to the producer twice: once as its
        // serialization schema and once as its custom partitioner.
        final CachingPartitionerSerializerDelegate<T> delegate =
                new CachingPartitionerSerializerDelegate<>(serializerInstanceFromSpec(kinesisSpec));

        final FlinkKinesisProducer<T> producer =
                new FlinkKinesisProducer<>(delegate, propertiesFromSpec(kinesisSpec));
        producer.setCustomPartitioner(delegate);
        producer.setQueueLimit(kinesisSpec.maxOutstandingRecords());
        // Enable the producer's fail-on-error mode (see FlinkKinesisProducer#setFailOnError).
        producer.setFailOnError(true);
        return producer;
    }

    /**
     * Collects the producer properties from the spec.
     *
     * <p>The AWS region and credentials entries are added after the client configuration
     * properties, so they take precedence if the same key appears in both.
     *
     * @param kinesisEgressSpec the spec to read the configuration from
     * @return a new {@link Properties} instance holding the merged configuration
     */
    private static Properties propertiesFromSpec(KinesisEgressSpec<?> kinesisEgressSpec) {
        final Properties producerProps = new Properties();
        producerProps.putAll(kinesisEgressSpec.clientConfigurationProperties());
        producerProps.putAll(
                AwsAuthConfigProperties.forAwsRegionProducerProps(kinesisEgressSpec.awsRegion()));
        producerProps.putAll(
                AwsAuthConfigProperties.forAwsCredentials(kinesisEgressSpec.awsCredentials()));
        return producerProps;
    }

    /**
     * Narrows a generic egress spec to a {@link KinesisEgressSpec}.
     *
     * @param egressSpec the spec to narrow
     * @param <T> type of the records written to the egress
     * @return the same instance, typed as a {@link KinesisEgressSpec}
     * @throws NullPointerException if {@code egressSpec} is {@code null}
     * @throws IllegalArgumentException if {@code egressSpec} is of any other spec type
     */
    private static <T> KinesisEgressSpec<T> asKinesisSpec(EgressSpec<T> egressSpec) {
        if (egressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        if (egressSpec instanceof KinesisEgressSpec) {
            return (KinesisEgressSpec<T>) egressSpec;
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", egressSpec.type()));
    }

    /**
     * Instantiates the serializer class named by the spec via {@link ReflectionUtil#instantiate}.
     *
     * @param kinesisEgressSpec the spec providing the serializer class
     * @param <T> type of the records the serializer handles
     * @return a new serializer instance
     */
    // NOTE(review): the cast is kept because ReflectionUtil.instantiate's signature is not visible
    // here; if it already returns the precise type, the cast and the suppression can be dropped.
    @SuppressWarnings("unchecked")
    private static <T> KinesisEgressSerializer<T> serializerInstanceFromSpec(
            KinesisEgressSpec<T> kinesisEgressSpec) {
        return (KinesisEgressSerializer<T>)
                ReflectionUtil.instantiate(kinesisEgressSpec.serializerClass());
    }
}
