package org.apache.flink.statefun.flink.core.jsonmodule;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import java.io.IOException;
import java.net.URL;
import java.util.Optional;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.ResourceLocator;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.class */
final class RouterJsonEntity implements JsonEntity {
    private static final JsonPointer ROUTER_SPECS_POINTER = JsonPointer.compile("/routers");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity$MetaPointers.class */
    public static final class MetaPointers {
        private static final JsonPointer TYPE = JsonPointer.compile("/router/meta/type");

        private MetaPointers() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity$SpecPointers.class */
    public static final class SpecPointers {
        private static final JsonPointer INGRESS = JsonPointer.compile("/router/spec/ingress");
        private static final JsonPointer TARGET = JsonPointer.compile("/router/spec/target");
        private static final JsonPointer DESCRIPTOR = JsonPointer.compile("/router/spec/descriptorSet");
        private static final JsonPointer MESSAGE_TYPE = JsonPointer.compile("/router/spec/messageType");

        private SpecPointers() {
        }
    }

    @Override // org.apache.flink.statefun.flink.core.jsonmodule.JsonEntity
    public void bind(StatefulFunctionModule.Binder binder, JsonNode jsonNode, FormatVersion formatVersion) {
        Selectors.listAt(jsonNode, ROUTER_SPECS_POINTER).forEach(jsonNode2 -> {
            requireProtobufRouterType(jsonNode2);
            binder.bindIngressRouter(targetRouterIngress(jsonNode2), dynamicRouter(jsonNode2));
        });
    }

    private static Router<Message> dynamicRouter(JsonNode jsonNode) {
        String textAt = Selectors.textAt(jsonNode, SpecPointers.TARGET);
        String textAt2 = Selectors.textAt(jsonNode, SpecPointers.DESCRIPTOR);
        String textAt3 = Selectors.textAt(jsonNode, SpecPointers.MESSAGE_TYPE);
        Optional descriptorByName = protobufDescriptorMap(textAt2).getDescriptorByName(textAt3);
        if (descriptorByName.isPresent()) {
            return ProtobufRouter.forAddressTemplate((Descriptors.Descriptor) descriptorByName.get(), textAt);
        }
        throw new IllegalStateException("Error while processing a router definition. Unable to locate a message " + textAt3 + " in a descriptor set " + textAt2);
    }

    private static ProtobufDescriptorMap protobufDescriptorMap(String str) {
        try {
            URL findNamedResource = ResourceLocator.findNamedResource(str);
            if (findNamedResource == null) {
                throw new IllegalArgumentException("Unable to locate a Protobuf descriptor set at " + str);
            }
            return ProtobufDescriptorMap.from(findNamedResource);
        } catch (IOException e) {
            throw new IllegalStateException("Error while processing a router definition. Unable to read the descriptor set at  " + str, e);
        }
    }

    private static IngressIdentifier<Message> targetRouterIngress(JsonNode jsonNode) {
        NamespaceNamePair from = NamespaceNamePair.from(Selectors.textAt(jsonNode, SpecPointers.INGRESS));
        return new IngressIdentifier<>(Message.class, from.namespace(), from.name());
    }

    private static void requireProtobufRouterType(JsonNode jsonNode) {
        String textAt = Selectors.textAt(jsonNode, MetaPointers.TYPE);
        if (!textAt.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
            throw new IllegalStateException("Invalid router type " + textAt);
        }
    }
}
