package org.apache.flink.statefun.flink.core.jsonmodule;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.class */
public final class FunctionEndpointJsonEntity implements JsonEntity {
    private static final JsonPointer FUNCTION_ENDPOINTS_POINTER = JsonPointer.compile("/endpoints");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity$MetaPointers.class */
    public static final class MetaPointers {
        private static final JsonPointer KIND = JsonPointer.compile("/endpoint/meta/kind");

        private MetaPointers() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity$SpecPointers.class */
    public static final class SpecPointers {
        private static final JsonPointer TARGET_FUNCTIONS = JsonPointer.compile("/endpoint/spec/functions");
        private static final JsonPointer URL_PATH_TEMPLATE = JsonPointer.compile("/endpoint/spec/urlPathTemplate");
        private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
        private static final JsonPointer MAX_NUM_BATCH_REQUESTS = JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");

        private SpecPointers() {
        }
    }

    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity$TimeoutPointers.class */
    private static final class TimeoutPointers {
        private static final JsonPointer CALL = JsonPointer.compile("/call");
        private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
        private static final JsonPointer READ = JsonPointer.compile("/read");
        private static final JsonPointer WRITE = JsonPointer.compile("/write");

        private TimeoutPointers() {
        }
    }

    @Override // org.apache.flink.statefun.flink.core.jsonmodule.JsonEntity
    public void bind(StatefulFunctionModule.Binder binder, JsonNode jsonNode, FormatVersion formatVersion) {
        if (formatVersion != FormatVersion.v3_0) {
            throw new IllegalArgumentException("endpoints is only supported with format version 3.0.");
        }
        for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry : parseFunctionEndpointSpecs(functionEndpointSpecNodes(jsonNode)).entrySet()) {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            entry.getValue().forEach(functionEndpointSpec -> {
                FunctionEndpointSpec.Target target = functionEndpointSpec.target();
                if (target.isSpecificFunctionType()) {
                    hashMap.put(target.asSpecificFunctionType(), functionEndpointSpec);
                } else {
                    hashMap2.put(target.asNamespace(), functionEndpointSpec);
                }
            });
            StatefulFunctionProvider functionProvider = functionProvider(entry.getKey(), hashMap, hashMap2);
            hashMap.keySet().forEach(functionType -> {
                binder.bindFunctionProvider(functionType, functionProvider);
            });
            hashMap2.keySet().forEach(functionTypeNamespaceMatcher -> {
                binder.bindFunctionProvider(functionTypeNamespaceMatcher, functionProvider);
            });
        }
    }

    private static Iterable<? extends JsonNode> functionEndpointSpecNodes(JsonNode jsonNode) {
        return Selectors.listAt(jsonNode, FUNCTION_ENDPOINTS_POINTER);
    }

    private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> parseFunctionEndpointSpecs(Iterable<? extends JsonNode> iterable) {
        return (Map) StreamSupport.stream(iterable.spliterator(), false).map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec).collect(Collectors.groupingBy((v0) -> {
            return v0.kind();
        }, Collectors.toList()));
    }

    private static FunctionEndpointSpec parseFunctionEndpointsSpec(JsonNode jsonNode) {
        FunctionEndpointSpec.Kind endpointKind = endpointKind(jsonNode);
        switch (endpointKind) {
            case HTTP:
                HttpFunctionEndpointSpec.Builder builder = HttpFunctionEndpointSpec.builder(target(jsonNode), urlPathTemplate(jsonNode));
                JsonNode at = jsonNode.at(SpecPointers.TIMEOUTS);
                OptionalInt optionalMaxNumBatchRequests = optionalMaxNumBatchRequests(jsonNode);
                builder.getClass();
                optionalMaxNumBatchRequests.ifPresent(builder::withMaxNumBatchRequests);
                Optional<Duration> optionalTimeoutDuration = optionalTimeoutDuration(at, TimeoutPointers.CALL);
                builder.getClass();
                optionalTimeoutDuration.ifPresent(builder::withMaxRequestDuration);
                Optional<Duration> optionalTimeoutDuration2 = optionalTimeoutDuration(at, TimeoutPointers.CONNECT);
                builder.getClass();
                optionalTimeoutDuration2.ifPresent(builder::withConnectTimeoutDuration);
                Optional<Duration> optionalTimeoutDuration3 = optionalTimeoutDuration(at, TimeoutPointers.READ);
                builder.getClass();
                optionalTimeoutDuration3.ifPresent(builder::withReadTimeoutDuration);
                Optional<Duration> optionalTimeoutDuration4 = optionalTimeoutDuration(at, TimeoutPointers.WRITE);
                builder.getClass();
                optionalTimeoutDuration4.ifPresent(builder::withWriteTimeoutDuration);
                return builder.build();
            case GRPC:
                throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
            default:
                throw new IllegalArgumentException("Unrecognized function endpoint kind " + endpointKind);
        }
    }

    private static FunctionEndpointSpec.Kind endpointKind(JsonNode jsonNode) {
        return FunctionEndpointSpec.Kind.valueOf(Selectors.textAt(jsonNode, MetaPointers.KIND).toUpperCase(Locale.getDefault()));
    }

    private static FunctionEndpointSpec.Target target(JsonNode jsonNode) {
        TypeName parseFrom = TypeName.parseFrom(Selectors.textAt(jsonNode, SpecPointers.TARGET_FUNCTIONS));
        if (parseFrom.namespace().contains("*")) {
            throw new IllegalArgumentException("Invalid syntax for " + SpecPointers.TARGET_FUNCTIONS + ". Only <namespace>/<name> or <namespace>/* are supported.");
        }
        if (parseFrom.name().equals("*")) {
            return FunctionEndpointSpec.Target.namespace(parseFrom.namespace());
        }
        if (parseFrom.name().contains("*")) {
            throw new IllegalArgumentException("Invalid syntax for " + SpecPointers.TARGET_FUNCTIONS + ". Only <namespace>/<name> or <namespace>/* are supported.");
        }
        return FunctionEndpointSpec.Target.functionType(new FunctionType(parseFrom.namespace(), parseFrom.name()));
    }

    private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(JsonNode jsonNode) {
        return new FunctionEndpointSpec.UrlPathTemplate(Selectors.textAt(jsonNode, SpecPointers.URL_PATH_TEMPLATE));
    }

    private static OptionalInt optionalMaxNumBatchRequests(JsonNode jsonNode) {
        return Selectors.optionalIntegerAt(jsonNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
    }

    private static Optional<Duration> optionalTimeoutDuration(JsonNode jsonNode, JsonPointer jsonPointer) {
        return Selectors.optionalTextAt(jsonNode, jsonPointer).map(TimeUtils::parseDuration);
    }

    private static StatefulFunctionProvider functionProvider(FunctionEndpointSpec.Kind kind, Map<FunctionType, FunctionEndpointSpec> map, Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> map2) {
        switch (kind) {
            case HTTP:
                return new HttpFunctionProvider(castValues(map), castValues(namespaceAsKey(map2)));
            case GRPC:
                throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
            default:
                throw new IllegalStateException("Unexpected kind: " + kind);
        }
    }

    private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues(Map<K, FunctionEndpointSpec> map) {
        return new HashMap(map);
    }

    private static Map<String, FunctionEndpointSpec> namespaceAsKey(Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> map) {
        HashMap hashMap = new HashMap(map.size());
        map.forEach((functionTypeNamespaceMatcher, functionEndpointSpec) -> {
        });
        return hashMap;
    }
}
