package org.apache.flink.statefun.flink.core.jsonmodule;

import com.google.protobuf.Any;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.kinesis.shaded.org.apache.http.HttpHost;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.ResourceLocator;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
import org.apache.flink.statefun.flink.core.common.Maps;
import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
import org.apache.flink.statefun.flink.core.jsonmodule.Pointers;
import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.sdk.EgressType;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;
import org.apache.kafka.common.config.SslConfigs;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.class */
/**
 * A {@link StatefulFunctionModule} whose bindings are read from a JSON specification.
 *
 * <p>On {@link #configure(Map, StatefulFunctionModule.Binder)} the module reads the functions,
 * routers, ingresses and egresses sections of the JSON spec (located via {@link Pointers}) and
 * registers each of them with the supplied binder.
 */
public final class JsonModule implements StatefulFunctionModule {
    /** Request timeout used for HTTP functions that do not declare one. */
    private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);

    /** Batch-request limit used for HTTP functions that do not declare one. */
    private static final int DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;

    /** The only router type accepted by {@link #requireProtobufRouterType(JsonNode)}. */
    private static final String PROTOBUF_ROUTER_TYPE = "org.apache.flink.statefun.sdk/protobuf-router";

    /** URI schemes accepted for an HTTP function endpoint. */
    private static final String HTTP_SCHEME = "http";

    private static final String HTTPS_SCHEME = "https";

    private final JsonNode spec;
    private final URL moduleUrl;

    /**
     * Creates a module backed by the given JSON specification.
     *
     * @param jsonNode the root node of the module specification; must not be {@code null}
     * @param url the location the specification was loaded from, used in error messages; must not
     *     be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public JsonModule(JsonNode jsonNode, URL url) {
        this.spec = Objects.requireNonNull(jsonNode, "jsonNode");
        this.moduleUrl = Objects.requireNonNull(url, "url");
    }

    /**
     * Binds every function, router, ingress and egress declared in the JSON specification.
     *
     * @param globalConfiguration the global configuration; not consulted by this module
     * @param binder the binder that receives the bindings
     * @throws ModuleConfigurationException if anything goes wrong while reading the specification;
     *     the original failure is attached as the cause
     */
    @Override // org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
    public void configure(Map<String, String> globalConfiguration, StatefulFunctionModule.Binder binder) {
        try {
            configureFunctions(binder, Selectors.listAt(this.spec, Pointers.FUNCTIONS_POINTER));
            configureRouters(binder, Selectors.listAt(this.spec, Pointers.ROUTERS_POINTER));
            configureIngress(binder, Selectors.listAt(this.spec, Pointers.INGRESSES_POINTER));
            configureEgress(binder, Selectors.listAt(this.spec, Pointers.EGRESSES_POINTER));
        } catch (Throwable th) {
            // Deliberately broad: every failure is reported together with the module location.
            throw new ModuleConfigurationException(String.format("Error while parsing module at %s", this.moduleUrl), th);
        }
    }

    /**
     * Parses all function nodes, groups them by kind, and binds one provider per kind to every
     * function type of that kind.
     */
    private void configureFunctions(StatefulFunctionModule.Binder binder, Iterable<? extends JsonNode> iterable) {
        Map<FunctionSpec.Kind, Map<FunctionType, FunctionSpec>> specsByKind =
                StreamSupport.stream(iterable.spliterator(), false)
                        .map(JsonModule::parseFunctionSpec)
                        .collect(Collectors.groupingBy(FunctionSpec::kind, groupByFunctionType()));

        for (Map.Entry<FunctionSpec.Kind, Map<FunctionType, FunctionSpec>> entry : specsByKind.entrySet()) {
            Map<FunctionType, FunctionSpec> specsOfKind = entry.getValue();
            // A single provider instance serves all function types that share a kind.
            StatefulFunctionProvider provider = functionProvider(entry.getKey(), specsOfKind);
            for (FunctionType functionType : specsOfKind.keySet()) {
                binder.bindFunctionProvider(functionType, provider);
            }
        }
    }

    /**
     * Creates the provider matching the given function kind.
     *
     * @param kind the kind shared by every spec in {@code map}
     * @param map the specs of that kind, keyed by function type
     * @throws IllegalStateException if the kind is not handled here
     */
    private static StatefulFunctionProvider functionProvider(FunctionSpec.Kind kind, Map<FunctionType, FunctionSpec> map) {
        switch (kind) {
            case HTTP:
                return new HttpFunctionProvider(Maps.transformValues(map, HttpFunctionSpec.class::cast));
            case GRPC:
                return new GrpcFunctionProvider(Maps.transformValues(map, GrpcFunctionSpec.class::cast));
            default:
                throw new IllegalStateException("Unexpected value: " + kind);
        }
    }

    /** Validates each router node's type and binds the router to its target ingress. */
    private void configureRouters(StatefulFunctionModule.Binder binder, Iterable<? extends JsonNode> iterable) {
        for (JsonNode routerNode : iterable) {
            requireProtobufRouterType(routerNode);
            binder.bindIngressRouter(targetRouterIngress(routerNode), dynamicRouter(routerNode));
        }
    }

    /**
     * Binds each ingress node; ingresses of an auto-routable type additionally get an
     * {@link AutoRoutableProtobufRouter}.
     */
    private void configureIngress(StatefulFunctionModule.Binder binder, Iterable<? extends JsonNode> iterable) {
        for (JsonNode ingressNode : iterable) {
            IngressIdentifier<Message> id = ingressId(ingressNode);
            IngressType type = ingressType(ingressNode);
            binder.bindIngress(new JsonIngressSpec(type, id, ingressNode));
            if (isAutoRoutableIngress(type)) {
                binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
            }
        }
    }

    /** Binds each egress node as a {@link JsonEgressSpec}. */
    private void configureEgress(StatefulFunctionModule.Binder binder, Iterable<? extends JsonNode> iterable) {
        for (JsonNode egressNode : iterable) {
            binder.bindEgress(new JsonEgressSpec(egressType(egressNode), egressId(egressNode), egressNode));
        }
    }

    /** Reads the ingress type (namespace/name pair) from the node's meta section. */
    private static IngressType ingressType(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Ingress.META_TYPE));
        return new IngressType(pair.namespace(), pair.name());
    }

    /** Reads the ingress identifier (namespace/name pair) from the node's meta section. */
    private static IngressIdentifier<Message> ingressId(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Ingress.META_ID));
        return new IngressIdentifier<>(Message.class, pair.namespace(), pair.name());
    }

    /** Returns whether the ingress type is one of the routable Protobuf Kafka/Kinesis types. */
    private static boolean isAutoRoutableIngress(IngressType ingressType) {
        return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
                || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
    }

    /** Reads the egress type (namespace/name pair) from the node's meta section. */
    private static EgressType egressType(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Egress.META_TYPE));
        return new EgressType(pair.namespace(), pair.name());
    }

    /** Reads the egress identifier (namespace/name pair) from the node's meta section. */
    private static EgressIdentifier<Any> egressId(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Egress.META_ID));
        return new EgressIdentifier<>(pair.namespace(), pair.name(), Any.class);
    }

    /**
     * Builds a Protobuf router from a router node's target template, descriptor set and message
     * type.
     *
     * @throws IllegalStateException if the message type cannot be found in the descriptor set
     */
    private static Router<Message> dynamicRouter(JsonNode jsonNode) {
        String addressTemplate = Selectors.textAt(jsonNode, Pointers.Routers.SPEC_TARGET);
        String descriptorSetPath = Selectors.textAt(jsonNode, Pointers.Routers.SPEC_DESCRIPTOR);
        String messageType = Selectors.textAt(jsonNode, Pointers.Routers.SPEC_MESSAGE_TYPE);
        Optional<Descriptors.GenericDescriptor> descriptor =
                protobufDescriptorMap(descriptorSetPath).getDescriptorByName(messageType);
        if (!descriptor.isPresent()) {
            throw new IllegalStateException("Error while processing a router definition. Unable to locate a message " + messageType + " in a descriptor set " + descriptorSetPath);
        }
        // NOTE(review): the cast fails with ClassCastException if the name resolves to a
        // non-message descriptor; kept as in the original.
        return ProtobufRouter.forAddressTemplate((Descriptors.Descriptor) descriptor.get(), addressTemplate);
    }

    /**
     * Locates and loads the Protobuf descriptor set with the given resource name.
     *
     * @throws IllegalArgumentException if no resource with that name can be located
     * @throws IllegalStateException if the resource cannot be read
     */
    private static ProtobufDescriptorMap protobufDescriptorMap(String str) {
        try {
            URL descriptorSetUrl = ResourceLocator.findNamedResource(str);
            if (descriptorSetUrl == null) {
                throw new IllegalArgumentException("Unable to locate a Protobuf descriptor set at " + str);
            }
            return ProtobufDescriptorMap.from(descriptorSetUrl);
        } catch (IOException e) {
            throw new IllegalStateException("Error while processing a router definition. Unable to read the descriptor set at  " + str, e);
        }
    }

    /** Reads the identifier of the ingress a router node attaches to. */
    private static IngressIdentifier<Message> targetRouterIngress(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Routers.SPEC_INGRESS));
        return new IngressIdentifier<>(Message.class, pair.namespace(), pair.name());
    }

    /**
     * Ensures the router node declares the Protobuf router type (compared case-insensitively).
     *
     * @throws IllegalStateException if any other router type is declared
     */
    private static void requireProtobufRouterType(JsonNode jsonNode) {
        String routerType = Selectors.textAt(jsonNode, Pointers.Routers.META_TYPE);
        if (!routerType.equalsIgnoreCase(PROTOBUF_ROUTER_TYPE)) {
            throw new IllegalStateException("Invalid router type " + routerType);
        }
    }

    /**
     * Parses a single function node into the spec matching its declared kind.
     *
     * @throws IllegalArgumentException if the declared kind is not recognized
     */
    private static FunctionSpec parseFunctionSpec(JsonNode jsonNode) {
        String kindText = Selectors.textAt(jsonNode, Pointers.Functions.META_KIND);
        // Locale.ROOT keeps the mapping to enum constant names independent of the JVM's locale.
        FunctionSpec.Kind kind = FunctionSpec.Kind.valueOf(kindText.toUpperCase(Locale.ROOT));
        FunctionType functionType = functionType(jsonNode);
        switch (kind) {
            case HTTP:
                return new HttpFunctionSpec(functionType, functionUri(jsonNode), functionStates(jsonNode), maxRequestDuration(jsonNode), maxNumBatchRequests(jsonNode));
            case GRPC:
                return new GrpcFunctionSpec(functionType, functionAddress(jsonNode));
            default:
                throw new IllegalArgumentException("Unrecognized function kind " + kindText);
        }
    }

    /** Reads the optional batch-request limit, falling back to the default when absent. */
    private static int maxNumBatchRequests(JsonNode jsonNode) {
        return Selectors.optionalIntegerAt(jsonNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS).orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
    }

    /** Reads the optional request timeout, falling back to the default when absent. */
    private static Duration maxRequestDuration(JsonNode jsonNode) {
        return Selectors.optionalTextAt(jsonNode, Pointers.Functions.FUNCTION_TIMEOUT)
                .map(TimeUtils::parseDuration)
                .orElse(DEFAULT_HTTP_TIMEOUT);
    }

    /** Reads the list of state names declared by a function node. */
    private static List<String> functionStates(JsonNode jsonNode) {
        return Selectors.textListAt(jsonNode, Pointers.Functions.FUNCTION_STATES);
    }

    /** Reads the function type (namespace/name pair) from the node's meta section. */
    private static FunctionType functionType(JsonNode jsonNode) {
        NamespaceNamePair pair = NamespaceNamePair.from(Selectors.textAt(jsonNode, Pointers.Functions.META_TYPE));
        return new FunctionType(pair.namespace(), pair.name());
    }

    /** Reads the host name and port of a gRPC function node. */
    private static InetSocketAddress functionAddress(JsonNode jsonNode) {
        String hostname = Selectors.textAt(jsonNode, Pointers.Functions.FUNCTION_HOSTNAME);
        int port = Selectors.integerAt(jsonNode, Pointers.Functions.FUNCTION_PORT);
        return new InetSocketAddress(hostname, port);
    }

    /**
     * Reads and validates the endpoint of an HTTP function node.
     *
     * @throws IllegalArgumentException if the endpoint has no scheme, or a scheme other than
     *     {@code http} or {@code https} (compared case-insensitively)
     */
    private static URI functionUri(JsonNode jsonNode) {
        String endpoint = Selectors.textAt(jsonNode, Pointers.Functions.FUNCTION_ENDPOINT);
        URI uri = URI.create(endpoint);
        String scheme = uri.getScheme();
        boolean supported = scheme != null
                && (scheme.equalsIgnoreCase(HTTP_SCHEME) || scheme.equalsIgnoreCase(HTTPS_SCHEME));
        if (!supported) {
            throw new IllegalArgumentException("Missing scheme in function endpoint " + endpoint + "; an http or https scheme must be provided.");
        }
        return uri;
    }

    /**
     * Collector that indexes function specs by their function type. No merge function is
     * supplied, so two specs with the same function type make the collector throw
     * {@link IllegalStateException}.
     */
    private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
        return Collectors.toMap(FunctionSpec::functionType, Function.identity());
    }
}
