package org.apache.flink.statefun.flink.core.jsonmodule;

import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.kinesis.shaded.org.apache.http.HttpHost;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.common.Maps;
import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;
import org.apache.kafka.common.config.SslConfigs;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.class */
final class FunctionJsonEntity implements JsonEntity {
    private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity$MetaPointers.class */
    /** JSON pointers addressing the {@code meta} section of a function definition node. */
    public static final class MetaPointers {
        private MetaPointers() {
            // constant holder only
        }

        /** Pointer to {@code /function/meta/type}. */
        private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type");

        /** Pointer to {@code /function/meta/kind}. */
        private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity$SpecPointers.class */
    /** JSON pointers addressing the {@code spec} section of a function definition node. */
    public static final class SpecPointers {
        private SpecPointers() {
            // constant holder only
        }

        /** Pointer to {@code /function/spec/endpoint}. */
        private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");

        /** Pointer to {@code /function/spec/host}. */
        private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host");

        /** Pointer to {@code /function/spec/port}. */
        private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");

        /** Pointer to {@code /function/spec/states}. */
        private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");

        /** Pointer to {@code /function/spec/maxNumBatchRequests}. */
        private static final JsonPointer MAX_NUM_BATCH_REQUESTS = JsonPointer.compile("/function/spec/maxNumBatchRequests");

        /** Pointer to {@code /function/spec/timeout}. */
        private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity$StateSpecPointers.class */
    /** JSON pointers addressing the fields of a single state entry, relative to that entry. */
    public static final class StateSpecPointers {
        private StateSpecPointers() {
            // constant holder only
        }

        /** Pointer to {@code /expireAfter}. */
        private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");

        /** Pointer to {@code /name}. */
        private static final JsonPointer NAME = JsonPointer.compile("/name");
    }

    /**
     * Parses every function definition found under {@code /functions} and binds each declared
     * function type to a provider created for its kind.
     *
     * @param binder the binder that receives one provider binding per function type
     * @param jsonNode the JSON node the function definitions are read from
     * @param formatVersion the format version handed to the function spec parser
     */
    @Override
    public void bind(StatefulFunctionModule.Binder binder, JsonNode jsonNode, FormatVersion formatVersion) {
        Map<FunctionSpec.Kind, Map<FunctionType, FunctionSpec>> specsByKind =
                parse(functionSpecNodes(jsonNode), formatVersion);
        for (Map.Entry<FunctionSpec.Kind, Map<FunctionType, FunctionSpec>> kindAndSpecs : specsByKind.entrySet()) {
            // a single provider instance is shared by all function types of the same kind
            StatefulFunctionProvider provider = functionProvider(kindAndSpecs.getKey(), kindAndSpecs.getValue());
            for (FunctionType type : kindAndSpecs.getValue().keySet()) {
                binder.bindFunctionProvider(type, provider);
            }
        }
    }

    /**
     * Converts each function definition node into a {@link FunctionSpec} and groups the results
     * first by kind and then by function type.
     *
     * @param iterable the function definition nodes
     * @param formatVersion the format version handed to {@code parseFunctionSpec}
     * @return the parsed specs, keyed by kind and then by function type
     */
    private Map<FunctionSpec.Kind, Map<FunctionType, FunctionSpec>> parse(Iterable<? extends JsonNode> iterable, FormatVersion formatVersion) {
        return StreamSupport.stream(iterable.spliterator(), false)
                .map(specNode -> parseFunctionSpec(specNode, formatVersion))
                .collect(Collectors.groupingBy(FunctionSpec::kind, groupByFunctionType()));
    }

