/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.utils;

import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Function;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionCommon {
    private static final Logger log = LoggerFactory.getLogger(FunctionCommon.class);

    /**
     * Renders a protobuf message (or builder) as JSON text.
     *
     * @param msg the message or builder to print
     * @return the text produced by {@code JsonFormat.printer()}
     * @throws IOException if the printer fails
     */
    public static String printJson(MessageOrBuilder msg) throws IOException {
        final JsonFormat.Printer jsonPrinter = JsonFormat.printer();
        final String rendered = jsonPrinter.print(msg);
        return rendered;
    }

    /**
     * Parses {@code json} and merges its content into the supplied builder.
     *
     * @param json    JSON text to parse
     * @param builder builder that receives the parsed fields
     * @throws IOException if the parser fails
     */
    public static void mergeJson(String json, AbstractMessage.Builder builder) throws IOException {
        final JsonFormat.Parser jsonParser = JsonFormat.parser();
        final Message.Builder target = (Message.Builder)builder;
        jsonParser.merge(json, target);
    }

    /**
     * Asks the operating system for a currently unused TCP port by binding a
     * {@link ServerSocket} to port {@code 0} and reading back the port it was given.
     *
     * <p>The socket is managed by try-with-resources so it is released on every
     * exit path. NOTE(review): the port is free only at the moment of the probe;
     * nothing here reserves it for the caller.
     *
     * @return the port number the probe socket was bound to
     * @throws RuntimeException wrapping the {@link IOException} if no socket could be opened
     */
    public static int findAvailablePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
        catch (IOException ex) {
            throw new RuntimeException("No free port found", ex);
        }
    }

    /**
     * Loads the class named in {@code functionConfig} and resolves its type arguments.
     *
     * @param functionConfig configuration providing the class name and the optional window config
     * @param classLoader    loader used to load the function class
     * @return the resolved type arguments, as returned by {@link #getFunctionTypes(Class, boolean)}
     * @throws ClassNotFoundException if the class cannot be loaded by {@code classLoader}
     */
    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader) throws ClassNotFoundException {
        // A non-null window config selects the window-function resolution path.
        final boolean windowed = functionConfig.getWindowConfig() != null;
        final String className = functionConfig.getClassName();
        final Class<?> loaded = classLoader.loadClass(className);
        return FunctionCommon.getFunctionTypes(loaded, windowed);
    }

    /**
     * Resolves the raw type arguments of a user function class.
     *
     * <ul>
     *   <li>Without a window config: resolved against the Pulsar {@code Function} interface
     *       when the class implements it, otherwise against {@code java.util.function.Function}.</li>
     *   <li>With a window config: resolved against {@code WindowFunction} when the class
     *       implements it. Otherwise the class is treated as a
     *       {@code java.util.function.Function} whose input must be a {@code Collection<X>};
     *       the first returned entry is then replaced by {@code X}.</li>
     * </ul>
     *
     * <p>{@code java.util.function.Function} is spelled out in full because this file also
     * imports a protobuf type with the simple name {@code Function}.
     *
     * @param userClass             the user-supplied function class
     * @param isWindowConfigPresent whether a window configuration accompanies the function
     * @return the resolved type arguments
     * @throws IllegalArgumentException if a windowed plain function does not take a
     *         {@code Collection} with a concrete element class as input
     */
    public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfigPresent) {
        if (!isWindowConfigPresent) {
            if (org.apache.pulsar.functions.api.Function.class.isAssignableFrom(userClass)) {
                return TypeResolver.resolveRawArguments(org.apache.pulsar.functions.api.Function.class, userClass);
            }
            return TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
        }
        if (WindowFunction.class.isAssignableFrom(userClass)) {
            return TypeResolver.resolveRawArguments(WindowFunction.class, userClass);
        }
        Class[] typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
        if (!typeArgs[0].equals(Collection.class)) {
            throw new IllegalArgumentException("Window function must take a collection as input");
        }
        // Dig the element type X out of Function<Collection<X>, ?>; each step is checked
        // so a malformed declaration is reported as an argument error, not a ClassCastException.
        Type functionType = TypeResolver.resolveGenericType(java.util.function.Function.class, (Type)userClass);
        if (!(functionType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Window function must declare its input and output types");
        }
        Type collectionType = ((ParameterizedType)functionType).getActualTypeArguments()[0];
        if (!(collectionType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Window function input collection must declare its element type");
        }
        Type actualInputType = ((ParameterizedType)collectionType).getActualTypeArguments()[0];
        if (!(actualInputType instanceof Class)) {
            throw new IllegalArgumentException("Window function input collection element type must be a concrete class");
        }
        typeArgs[0] = (Class)actualInputType;
        return typeArgs;
    }

    /**
     * Instantiates {@code userClassName} through its no-argument constructor.
     *
     * <p>The class is first looked up with {@link Class#forName(String)}; if that fails,
     * the lookup is retried with {@code classLoader}. The constructor is made accessible
     * before it is invoked, so it does not need to be public.
     *
     * @param userClassName fully qualified name of the class to instantiate
     * @param classLoader   fallback loader used when the default lookup fails
     * @return a new instance of the class
     * @throws RuntimeException if the class cannot be found by either lookup (the first
     *         failure is the cause, the second is attached as suppressed), has no no-arg
     *         constructor, is not concrete, cannot be accessed, or its constructor throws
     */
    public static Object createInstance(String userClassName, ClassLoader classLoader) {
        Class<?> theCls;
        try {
            theCls = Class.forName(userClassName);
        }
        catch (ClassNotFoundException | NoClassDefFoundError cnfe) {
            try {
                theCls = Class.forName(userClassName, true, classLoader);
            }
            catch (ClassNotFoundException | NoClassDefFoundError e) {
                RuntimeException failure = new RuntimeException("User class must be in class path", cnfe);
                // Keep the fallback loader's failure visible instead of discarding it.
                failure.addSuppressed(e);
                throw failure;
            }
        }
        try {
            Constructor<?> ctor = theCls.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        }
        catch (NoSuchMethodException e) {
            // getDeclaredConstructor() found no zero-argument constructor.
            throw new RuntimeException("User class must have a no-arg constructor", e);
        }
        catch (InstantiationException ie) {
            throw new RuntimeException("User class must be concrete", ie);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException("User class constructor is not accessible", e);
        }
        catch (InvocationTargetException e) {
            throw new RuntimeException("User class constructor throws exception", e);
        }
    }

    /**
     * Maps a {@code FunctionConfig.Runtime} to the protobuf runtime constant with the same name.
     *
     * @param runtime the configuration-side runtime
     * @return the protobuf runtime whose {@code name()} equals {@code runtime.name()}
     * @throws RuntimeException if no protobuf constant carries that name
     */
    public static Function.FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
        final String wanted = runtime.name();
        for (Function.FunctionDetails.Runtime candidate : Function.FunctionDetails.Runtime.values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        throw new RuntimeException("Unrecognized runtime: " + wanted);
    }

    /**
     * Maps a protobuf runtime constant to the {@code FunctionConfig.Runtime} with the same name.
     *
     * @param runtime the protobuf-side runtime
     * @return the configuration runtime whose {@code name()} equals {@code runtime.name()}
     * @throws RuntimeException if no configuration constant carries that name
     */
    public static FunctionConfig.Runtime convertRuntime(Function.FunctionDetails.Runtime runtime) {
        final String wanted = runtime.name();
        for (FunctionConfig.Runtime candidate : FunctionConfig.Runtime.values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        throw new RuntimeException("Unrecognized runtime: " + wanted);
    }

    /**
     * Maps a {@code FunctionConfig.ProcessingGuarantees} to the protobuf constant with the same name.
     *
     * @param processingGuarantees the configuration-side guarantee
     * @return the protobuf guarantee whose {@code name()} matches
     * @throws RuntimeException if no protobuf constant carries that name
     */
    public static Function.ProcessingGuarantees convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees processingGuarantees) {
        final String wanted = processingGuarantees.name();
        for (Function.ProcessingGuarantees candidate : Function.ProcessingGuarantees.values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        throw new RuntimeException("Unrecognized processing guarantee: " + wanted);
    }

    /**
     * Maps a protobuf processing guarantee to the {@code FunctionConfig.ProcessingGuarantees}
     * constant with the same name.
     *
     * @param processingGuarantees the protobuf-side guarantee
     * @return the configuration guarantee whose {@code name()} matches
     * @throws RuntimeException if no configuration constant carries that name
     */
    public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(Function.ProcessingGuarantees processingGuarantees) {
        final String wanted = processingGuarantees.name();
        for (FunctionConfig.ProcessingGuarantees candidate : FunctionConfig.ProcessingGuarantees.values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        throw new RuntimeException("Unrecognized processing guarantee: " + wanted);
    }

    /**
     * Loads {@code className} with {@code classLoader} and resolves its source type argument.
     *
     * @param className   name of the source class
     * @param classLoader loader used to load the class
     * @return the type argument resolved by {@link #getSourceType(Class)}
     * @throws ClassNotFoundException if the class cannot be loaded
     */
    public static Class<?> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
        final Class<?> sourceClass = classLoader.loadClass(className);
        return FunctionCommon.getSourceType(sourceClass);
    }

    /**
     * Resolves the single type argument a class binds on {@code Source} or {@code BatchSource}.
     * {@code Source} is checked first, so it wins for a class that implements both.
     *
     * @param sourceClass the class to inspect
     * @return the resolved raw type argument
     * @throws IllegalArgumentException if the class implements neither interface
     */
    public static Class<?> getSourceType(Class sourceClass) {
        if (Source.class.isAssignableFrom(sourceClass)) {
            return TypeResolver.resolveRawArgument(Source.class, (Class)sourceClass);
        }
        if (!BatchSource.class.isAssignableFrom(sourceClass)) {
            final String message = String.format("Source class %s does not implement the correct interface", sourceClass.getName());
            throw new IllegalArgumentException(message);
        }
        return TypeResolver.resolveRawArgument(BatchSource.class, (Class)sourceClass);
    }

    /**
     * Loads {@code className} and resolves the type argument it binds on {@code Sink}.
     *
     * @param className   name of the sink class
     * @param classLoader loader used to load the class
     * @return the resolved raw type argument
     * @throws ClassNotFoundException if the class cannot be loaded
     */
    public static Class<?> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
        Class<?> sinkClass = classLoader.loadClass(className);
        return TypeResolver.resolveRawArgument(Sink.class, sinkClass);
    }

    /**
     * Copies the content behind {@code destPkgUrl} into {@code targetFile}.
     *
     * <p>Both the source channel (and the URL stream it wraps) and the file output
     * stream are opened in try-with-resources, so they are closed whether the
     * transfer succeeds or fails.
     *
     * @param destPkgUrl URL to read from
     * @param targetFile file that receives the downloaded bytes
     * @throws IOException if the URL is malformed, cannot be opened, or the copy fails
     */
    public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
        URL website = new URL(destPkgUrl);
        log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
        try (ReadableByteChannel rbc = Channels.newChannel(website.openStream());
             FileOutputStream fos = new FileOutputStream(targetFile)) {
            // Long.MAX_VALUE as the count: keep transferring until the source is exhausted.
            fos.getChannel().transferFrom(rbc, 0L, Long.MAX_VALUE);
        }
        log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
    }

    /**
     * Resolves {@code destPkgUrl} to a local file and builds a class loader over it.
     *
     * @param destPkgUrl package location, handed to {@link #extractFileFromPkgURL(String)}
     * @return the class loader returned by {@code ClassLoaderUtils.loadJar}
     * @throws IOException              if the package cannot be fetched or located
     * @throws URISyntaxException       if the URL cannot be converted to a URI
     * @throws IllegalArgumentException if loading the jar reports a malformed URL; the
     *         original exception is kept as the cause
     */
    public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
        File file = FunctionCommon.extractFileFromPkgURL(destPkgUrl);
        try {
            return ClassLoaderUtils.loadJar(file);
        }
        catch (MalformedURLException e) {
            throw new IllegalArgumentException("Corrupt User PackageFile " + file + " with error " + e.getMessage(), e);
        }
    }

    /**
     * Creates an empty temporary file for a function package.
     *
     * @return a new file whose name starts with {@code functions} and ends with {@code .tmp}
     * @throws IOException if the file cannot be created
     */
    public static File createPkgTempFile() throws IOException {
        final String prefix = "functions";
        final String suffix = ".tmp";
        return File.createTempFile(prefix, suffix);
    }

    /**
     * Turns a package URL into a local file.
     *
     * <ul>
     *   <li>URLs starting with {@code file} are mapped to the file they point at,
     *       which must already exist.</li>
     *   <li>URLs starting with {@code http} are downloaded into a temporary file
     *       that is registered for deletion on JVM exit.</li>
     * </ul>
     *
     * @param destPkgUrl the package URL
     * @return the local file
     * @throws IOException              if a local file is missing or the download fails
     * @throws URISyntaxException       if a {@code file} URL cannot be converted to a URI
     * @throws IllegalArgumentException if the URL starts with neither prefix
     */
    public static File extractFileFromPkgURL(String destPkgUrl) throws IOException, URISyntaxException {
        final boolean local = destPkgUrl.startsWith("file");
        if (!local && !destPkgUrl.startsWith("http")) {
            throw new IllegalArgumentException("Unsupported url protocol " + destPkgUrl + ", supported url protocols: [file/http/https]");
        }
        if (local) {
            File localFile = new File(new URL(destPkgUrl).toURI());
            if (localFile.exists()) {
                return localFile;
            }
            throw new IOException(destPkgUrl + " does not exists locally");
        }
        File downloaded = FunctionCommon.createPkgTempFile();
        downloaded.deleteOnExit();
        FunctionCommon.downloadFromHttpUrl(destPkgUrl, downloaded);
        return downloaded;
    }

    /**
     * Builds a {@code NarClassLoader} from an archive path or, failing that, a package file.
     * {@code archivePath} takes precedence when both are supplied.
     *
     * @param archivePath            path of the archive, or {@code null}
     * @param packageFile            package file used when {@code archivePath} is {@code null}, or {@code null}
     * @param narExtractionDirectory extraction directory passed through to {@code NarClassLoader.getFromArchive}
     * @return the class loader, or {@code null} when both {@code archivePath} and
     *         {@code packageFile} are {@code null}
     * @throws IllegalArgumentException if reading the archive fails; the underlying
     *         {@link IOException} is kept as the cause
     */
    public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile, String narExtractionDirectory) {
        if (archivePath != null) {
            try {
                return NarClassLoader.getFromArchive(archivePath.toFile(), Collections.emptySet(), narExtractionDirectory);
            }
            catch (IOException e) {
                throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath), e);
            }
        }
        if (packageFile != null) {
            try {
                return NarClassLoader.getFromArchive(packageFile, Collections.emptySet(), narExtractionDirectory);
            }
            catch (IOException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }
        return null;
    }

    /**
     * Builds the fully qualified id of a function instance from its metadata.
     *
     * @param instance the instance whose function details and instance id are read
     * @return the id in the form produced by
     *         {@link #getFullyQualifiedInstanceId(String, String, String, int)}
     */
    public static String getFullyQualifiedInstanceId(Function.Instance instance) {
        final Function.FunctionDetails details = instance.getFunctionMetaData().getFunctionDetails();
        final String tenant = details.getTenant();
        final String namespace = details.getNamespace();
        final String name = details.getName();
        return FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, name, instance.getInstanceId());
    }

    /**
     * Formats an instance id as {@code tenant/namespace/functionName:instanceId}.
     *
     * @param tenant       tenant part
     * @param namespace    namespace part
     * @param functionName function name part
     * @param instanceId   numeric instance id
     * @return the formatted id
     */
    public static String getFullyQualifiedInstanceId(String tenant, String namespace, String functionName, int instanceId) {
        final Object[] formatArgs = {tenant, namespace, functionName, instanceId};
        return String.format("%s/%s/%s:%d", formatArgs);
    }

    /**
     * Packs a message id into a single {@code long}: the ledger id shifted left by
     * 28 bits, OR-ed with the entry id.
     *
     * <p>A {@code TopicMessageIdImpl} is unwrapped to its inner message id first; the
     * result is then cast to {@code MessageIdImpl}. No range check is done on either
     * component here.
     *
     * @param messageId the message id to encode
     * @return the packed sequence id
     */
    public static final long getSequenceId(MessageId messageId) {
        MessageId unwrapped = messageId;
        if (messageId instanceof TopicMessageIdImpl) {
            unwrapped = ((TopicMessageIdImpl)messageId).getInnerMessageId();
        }
        final MessageIdImpl impl = (MessageIdImpl)unwrapped;
        final long ledgerBits = impl.getLedgerId() << 28;
        final long entryBits = impl.getEntryId();
        return ledgerBits | entryBits;
    }

    /**
     * Splits a packed sequence id back into a message id: the bits above the low 28
     * become the ledger id, the low 28 bits become the entry id, and the third
     * constructor argument is {@code -1}.
     *
     * @param sequenceId the packed sequence id
     * @return a {@code MessageIdImpl} built from the decoded parts
     */
    public static final MessageId getMessageId(long sequenceId) {
        // (1L << 28) - 1 == 0xFFFFFFF, i.e. a mask selecting the low 28 bits.
        final long entryMask = (1L << 28) - 1L;
        final long entryId = sequenceId & entryMask;
        final long ledgerId = sequenceId >>> 28;
        return new MessageIdImpl(ledgerId, entryId, -1);
    }

    /**
     * Serializes an object with Java serialization.
     *
     * @param obj the object to write
     * @return the bytes written by an {@link ObjectOutputStream}
     * @throws IOException if writing the object fails
     */
    public static byte[] toByteArray(Object obj) throws IOException {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer);){
            out.writeObject(obj);
            // Push any buffered data into the byte array before snapshotting it.
            out.flush();
            return buffer.toByteArray();
        }
    }

    /**
     * Prefixes a package name with a freshly generated random UUID and a dash.
     *
     * @param packageName the original package name
     * @return {@code <uuid>-<packageName>}
     */
    public static String getUniquePackageName(String packageName) {
        final UUID prefix = UUID.randomUUID();
        return prefix + "-" + packageName;
    }

    /**
     * Builds the state namespace name: {@code tenant_namespace} with every dash
     * replaced by an underscore.
     *
     * @param tenant    tenant part
     * @param namespace namespace part
     * @return the joined name without dashes
     */
    public static String getStateNamespace(String tenant, String namespace) {
        final String joined = tenant + "_" + namespace;
        return joined.replace('-', '_');
    }

    /**
     * Builds the fully qualified name of a function from its details.
     *
     * @param FunctionDetails2 the details providing tenant, namespace and name
     * @return the name in the form produced by
     *         {@link #getFullyQualifiedName(String, String, String)}
     */
    public static String getFullyQualifiedName(Function.FunctionDetails FunctionDetails2) {
        final String tenant = FunctionDetails2.getTenant();
        final String namespace = FunctionDetails2.getNamespace();
        final String name = FunctionDetails2.getName();
        return FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
    }

    /**
     * Joins the three name parts with {@code /}.
     *
     * @param tenant       tenant part
     * @param namespace    namespace part
     * @param functionName function name part
     * @return {@code tenant/namespace/functionName}
     */
    public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
        return String.join("/", tenant, namespace, functionName);
    }

    /**
     * Returns the tenant, i.e. the first {@code /}-separated part of a fully qualified name.
     *
     * @param fqfn the fully qualified function name
     * @return the tenant part
     */
    public static String extractTenantFromFullyQualifiedName(String fqfn) {
        final int tenantIndex = 0;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, tenantIndex);
    }

    /**
     * Returns the namespace, i.e. the second {@code /}-separated part of a fully qualified name.
     *
     * @param fqfn the fully qualified function name
     * @return the namespace part
     */
    public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
        final int namespaceIndex = 1;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, namespaceIndex);
    }

    /**
     * Returns the function name, i.e. the third {@code /}-separated part of a fully qualified name.
     *
     * @param fqfn the fully qualified function name
     * @return the function name part
     */
    public static String extractNameFromFullyQualifiedName(String fqfn) {
        final int nameIndex = 2;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, nameIndex);
    }

    /**
     * Splits {@code fqfn} on {@code /} and returns the part at {@code index}.
     * The name is accepted when the split yields at least three parts.
     *
     * @param fqfn  the fully qualified function name
     * @param index position of the wanted part
     * @return the part at {@code index}
     * @throws RuntimeException if the split yields fewer than three parts
     */
    private static String extractFromFullyQualifiedName(String fqfn, int index) {
        final String[] segments = fqfn.split("/");
        if (segments.length < 3) {
            throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
        }
        return segments[index];
    }

    /**
     * Loads {@code className} and resolves the type argument it binds on {@code funClass}.
     *
     * @param className   name of the class to load
     * @param funClass    the generic type the loaded class must be assignable to
     * @param classLoader loader used to load the class
     * @return the resolved raw type argument
     * @throws ClassNotFoundException   if the class cannot be loaded
     * @throws IllegalArgumentException if the loaded class is not assignable to {@code funClass}
     */
    public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader) throws ClassNotFoundException {
        Class<?> loadedClass = classLoader.loadClass(className);
        if (funClass.isAssignableFrom(loadedClass)) {
            return TypeResolver.resolveRawArgument(funClass, loadedClass);
        }
        final String message = String.format("class %s is not type of %s", className, funClass.getName());
        throw new IllegalArgumentException(message);
    }

    /**
     * Rounds {@code value} to {@code places} decimal places by scaling with a power of
     * ten, applying {@link Math#round(double)}, and scaling back.
     *
     * @param value  the number to round
     * @param places exponent of the power-of-ten scale factor
     * @return the rounded value
     */
    public static double roundDecimal(double value, int places) {
        final double factor = Math.pow(10.0, places);
        final long scaled = Math.round(value * factor);
        return scaled / factor;
    }

    /**
     * Private constructor: every member visible in this class is static, so the
     * class is not meant to be instantiated.
     */
    private FunctionCommon() {
    }
}

