package org.apache.flink.statefun.flink.io.kinesis.polyglot;

import com.google.protobuf.Any;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.io.kinesis.KinesisSinkProvider;
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsRegion;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisSinkProvider.class */
/**
 * A {@link SinkProvider} that accepts a {@link JsonEgressSpec}, translates its JSON definition
 * into a {@link KinesisEgressSpec}, and hands the result to a {@link KinesisSinkProvider} to
 * create the actual sink.
 *
 * <p>Only egresses whose consumed type is {@link Any} are accepted.
 */
public final class GenericKinesisSinkProvider implements SinkProvider {
    private final KinesisSinkProvider delegateProvider = new KinesisSinkProvider();

    /**
     * Creates a sink for the given egress spec.
     *
     * @param egressSpec the egress spec; must be a {@link JsonEgressSpec} consuming {@link Any}
     * @param <T> the type consumed by the egress
     * @return the sink produced by the delegate {@link KinesisSinkProvider}
     * @throws IllegalArgumentException if the spec is not a {@link JsonEgressSpec}, or if its
     *     consumed type is not {@link Any}
     */
    @Override
    public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
        final KinesisEgressSpec<T> kinesisEgressSpec = asKinesisEgressSpec(egressSpec);
        return delegateProvider.forSpec(kinesisEgressSpec);
    }

    /**
     * Builds a {@link KinesisEgressSpec} from the JSON carried by a {@link JsonEgressSpec}.
     *
     * @throws IllegalArgumentException if {@code egressSpec} is not a {@link JsonEgressSpec}, or
     *     if its consumed type is not {@link Any}
     */
    private static <T> KinesisEgressSpec<T> asKinesisEgressSpec(EgressSpec<T> egressSpec) {
        if (!(egressSpec instanceof JsonEgressSpec)) {
            throw new IllegalArgumentException("Wrong type " + egressSpec.type());
        }
        final JsonEgressSpec<T> jsonEgressSpec = (JsonEgressSpec<T>) egressSpec;

        final EgressIdentifier<T> id = jsonEgressSpec.id();
        validateConsumedType(id);

        final JsonNode specJson = jsonEgressSpec.specJson();
        final KinesisEgressBuilder<T> builder = KinesisEgressBuilder.forIdentifier(id);

        // Each of the following settings is applied only when the parser finds it in the JSON.
        final Optional<AwsRegion> awsRegion = AwsAuthSpecJsonParser.optionalAwsRegion(specJson);
        awsRegion.ifPresent(builder::withAwsRegion);

        final Optional<AwsCredentials> awsCredentials =
                AwsAuthSpecJsonParser.optionalAwsCredentials(specJson);
        awsCredentials.ifPresent(builder::withAwsCredentials);

        final OptionalInt maxOutstandingRecords =
                KinesisEgressSpecJsonParser.optionalMaxOutstandingRecords(specJson);
        maxOutstandingRecords.ifPresent(builder::withMaxOutstandingRecords);

        KinesisEgressSpecJsonParser.clientConfigProperties(specJson)
                .entrySet()
                .forEach(
                        entry ->
                                builder.withClientConfigurationProperty(
                                        entry.getKey(), entry.getValue()));

        builder.withSerializer(serializerClass());
        return builder.build();
    }

    /**
     * Rejects any egress identifier whose consumed type is not exactly {@link Any}.
     *
     * @throws IllegalArgumentException if the consumed type is anything other than {@link Any}
     */
    private static void validateConsumedType(EgressIdentifier<?> egressIdentifier) {
        final Class<?> consumedType = egressIdentifier.consumedType();
        if (Any.class != consumedType) {
            throw new IllegalArgumentException(
                    "Generic Kinesis egress is only able to consume messages types of "
                            + Any.class.getName()
                            + " but "
                            + consumedType.getName()
                            + " is provided.");
        }
    }

    /**
     * Returns the {@link GenericKinesisEgressSerializer} class, typed to whatever the caller's
     * builder expects.
     */
    @SuppressWarnings("unchecked")
    private static <T> Class<T> serializerClass() {
        // The cast is unchecked. It is only reached after validateConsumedType has confirmed the
        // consumed type is Any.
        // NOTE(review): presumably GenericKinesisEgressSerializer serializes Any, which is what
        // would make this cast sound - confirm against that class.
        return (Class<T>) GenericKinesisEgressSerializer.class;
    }
}