    /**
     * Selects the list of function definition nodes located at {@code /functions}.
     *
     * @param jsonNode the node to select from
     * @return the nodes listed at {@code /functions}
     */
    private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode jsonNode) {
        final Iterable<? extends JsonNode> specNodes = Selectors.listAt(jsonNode, FUNCTION_SPECS_POINTER);
        return specNodes;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link FunctionSpec} from a single function definition node.
     *
     * <p>The text at {@code /function/meta/kind} is upper-cased and resolved to a
     * {@link FunctionSpec.Kind}. HTTP functions are assembled through the
     * {@link HttpFunctionSpec} builder (endpoint, states, and the optional batch-size and timeout
     * settings); gRPC functions are created from the host/port address.
     *
     * @param jsonNode the function definition node
     * @param formatVersion the format version, which selects how the states list is parsed
     * @return the parsed function specification
     * @throws IllegalArgumentException if the kind text does not name a known function kind
     */
    public static FunctionSpec parseFunctionSpec(JsonNode jsonNode, FormatVersion formatVersion) {
        String kindText = Selectors.textAt(jsonNode, MetaPointers.KIND);
        FunctionSpec.Kind kind;
        try {
            // Locale.ROOT keeps the enum lookup independent of the JVM's default locale.
            kind = FunctionSpec.Kind.valueOf(kindText.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // valueOf rejects unknown names itself; surface the same message as the default branch.
            throw new IllegalArgumentException("Unrecognized function kind " + kindText, e);
        }
        FunctionType functionType = functionType(jsonNode);
        switch (kind) {
            case HTTP: {
                HttpFunctionSpec.Builder builder = HttpFunctionSpec.builder(functionType, functionUri(jsonNode));
                for (StateSpec state : functionStateParserOf(formatVersion).apply(jsonNode)) {
                    builder.withState(state);
                }
                optionalMaxNumBatchRequests(jsonNode).ifPresent(builder::withMaxNumBatchRequests);
                optionalMaxRequestDuration(jsonNode).ifPresent(builder::withMaxRequestDuration);
                return builder.build();
            }
            case GRPC:
                return new GrpcFunctionSpec(functionType, functionAddress(jsonNode));
            default:
                throw new IllegalArgumentException("Unrecognized function kind " + kindText);
        }
    }

    /**
     * Picks the state-list parser that corresponds to the given format version.
     *
     * @param formatVersion the module format version
     * @return the parser for {@code v1_0} or {@code v2_0}
     * @throws IllegalStateException if the version is neither {@code v1_0} nor {@code v2_0}
     */
    private static Function<JsonNode, List<StateSpec>> functionStateParserOf(FormatVersion formatVersion) {
        final Function<JsonNode, List<StateSpec>> parser;
        switch (formatVersion) {
            case v1_0:
                parser = FunctionJsonEntity::functionStateSpecParserV1;
                break;
            case v2_0:
                parser = FunctionJsonEntity::functionStateSpecParserV2;
                break;
            default:
                throw new IllegalStateException("Unrecognized format version: " + formatVersion);
        }
        return parser;
    }

    /**
     * Parses the states list in the v1 layout: the list at {@code /function/spec/states} is read
     * as text entries, and each entry becomes the single constructor argument of a
     * {@link StateSpec}.
     *
     * @param jsonNode the function definition node
     * @return one state spec per listed entry
     */
    private static List<StateSpec> functionStateSpecParserV1(JsonNode jsonNode) {
        final List<String> stateNames = Selectors.textListAt(jsonNode, SpecPointers.STATES);
        return stateNames.stream().map(StateSpec::new).collect(Collectors.toList());
    }

    /**
     * Parses the states list in the v2 layout: each entry at {@code /function/spec/states} is a
     * node carrying a {@code /name} and an optional {@code /expireAfter} duration.
     *
     * @param jsonNode the function definition node
     * @return one state spec per listed entry, built with the expiry duration when it is present
     */
    private static List<StateSpec> functionStateSpecParserV2(JsonNode jsonNode) {
        List<StateSpec> stateSpecs = new ArrayList<>();
        for (JsonNode stateNode : Selectors.listAt(jsonNode, SpecPointers.STATES)) {
            String name = Selectors.textAt(stateNode, StateSpecPointers.NAME);
            StateSpec stateSpec = optionalStateExpireDuration(stateNode)
                    .map(expireAfter -> new StateSpec(name, expireAfter))
                    .orElseGet(() -> new StateSpec(name));
            stateSpecs.add(stateSpec);
        }
        return stateSpecs;
    }

    /**
     * Reads the optional integer at {@code /function/spec/maxNumBatchRequests}.
     *
     * @param jsonNode the function definition node
     * @return the result of the optional integer lookup
     */
    private static OptionalInt optionalMaxNumBatchRequests(JsonNode jsonNode) {
        final OptionalInt maxNumBatchRequests =
                Selectors.optionalIntegerAt(jsonNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
        return maxNumBatchRequests;
    }

    /**
     * Reads the optional text at {@code /function/spec/timeout} and, when present, parses it
     * with {@link TimeUtils#parseDuration}.
     *
     * @param jsonNode the function definition node
     * @return the parsed duration, or an empty optional when no text was found
     */
    private static Optional<Duration> optionalMaxRequestDuration(JsonNode jsonNode) {
        final Optional<String> timeoutText = Selectors.optionalTextAt(jsonNode, SpecPointers.TIMEOUT);
        return timeoutText.map(text -> TimeUtils.parseDuration(text));
    }

    /**
     * Reads the optional text at {@code /expireAfter} of a state entry and, when present, parses
     * it with {@link TimeUtils#parseDuration}.
     *
     * @param jsonNode a single state entry node
     * @return the parsed duration, or an empty optional when no text was found
     */
    private static Optional<Duration> optionalStateExpireDuration(JsonNode jsonNode) {
        final Optional<String> expireText = Selectors.optionalTextAt(jsonNode, StateSpecPointers.EXPIRE_DURATION);
        return expireText.map(text -> TimeUtils.parseDuration(text));
    }

    /**
     * Builds the {@link FunctionType} from the text at {@code /function/meta/type}, which is
     * split into namespace and name via {@link NamespaceNamePair#from}.
     *
     * @param jsonNode the function definition node
     * @return the function type made of the parsed namespace and name
     */
    private static FunctionType functionType(JsonNode jsonNode) {
        final String typeText = Selectors.textAt(jsonNode, MetaPointers.TYPE);
        final NamespaceNamePair namespaceAndName = NamespaceNamePair.from(typeText);
        return new FunctionType(namespaceAndName.namespace(), namespaceAndName.name());
    }

    /**
     * Builds the socket address from the text at {@code /function/spec/host} and the integer at
     * {@code /function/spec/port}.
     *
     * @param jsonNode the function definition node
     * @return the address made of the configured host and port
     */
    private static InetSocketAddress functionAddress(JsonNode jsonNode) {
        final String host = Selectors.textAt(jsonNode, SpecPointers.HOSTNAME);
        final int port = Selectors.integerAt(jsonNode, SpecPointers.PORT);
        return new InetSocketAddress(host, port);
    }

    /**
     * Reads the endpoint at {@code /function/spec/endpoint} and validates its scheme.
     *
     * <p>Accepted schemes, compared case-insensitively, are {@code http}, {@code https},
     * {@code http+unix} and {@code https+unix}.
     *
     * @param jsonNode the function definition node
     * @return the endpoint as a {@link URI}
     * @throws IllegalArgumentException if the endpoint has no scheme or an unsupported one;
     *     {@link URI#create} also throws it for text that is not a valid URI
     */
    private static URI functionUri(JsonNode jsonNode) {
        String endpoint = Selectors.textAt(jsonNode, SpecPointers.ENDPOINT);
        URI uri = URI.create(endpoint);
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Missing scheme in function endpoint " + endpoint + "; an http or https scheme must be provided.");
        }
        // Plain literals: the accepted schemes do not depend on constants of unrelated libraries.
        boolean supported = scheme.equalsIgnoreCase("http")
                || scheme.equalsIgnoreCase("https")
                || scheme.equalsIgnoreCase("http+unix")
                || scheme.equalsIgnoreCase("https+unix");
        if (!supported) {
            throw new IllegalArgumentException("Unsupported scheme " + scheme + " in function endpoint " + endpoint + "; an http or https or http+unix or https+unix scheme must be provided.");
        }
        return uri;
    }

    /**
     * Creates a collector that indexes function specs by their function type.
     *
     * <p>No merge function is supplied to {@link Collectors#toMap}, so two specs with the same
     * function type make the collector throw {@link IllegalStateException}.
     *
     * @return a collector producing a map from function type to spec
     */
    private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
        return Collectors.toMap(FunctionSpec::functionType, spec -> spec);
    }

    /**
     * Creates the provider that serves all functions of the given kind.
     *
     * <p>The map values are handed to the provider after being cast to the spec class that
     * matches the kind ({@link HttpFunctionSpec} or {@link GrpcFunctionSpec}).
     *
     * @param kind the function kind shared by every spec in {@code map}
     * @param map the specs of that kind, keyed by function type
     * @return an {@link HttpFunctionProvider} or a {@link GrpcFunctionProvider}
     * @throws IllegalStateException if the kind is neither {@code HTTP} nor {@code GRPC}
     */
    private static StatefulFunctionProvider functionProvider(FunctionSpec.Kind kind, Map<FunctionType, FunctionSpec> map) {
        switch (kind) {
            case HTTP:
                return new HttpFunctionProvider(Maps.transformValues(map, HttpFunctionSpec.class::cast));
            case GRPC:
                return new GrpcFunctionProvider(Maps.transformValues(map, GrpcFunctionSpec.class::cast));
            default:
                throw new IllegalStateException("Unexpected value: " + kind);
        }
    }
}
