/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.utils.io;

import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for reading connector definitions out of NAR archives and for
 * discovering the connectors available in a directory.
 *
 * <p>This class is not instantiable.
 */
public final class ConnectorUtils {
    private static final Logger log = LoggerFactory.getLogger(ConnectorUtils.class);

    /** Name of the service definition requested from the NAR class loader. */
    private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";

    /**
     * Returns the source class name declared by the connector behind {@code ncl},
     * after checking that the class can be loaded and implements {@link Source}
     * or {@link BatchSource}.
     *
     * @param ncl class loader of the connector NAR
     * @return the source class name from the connector definition
     * @throws IOException if the definition declares no source class, or the class
     *         implements neither interface; any other failure while loading the
     *         class is handed to {@code Exceptions.rethrowIOException}
     */
    public static String getIOSourceClass(NarClassLoader ncl) throws IOException {
        // Share the parsing logic with the other accessors instead of duplicating it.
        ConnectorDefinition conf = getConnectorDefinition(ncl);
        String sourceClassName = conf.getSourceClass();
        if (StringUtils.isEmpty(sourceClassName)) {
            throw new IOException(String.format("The '%s' connector does not provide a source implementation", conf.getName()));
        }
        try {
            Class<?> sourceClass = ncl.loadClass(sourceClassName);
            if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
                throw new IOException(String.format("Class %s does not implement interface %s or %s", sourceClassName, Source.class.getName(), BatchSource.class.getName()));
            }
        } catch (Throwable t) {
            Exceptions.rethrowIOException(t);
        }
        return sourceClassName;
    }

    /**
     * Returns the sink class name declared by the connector behind
     * {@code classLoader}, after checking that the class can be loaded and
     * implements {@link Sink}.
     *
     * @param classLoader class loader of the connector NAR; it is cast to
     *        {@link NarClassLoader}, so any other kind of loader fails with a
     *        {@link ClassCastException}
     * @return the sink class name from the connector definition
     * @throws IOException if the definition declares no sink class, or the class
     *         does not implement {@link Sink}; any other failure while loading the
     *         class is handed to {@code Exceptions.rethrowIOException}
     */
    public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
        ConnectorDefinition conf = getConnectorDefinition(classLoader);
        NarClassLoader ncl = (NarClassLoader) classLoader;
        String sinkClassName = conf.getSinkClass();
        if (StringUtils.isEmpty(sinkClassName)) {
            throw new IOException(String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
        }
        try {
            Class<?> sinkClass = ncl.loadClass(sinkClassName);
            if (!Sink.class.isAssignableFrom(sinkClass)) {
                throw new IOException("Class " + sinkClassName + " does not implement interface " + Sink.class.getName());
            }
        } catch (Throwable t) {
            Exceptions.rethrowIOException(t);
        }
        return sinkClassName;
    }

    /**
     * Opens the NAR archive at {@code narPath}, reads its connector definition and
     * closes the archive's class loader again.
     *
     * @param narPath path of the NAR archive
     * @param narExtractionDirectory extraction directory passed to
     *        {@code NarClassLoader.getFromArchive}
     * @return the parsed connector definition
     * @throws IOException if the archive cannot be opened or the definition cannot be read
     */
    public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
            return getConnectorDefinition(ncl);
        }
    }

    /**
     * Reads the {@value #PULSAR_IO_SERVICE_NAME} service definition from the given
     * class loader and parses it as YAML into a {@link ConnectorDefinition}.
     *
     * @param classLoader class loader of the connector NAR; it is cast to
     *        {@link NarClassLoader}, so any other kind of loader fails with a
     *        {@link ClassCastException}
     * @return the parsed connector definition
     * @throws IOException if the definition cannot be read or parsed
     */
    public static ConnectorDefinition getConnectorDefinition(ClassLoader classLoader) throws IOException {
        NarClassLoader narClassLoader = (NarClassLoader) classLoader;
        String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
        return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
    }

    /**
     * Describes every non-static field of a connector configuration class.
     *
     * <p>Each returned entry carries the field name, the field's type name and,
     * when the field is annotated with {@link FieldDoc}, one attribute per
     * {@code FieldDoc} member (member name mapped to the value's string form).
     * Fields without the annotation get an empty attribute map.
     *
     * @param narPath path of the NAR archive containing the configuration class
     * @param configClassName fully qualified name of the configuration class
     * @param narExtractionDirectory extraction directory passed to
     *        {@code NarClassLoader.getFromArchive}
     * @return one definition per non-static field, in the order returned by
     *         {@code Reflections.getAllFields}
     * @throws Exception if the archive cannot be opened, the class cannot be
     *         loaded, or an annotation member cannot be read reflectively
     */
    public static List<ConfigFieldDefinition> getConnectorConfigDefinition(String narPath, String configClassName, String narExtractionDirectory) throws Exception {
        List<ConfigFieldDefinition> retval = new ArrayList<>();
        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
            Class<?> configClass = ncl.loadClass(configClassName);
            for (Field field : Reflections.getAllFields(configClass)) {
                if (Modifier.isStatic(field.getModifiers())) {
                    continue;
                }
                // Only field metadata is read here (name, type, annotations), so the
                // field does not have to be made accessible.
                ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
                configFieldDefinition.setFieldName(field.getName());
                configFieldDefinition.setTypeName(field.getType().getName());

                Map<String, String> attributes = new HashMap<>();
                FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);
                if (fieldDoc != null) {
                    // Every member declared on FieldDoc becomes one attribute entry.
                    for (Method method : FieldDoc.class.getDeclaredMethods()) {
                        Object value = method.invoke(fieldDoc);
                        attributes.put(method.getName(), value == null ? "" : value.toString());
                    }
                }
                configFieldDefinition.setAttributes(attributes);
                retval.add(configFieldDefinition);
            }
        }
        return retval;
    }

    /**
     * Scans {@code connectorsDirectory} for {@code *.nar} archives and collects the
     * connectors they define.
     *
     * <p>For each archive the connector definition is read; the archive is
     * registered as a source and/or sink under the connector name depending on
     * which classes the definition declares, and the matching configuration field
     * definitions are collected when a config class is declared. An archive that
     * fails to load is logged and skipped so it does not prevent the remaining
     * archives from being discovered.
     *
     * @param connectorsDirectory directory to scan; resolved to an absolute path
     * @param narExtractionDirectory extraction directory passed to
     *        {@code NarClassLoader.getFromArchive}
     * @return the discovered connectors, with the definition list sorted by name
     *         ignoring case (definitions without a name sort first); empty if the
     *         directory does not exist
     * @throws IOException if the directory cannot be listed
     */
    public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
        Path path = Paths.get(connectorsDirectory).toAbsolutePath();
        log.info("Searching for connectors in {}", path);
        Connectors connectors = new Connectors();
        if (!Files.exists(path)) {
            log.warn("Connectors archive directory not found");
            return connectors;
        }
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
            for (Path archive : stream) {
                try {
                    String archivePath = archive.toString();
                    ConnectorDefinition cntDef = getConnectorDefinition(archivePath, narExtractionDirectory);
                    log.info("Found connector {} from {}", cntDef, archive);
                    if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
                        connectors.sources.put(cntDef.getName(), archive);
                        if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
                            connectors.sourceConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archivePath, cntDef.getSourceConfigClass(), narExtractionDirectory));
                        }
                    }
                    if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
                        connectors.sinks.put(cntDef.getName(), archive);
                        if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
                            connectors.sinkConfigDefinitions.put(cntDef.getName(), getConnectorConfigDefinition(archivePath, cntDef.getSinkConfigClass(), narExtractionDirectory));
                        }
                    }
                    connectors.connectors.add(cntDef);
                } catch (Throwable t) {
                    // Best effort: one broken archive must not abort the whole scan.
                    log.warn("Failed to load connector from {}", archive, t);
                }
            }
        }
        // Null-safe ordering: a definition without a name must not make the sort
        // throw and thereby discard every connector that loaded successfully.
        connectors.connectors.sort(Comparator.comparing(ConnectorDefinition::getName, Comparator.nullsFirst(String.CASE_INSENSITIVE_ORDER)));
        return connectors;
    }

    /** Always throws: this is a utility class and must not be instantiated. */
    private ConnectorUtils() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}

