package org.apache.pulsar.admin.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
import org.apache.pulsar.functions.shaded.proto.Function;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
/* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions.class */
public class CmdFunctions extends CmdBase {
    private static final Logger log = LoggerFactory.getLogger(CmdFunctions.class);
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
    private final LocalRunner localRunner;
    private final CreateFunction creater;
    private final DeleteFunction deleter;
    private final UpdateFunction updater;
    private final GetFunction getter;
    private final GetFunctionStatus statuser;
    private final ListFunctions lister;
    private final StateGetter stateGetter;
    private final TriggerFunction triggerer;
    private final UploadFunction uploader;
    private final DownloadFunction downloader;

    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$BaseCommand.class */
    /**
     * Common base of every functions sub-command. {@link #run()} drives the two-phase
     * life cycle: argument processing first, then the command itself.
     */
    abstract class BaseCommand extends CliCommand {
        BaseCommand() {
        }

        /**
         * Hook called by {@link #run()} before {@link #runCmd()}. The default implementation
         * does nothing; subclasses override it to validate and normalise their arguments.
         *
         * @throws Exception if the arguments are invalid
         */
        void processArguments() throws Exception {
        }

        /**
         * Executes the command once its arguments have been processed.
         *
         * @throws Exception if the command fails
         */
        abstract void runCmd() throws Exception;

        /**
         * Processes the arguments and then runs the command.
         *
         * @throws Exception whatever {@link #processArguments()} or {@link #runCmd()} throws
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.pulsar.admin.cli.CliCommand
        public void run() throws Exception {
            this.processArguments();
            this.runCmd();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sub-command that validates the assembled {@link FunctionConfig}, converts it to
     * function details and submits it, together with the user code file, through the
     * admin client.
     */
    @Parameters(commandDescription = "Create a Pulsar Function in cluster mode (i.e. deploy it on a Pulsar cluster)")
    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$CreateFunction.class */
    public class CreateFunction extends FunctionDetailsCommand {
        CreateFunction() {
            super();
        }

        /**
         * Checks the required fields, creates the function and prints a confirmation.
         *
         * @throws Exception if validation, conversion or the admin call fails
         */
        @Override // org.apache.pulsar.admin.cli.CmdFunctions.BaseCommand
        void runCmd() throws Exception {
            CmdFunctions.checkRequiredFields(this.functionConfig);
            CmdFunctions.this.admin.functions().createFunction(convert(this.functionConfig), this.userCodeFile);
            // The message is a plain String; the decompiler-generated cast of the literal to
            // CreateFunction was not valid Java and has been dropped.
            print("Created successfully");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sub-command that deletes the function identified by tenant, namespace and name
     * through the admin client.
     */
    @Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster")
    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$DeleteFunction.class */
    public class DeleteFunction extends FunctionCommand {
        DeleteFunction() {
            super();
        }

        /**
         * Deletes the function and prints a confirmation.
         *
         * @throws Exception if the admin call fails
         */
        @Override // org.apache.pulsar.admin.cli.CmdFunctions.BaseCommand
        void runCmd() throws Exception {
            CmdFunctions.this.admin.functions().deleteFunction(this.tenant, this.namespace, this.functionName);
            // The message is a plain String; the decompiler-generated cast of the literal to
            // DeleteFunction was not valid Java and has been dropped.
            print("Deleted successfully");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hidden sub-command that downloads the contents found at {@code --path} into
     * {@code --destinationFile} through the admin client.
     */
    @Parameters(commandDescription = "Download File Data from Pulsar", hidden = true)
    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$DownloadFunction.class */
    public class DownloadFunction extends BaseCommand {

        @Parameter(names = {"--destinationFile"}, description = "The file where downloaded contents need to be stored", listConverter = StringConverter.class, required = true)
        protected String destinationFile;

        @Parameter(names = {"--path"}, description = "Path where the contents are to be stored", listConverter = StringConverter.class, required = true)
        protected String path;

        DownloadFunction() {
            super();
        }

        /**
         * Downloads the data and prints a confirmation.
         *
         * @throws Exception if the admin call fails
         */
        @Override // org.apache.pulsar.admin.cli.CmdFunctions.BaseCommand
        void runCmd() throws Exception {
            CmdFunctions.this.admin.functions().downloadFunction(this.destinationFile, this.path);
            // The message is a plain String; the decompiler-generated cast of the literal to
            // DownloadFunction was not valid Java and has been dropped.
            print("Downloaded successfully");
        }
    }

    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$FunctionCommand.class */
    /**
     * Base for commands that address one existing function, either through a Fully
     * Qualified Function Name ({@code tenant/namespace/name}) or through the three
     * separate {@code --tenant}, {@code --namespace} and {@code --name} flags.
     */
    abstract class FunctionCommand extends BaseCommand {

        @Parameter(names = {"--fqfn"}, description = "The Fully Qualified Function Name (FQFN) for the function")
        protected String fqfn;

        @Parameter(names = {"--tenant"}, description = "The function's tenant")
        protected String tenant;

        @Parameter(names = {"--namespace"}, description = "The function's namespace")
        protected String namespace;

        @Parameter(names = {"--name"}, description = "The function's name")
        protected String functionName;

        FunctionCommand() {
            super();
        }

        /**
         * Resolves the function identity. After this method returns normally,
         * {@code tenant}, {@code namespace} and {@code functionName} are all non-null.
         *
         * @throws RuntimeException if both addressing styles are mixed, if neither is
         *         complete, or if the FQFN does not consist of exactly three non-empty
         *         {@code /}-separated components
         */
        @Override // org.apache.pulsar.admin.cli.CmdFunctions.BaseCommand
        void processArguments() throws Exception {
            super.processArguments();
            boolean fqfnGiven = this.fqfn != null;
            boolean anyPartGiven = this.tenant != null || this.namespace != null || this.functionName != null;
            if (fqfnGiven && anyPartGiven) {
                throw new RuntimeException("You must specify either a Fully Qualified Function Name (FQFN) or tenant, namespace, and function name");
            }
            if (!fqfnGiven) {
                if (this.tenant == null || this.namespace == null || this.functionName == null) {
                    throw new RuntimeException("You must specify a tenant, namespace, and name for the function or a Fully Qualified Function Name (FQFN)");
                }
                return;
            }
            String[] parts = this.fqfn.split("/");
            if (parts.length != 3) {
                throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
            }
            // split() keeps leading and interior empty strings, so "/ns/name" or "tenant//name"
            // would otherwise slip through with an empty tenant or namespace.
            for (String part : parts) {
                if (part.isEmpty()) {
                    throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
                }
            }
            this.tenant = parts[0];
            this.namespace = parts[1];
            this.functionName = parts[2];
        }

        /** @return the raw {@code --fqfn} value, or {@code null} if it was not given */
        public String getFqfn() {
            return this.fqfn;
        }

        /** @return the tenant, taken from {@code --tenant} or from the FQFN */
        public String getTenant() {
            return this.tenant;
        }

        /** @return the namespace, taken from {@code --namespace} or from the FQFN */
        public String getNamespace() {
            return this.namespace;
        }

        /** @return the function name, taken from {@code --name} or from the FQFN */
        public String getFunctionName() {
            return this.functionName;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$FunctionDetailsCommand.class */
    abstract class FunctionDetailsCommand extends BaseCommand {

        @Parameter(names = {"--fqfn"}, description = "The Fully Qualified Function Name (FQFN) for the function")
        protected String fqfn;

        @Parameter(names = {"--tenant"}, description = "The function's tenant")
        protected String tenant;

        @Parameter(names = {"--namespace"}, description = "The function's namespace")
        protected String namespace;

        @Parameter(names = {"--name"}, description = "The function's name")
        protected String functionName;

        @Parameter(names = {"--className"}, description = "The function's class name")
        protected String className;

        @Parameter(names = {"--jar"}, description = "Path to the jar file for the function (if the function is written in Java)", listConverter = StringConverter.class)
        protected String jarFile;

        @Parameter(names = {"--py"}, description = "Path to the main Python file for the function (if the function is written in Python)", listConverter = StringConverter.class)
        protected String pyFile;

        @Parameter(names = {"--inputs"}, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
        protected String inputs;

        @Parameter(names = {"--output"}, description = "The function's output topic")
        protected String output;

        @Parameter(names = {"--logTopic"}, description = "The topic to which the function's logs are produced")
        protected String logTopic;

        @Parameter(names = {"--customSerdeInputs"}, description = "The map of input topics to SerDe class names (as a JSON string)")
        protected String customSerdeInputString;

        @Parameter(names = {"--outputSerdeClassName"}, description = "The SerDe class to be used for messages output by the function")
        protected String outputSerdeClassName;

        @Parameter(names = {"--functionConfigFile"}, description = "The path to a YAML config file specifying the function's configuration")
        protected String fnConfigFile;

        @Parameter(names = {"--processingGuarantees"}, description = "The processing guarantees (aka delivery semantics) applied to the function")
        protected FunctionConfig.ProcessingGuarantees processingGuarantees;

        @Parameter(names = {"--subscriptionType"}, description = "The type of subscription used by the function when consuming messages from the input topic(s)")
        protected FunctionConfig.SubscriptionType subscriptionType;

        @Parameter(names = {"--userConfig"}, description = "User-defined config key/values")
        protected String userConfigString;

        @Parameter(names = {"--parallelism"}, description = "The function's parallelism factor (i.e. the number of function instances to run)")
        protected String parallelism;
        protected FunctionConfig functionConfig;
        protected String userCodeFile;

        FunctionDetailsCommand() {
            super();
        }

        /* JADX WARN: Type inference failed for: r0v75, types: [org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand$2] */
        /* JADX WARN: Type inference failed for: r0v95, types: [org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand$1] */
        /**
         * Builds {@code functionConfig} from the optional config file and the command-line
         * flags, validates it, and fills in values that can be inferred.
         *
         * <p>Flags that were given override whatever the config file supplied. The order of
         * the steps below determines which validation error is reported first.
         *
         * @throws Exception if the config file cannot be loaded or any validation fails
         */
        @Override // org.apache.pulsar.admin.cli.CmdFunctions.BaseCommand
        void processArguments() throws Exception {
            super.processArguments();
            // Start from the config file when one is given, otherwise from an empty config.
            if (null != this.fnConfigFile) {
                this.functionConfig = CmdFunctions.loadConfig(new File(this.fnConfigFile));
            } else {
                this.functionConfig = new FunctionConfig();
            }
            // When --fqfn is present the individual --tenant/--namespace/--name flags are not consulted.
            if (null != this.fqfn) {
                CmdFunctions.this.parseFullyQualifiedFunctionName(this.fqfn, this.functionConfig);
            } else {
                if (null != this.tenant) {
                    this.functionConfig.setTenant(this.tenant);
                }
                if (null != this.namespace) {
                    this.functionConfig.setNamespace(this.namespace);
                }
                if (null != this.functionName) {
                    this.functionConfig.setName(this.functionName);
                }
            }
            // --inputs is split on ',' without trimming; every resulting topic name is validated.
            if (null != this.inputs) {
                List asList = Arrays.asList(this.inputs.split(","));
                asList.forEach(this::validateTopicName);
                this.functionConfig.setInputs(asList);
            }
            // --customSerdeInputs is a JSON object; only its keys (topic names) are validated here.
            if (null != this.customSerdeInputString) {
                Map map = (Map) new Gson().fromJson(this.customSerdeInputString, new TypeToken<Map<String, String>>() { // from class: org.apache.pulsar.admin.cli.CmdFunctions.FunctionDetailsCommand.1
                }.getType());
                map.forEach((str, str2) -> {
                    validateTopicName(str);
                });
                this.functionConfig.setCustomSerdeInputs(map);
            }
            if (null != this.output) {
                validateTopicName(this.output);
                this.functionConfig.setOutput(this.output);
            }
            // Note: unlike the input and output topics, the log topic is not validated here.
            if (null != this.logTopic) {
                this.functionConfig.setLogTopic(this.logTopic);
            }
            if (null != this.className) {
                this.functionConfig.setClassName(this.className);
            }
            if (null != this.outputSerdeClassName) {
                this.functionConfig.setOutputSerdeClassName(this.outputSerdeClassName);
            }
            if (null != this.processingGuarantees) {
                this.functionConfig.setProcessingGuarantees(this.processingGuarantees);
            }
            if (null != this.subscriptionType) {
                this.functionConfig.setSubscriptionType(this.subscriptionType);
            }
            // --userConfig is parsed as a JSON object of string keys and values.
            if (null != this.userConfigString) {
                this.functionConfig.setUserConfig((Map) new Gson().fromJson(this.userConfigString, new TypeToken<Map<String, String>>() { // from class: org.apache.pulsar.admin.cli.CmdFunctions.FunctionDetailsCommand.2
                }.getType()));
            }
            // The runtime follows the code artifact; --jar wins when both --jar and --py are given.
            if (null != this.jarFile) {
                doJavaSubmitChecks(this.functionConfig);
                this.functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
                this.userCodeFile = this.jarFile;
            } else {
                if (null == this.pyFile) {
                    throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
                }
                doPythonSubmitChecks(this.functionConfig);
                this.functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
                this.userCodeFile = this.pyFile;
            }
            // At least one input topic is required, with or without a custom SerDe.
            if (this.functionConfig.getInputs().isEmpty() && this.functionConfig.getCustomSerdeInputs().isEmpty()) {
                throw new RuntimeException("No input topic(s) specified for the function");
            }
            CmdFunctions.verifyNoTopicClash(this.functionConfig.getInputs(), this.functionConfig.getOutput());
            // --parallelism must be a positive integer; a config value of 0 is replaced by 1.
            if (this.parallelism != null) {
                int parseInt = Integer.parseInt(this.parallelism);
                if (parseInt <= 0) {
                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the function must be positive");
                }
                this.functionConfig.setParallelism(parseInt);
            } else if (this.functionConfig.getParallelism() == 0) {
                this.functionConfig.setParallelism(1);
            }
            // EFFECTIVELY_ONCE is rejected when an explicit subscription type other than FAILOVER is set.
            if (this.functionConfig.getSubscriptionType() != null && this.functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER && this.functionConfig.getProcessingGuarantees() != null && this.functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
            }
            // Auto-ack is always switched on, regardless of what the config file said.
            this.functionConfig.setAutoAck(true);
            inferMissingArguments(this.functionConfig);
        }

        /**
         * Verifies that the configured function class is present in the given jar and
         * implements either the Pulsar {@code Function} interface or
         * {@link java.util.function.Function}.
         *
         * @param file the user jar to inspect
         * @throws IllegalArgumentException if the class is missing or implements neither interface
         */
        private void assertClassExistsInJar(File file) {
            String fnClassName = this.functionConfig.getClassName();
            if (!Reflections.classExistsInJar(file, fnClassName)) {
                throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s", fnClassName, this.jarFile));
            }
            if (Reflections.classInJarImplementsIface(file, fnClassName, Function.class)) {
                return;
            }
            if (Reflections.classInJarImplementsIface(file, fnClassName, java.util.function.Function.class)) {
                return;
            }
            throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function", fnClassName, this.jarFile));
        }

        /**
         * Instantiates the configured function class from the jar and resolves the raw type
         * arguments of the function interface it implements.
         *
         * @param file the user jar
         * @param functionConfig the configuration naming the function class
         * @return the raw type arguments as resolved by {@code TypeResolver}; callers in this
         *         class treat index 0 as the input type and index 1 as the output type
         * @throws IllegalArgumentException if the class is missing from the jar, implements
         *         neither supported interface, or could not be instantiated
         */
        private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
            assertClassExistsInJar(file);
            Object instance = Reflections.createInstance(functionConfig.getClassName(), file);
            // Check for a failed instantiation once, up front. A null check placed after an
            // instanceof test can never fire, and null previously always fell into the
            // java.util.function branch and produced a misleading message.
            if (instance == null) {
                throw new IllegalArgumentException(String.format("The function class %s could not be instantiated from jar %s", functionConfig.getClassName(), this.jarFile));
            }
            if (instance instanceof Function) {
                Function pulsarFunction = (Function) instance;
                return TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
            }
            java.util.function.Function javaFunction = (java.util.function.Function) instance;
            return TypeResolver.resolveRawArguments(java.util.function.Function.class, javaFunction.getClass());
        }

        /**
         * Validates a Java function submission: a class name must be configured, the jar must
         * load, and every input and output SerDe must be compatible with the function's
         * resolved type arguments.
         *
         * @param functionConfig the configuration being validated
         * @throws IllegalArgumentException if the class name is missing, or a SerDe class does
         *         not exist, does not implement {@code SerDe}, or cannot be instantiated
         * @throws RuntimeException if the jar cannot be loaded, a type cannot be loaded, the
         *         default SerDe does not support a type, or a SerDe type does not match
         */
        private void doJavaSubmitChecks(FunctionConfig functionConfig) {
            if (Objects.isNull(functionConfig.getClassName())) {
                throw new IllegalArgumentException("You supplied a jar file but no main class");
            }
            File file = new File(this.jarFile);
            try {
                ClassLoader loadJar = Reflections.loadJar(file);
                // functionTypes[0] is checked against input SerDes, functionTypes[1] against the output SerDe.
                Class<?>[] functionTypes = getFunctionTypes(file, functionConfig);
                // Per custom-SerDe input: str is the topic, str2 the SerDe class name.
                functionConfig.getCustomSerdeInputs().forEach((str, str2) -> {
                    // The SerDe class must be found either on the current classpath or in the user jar.
                    if (!Reflections.classExists(str2) && !Reflections.classExistsInJar(new File(this.jarFile), str2)) {
                        throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not exist", str2));
                    }
                    // Wherever it was found, it must implement SerDe.
                    if (Reflections.classExists(str2)) {
                        if (!Reflections.classImplementsIface(str2, SerDe.class)) {
                            throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s", str2, SerDe.class.getCanonicalName()));
                        }
                    } else if (Reflections.classExistsInJar(new File(this.jarFile), str2) && !Reflections.classInJarImplementsIface(new File(this.jarFile), str2, SerDe.class)) {
                        throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s", str2, SerDe.class.getCanonicalName()));
                    }
                    // The default SerDe is only checked for support of the input type; no instantiation needed.
                    if (str2.equals(DefaultSerDe.class.getName())) {
                        if (!DefaultSerDe.IsSupportedType(functionTypes[0])) {
                            throw new RuntimeException("The default Serializer does not support type " + functionTypes[0]);
                        }
                        return;
                    }
                    SerDe serDe = (SerDe) Reflections.createInstance(str2, file);
                    if (serDe == null) {
                        throw new IllegalArgumentException(String.format("The SerDe class %s does not exist in jar %s", str2, this.jarFile));
                    }
                    Class[] resolveRawArguments = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
                    try {
                        // Input direction: the function's input type must be assignable from the SerDe's type.
                        // Both classes are re-loaded by name through the jar's class loader before comparing.
                        if (!Class.forName(functionTypes[0].getName(), true, loadJar).isAssignableFrom(Class.forName(resolveRawArguments[0].getName(), true, loadJar))) {
                            throw new RuntimeException("Serializer type mismatch " + functionTypes[0] + " vs " + resolveRawArguments[0]);
                        }
                    } catch (ClassNotFoundException e) {
                        throw new RuntimeException("Failed to load type class", e);
                    }
                });
                // Inputs without a custom SerDe must have an input type the default SerDe supports.
                functionConfig.getInputs().forEach(str3 -> {
                    if (!DefaultSerDe.IsSupportedType(functionTypes[0])) {
                        throw new RuntimeException("Default Serializer does not support type " + functionTypes[0]);
                    }
                });
                // A Void output type skips all output SerDe checks.
                if (Void.class.equals(functionTypes[1])) {
                    return;
                }
                // No output SerDe, or the default one: only check the default SerDe supports the output type.
                if (functionConfig.getOutputSerdeClassName() == null || functionConfig.getOutputSerdeClassName().isEmpty() || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
                    if (!DefaultSerDe.IsSupportedType(functionTypes[1])) {
                        throw new RuntimeException("Default Serializer does not support type " + functionTypes[1]);
                    }
                    return;
                }
                SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(), file);
                if (serDe == null) {
                    throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s", functionConfig.getOutputSerdeClassName(), this.jarFile));
                }
                Class[] resolveRawArguments = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
                try {
                    // Output direction is reversed: the SerDe's type must be assignable from the function's output type.
                    if (!Class.forName(resolveRawArguments[0].getName(), true, loadJar).isAssignableFrom(Class.forName(functionTypes[1].getName(), true, loadJar))) {
                        throw new RuntimeException("Serializer type mismatch " + functionTypes[1] + " vs " + resolveRawArguments[0]);
                    }
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException("Failed to load type class", e);
                }
            } catch (MalformedURLException e2) {
                throw new RuntimeException("Failed to load user jar " + file, e2);
            }
        }

        /**
         * Validates a Python function submission: a class name is mandatory and
         * effectively-once processing is rejected.
         *
         * @param functionConfig the configuration being validated
         * @throws IllegalArgumentException if no class name is configured
         * @throws RuntimeException if effectively-once processing guarantees are requested
         */
        private void doPythonSubmitChecks(FunctionConfig functionConfig) {
            boolean classNameMissing = functionConfig.getClassName() == null;
            if (classNameMissing) {
                throw new IllegalArgumentException("You specified a Python file but no main class name");
            }
            boolean effectivelyOnce = FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE == functionConfig.getProcessingGuarantees();
            if (effectivelyOnce) {
                throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
            }
        }

        /**
         * Rejects topic names that {@code TopicName.isValid} does not accept.
         *
         * @param str the topic name to check
         * @throws IllegalArgumentException if the name is invalid
         */
        private void validateTopicName(String str) {
            if (TopicName.isValid(str)) {
                return;
            }
            throw new IllegalArgumentException(String.format("The topic name %s is invalid", str));
        }

        /**
         * Fills in name, tenant, namespace and output when they are empty. The name is
         * inferred first because the inferred output topic is built from it.
         *
         * @param functionConfig the configuration to complete in place
         */
        private void inferMissingArguments(FunctionConfig functionConfig) {
            boolean nameMissing = StringUtils.isEmpty(functionConfig.getName());
            if (nameMissing) {
                inferMissingFunctionName(functionConfig);
            }

            boolean tenantMissing = StringUtils.isEmpty(functionConfig.getTenant());
            if (tenantMissing) {
                inferMissingTenant(functionConfig);
            }

            boolean namespaceMissing = StringUtils.isEmpty(functionConfig.getNamespace());
            if (namespaceMissing) {
                inferMissingNamespace(functionConfig);
            }

            boolean outputMissing = StringUtils.isEmpty(functionConfig.getOutput());
            if (outputMissing) {
                inferMissingOutput(functionConfig);
            }
        }

        /**
         * Derives the function name from the configured class name by taking the segment
         * after the last dot. If splitting yields no segments at all, the class name is
         * used as is.
         *
         * @param functionConfig the configuration whose name is set
         */
        private void inferMissingFunctionName(FunctionConfig functionConfig) {
            String fnClassName = functionConfig.getClassName();
            String[] segments = fnClassName.split("\\.");
            String inferredName = segments.length == 0 ? fnClassName : segments[segments.length - 1];
            functionConfig.setName(inferredName);
        }

        /**
         * Takes the tenant from the function's single input topic.
         *
         * @param functionConfig the configuration whose tenant is set
         * @throws RuntimeException if an {@link IllegalArgumentException} is raised while
         *         doing so, for example because there is not exactly one input topic
         */
        private void inferMissingTenant(FunctionConfig functionConfig) {
            try {
                String onlyInput = getUniqueInput(functionConfig);
                TopicName inputTopic = TopicName.get(onlyInput);
                functionConfig.setTenant(inputTopic.getTenant());
            } catch (IllegalArgumentException e) {
                throw new RuntimeException("You need to specify a tenant for the function", e);
            }
        }

        /**
         * Takes the namespace from the function's single input topic.
         *
         * @param functionConfig the configuration whose namespace is set
         * @throws RuntimeException if an {@link IllegalArgumentException} is raised while
         *         doing so, for example because there is not exactly one input topic; the
         *         original exception is attached as the cause
         */
        private void inferMissingNamespace(FunctionConfig functionConfig) {
            try {
                functionConfig.setNamespace(TopicName.get(getUniqueInput(functionConfig)).getNamespacePortion());
            } catch (IllegalArgumentException e) {
                // Keep the cause, as inferMissingTenant does, so the underlying reason is not lost.
                throw new RuntimeException("You need to specify a namespace for the function", e);
            }
        }

        /**
         * Best-effort default for the output topic: {@code <input>-<name>-output}, built from
         * the single input topic and the function name. When an
         * {@link IllegalArgumentException} is raised (for example because there is not
         * exactly one input topic) the output is simply left unset.
         *
         * @param functionConfig the configuration whose output may be set
         */
        private void inferMissingOutput(FunctionConfig functionConfig) {
            try {
                String onlyInput = getUniqueInput(functionConfig);
                String inferredOutput = String.format("%s-%s-output", onlyInput, functionConfig.getName());
                functionConfig.setOutput(inferredOutput);
            } catch (IllegalArgumentException ignored) {
                // Deliberately ignored: no output topic can be inferred, so none is set.
            }
        }

        /**
         * Returns the function's only input topic, looking at both the plain inputs and the
         * custom-SerDe inputs.
         *
         * @param functionConfig the configuration to inspect
         * @return the single input topic
         * @throws IllegalArgumentException if the two collections together do not hold
         *         exactly one topic
         */
        private String getUniqueInput(FunctionConfig functionConfig) {
            int plainInputCount = functionConfig.getInputs().size();
            int serdeInputCount = functionConfig.getCustomSerdeInputs().size();
            if (plainInputCount + serdeInputCount != 1) {
                throw new IllegalArgumentException();
            }
            if (plainInputCount == 1) {
                return (String) functionConfig.getInputs().iterator().next();
            }
            return (String) functionConfig.getCustomSerdeInputs().keySet().iterator().next();
        }

        /**
         * Translates a {@link FunctionConfig} into the {@code FunctionDetails} message that
         * is sent to the cluster.
         *
         * <p>For the Java runtime the user jar is loaded and the function's type arguments
         * are resolved so the source and sink specs can carry the input and output type
         * class names. Optional fields are only set on the builder when the configuration
         * provides them.
         *
         * @param functionConfig the fully processed configuration
         * @return the built function details
         * @throws IOException declared for callers; not thrown directly by this method
         * @throws RuntimeException if the user jar cannot be loaded
         */
        protected Function.FunctionDetails convert(FunctionConfig functionConfig) throws IOException {
            Class<?>[] functionTypes = null;
            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                File file = new File(this.jarFile);
                try {
                    Reflections.loadJar(file);
                    functionTypes = getFunctionTypes(file, functionConfig);
                } catch (MalformedURLException e) {
                    throw new RuntimeException("Failed to load user jar " + file, e);
                }
            }
            Function.FunctionDetails.Builder detailsBuilder = Function.FunctionDetails.newBuilder();

            // Source: every input topic mapped to its SerDe class name.
            Map<String, String> topicsToSerDeClassName = new HashMap<>();
            topicsToSerDeClassName.putAll(functionConfig.getCustomSerdeInputs());
            // Topics supplied through plain inputs must be part of the source as well. They were
            // previously iterated with an empty lambda and silently dropped. They carry an empty
            // SerDe class name (presumably read downstream as "use the default SerDe" - TODO confirm);
            // putIfAbsent keeps an explicitly configured custom SerDe for the same topic.
            functionConfig.getInputs().forEach(topic -> topicsToSerDeClassName.putIfAbsent(topic, ""));
            Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder();
            sourceBuilder.putAllTopicsToSerDeClassName(topicsToSerDeClassName);
            if (functionConfig.getSubscriptionType() != null) {
                sourceBuilder.setSubscriptionType(CmdFunctions.convertSubscriptionType(functionConfig.getSubscriptionType()));
            }
            if (functionTypes != null) {
                sourceBuilder.setTypeClassName(functionTypes[0].getName());
            }
            detailsBuilder.setSource(sourceBuilder);

            // Sink: output topic, its SerDe and the output type.
            Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder();
            if (functionConfig.getOutput() != null) {
                sinkBuilder.setTopic(functionConfig.getOutput());
            }
            if (functionConfig.getOutputSerdeClassName() != null) {
                sinkBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
            }
            if (functionTypes != null) {
                sinkBuilder.setTypeClassName(functionTypes[1].getName());
            }
            detailsBuilder.setSink(sinkBuilder);

            // Identity and remaining settings.
            if (functionConfig.getTenant() != null) {
                detailsBuilder.setTenant(functionConfig.getTenant());
            }
            if (functionConfig.getNamespace() != null) {
                detailsBuilder.setNamespace(functionConfig.getNamespace());
            }
            if (functionConfig.getName() != null) {
                detailsBuilder.setName(functionConfig.getName());
            }
            if (functionConfig.getClassName() != null) {
                detailsBuilder.setClassName(functionConfig.getClassName());
            }
            if (functionConfig.getLogTopic() != null) {
                detailsBuilder.setLogTopic(functionConfig.getLogTopic());
            }
            if (functionConfig.getRuntime() != null) {
                detailsBuilder.setRuntime(CmdFunctions.convertRuntime(functionConfig.getRuntime()));
            }
            if (!functionConfig.getUserConfig().isEmpty()) {
                // The user config travels as a JSON string.
                detailsBuilder.setUserConfig(new Gson().toJson(functionConfig.getUserConfig()));
            }
            if (functionConfig.getProcessingGuarantees() != null) {
                detailsBuilder.setProcessingGuarantees(CmdFunctions.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
            }
            detailsBuilder.setAutoAck(functionConfig.isAutoAck());
            detailsBuilder.setParallelism(functionConfig.getParallelism());
            return detailsBuilder.build();
        }

        /**
         * Builds function details by converting the configuration with
         * {@link #convert(FunctionConfig)}, rendering the result as JSON and merging that
         * JSON into a fresh builder.
         *
         * @param functionConfig the fully processed configuration
         * @return the function details rebuilt from their JSON form
         * @throws IOException if conversion or the JSON merge fails
         */
        protected Function.FunctionDetails convertProto2(FunctionConfig functionConfig) throws IOException {
            Function.FunctionDetails.Builder target = Function.FunctionDetails.newBuilder();
            String detailsJson = FunctionsImpl.printJson(convert(functionConfig));
            Utils.mergeJson(detailsJson, target);
            return target.build();
        }

        public String getFqfn() {
            return this.fqfn;
        }

        public String getTenant() {
            return this.tenant;
        }

        public String getNamespace() {
            return this.namespace;
        }

        public String getFunctionName() {
            return this.functionName;
        }

        public String getClassName() {
            return this.className;
        }

        public String getJarFile() {
            return this.jarFile;
        }

        public String getPyFile() {
            return this.pyFile;
        }

        public String getInputs() {
            return this.inputs;
        }

        public String getOutput() {
            return this.output;
        }

        public String getLogTopic() {
            return this.logTopic;
        }

        public String getCustomSerdeInputString() {
            return this.customSerdeInputString;
        }

        public String getOutputSerdeClassName() {
            return this.outputSerdeClassName;
        }

        /**
         * Returns the function config file setting held by this command.
         *
         * @return the current value of the {@code fnConfigFile} field
         */
        public String getFnConfigFile() {
            return fnConfigFile;
        }

        /**
         * Returns the processing guarantees setting held by this command.
         *
         * @return the current value of the {@code processingGuarantees} field
         */
        public FunctionConfig.ProcessingGuarantees getProcessingGuarantees() {
            return processingGuarantees;
        }

        /**
         * Returns the subscription type setting held by this command.
         *
         * @return the current value of the {@code subscriptionType} field
         */
        public FunctionConfig.SubscriptionType getSubscriptionType() {
            return subscriptionType;
        }

        /**
         * Returns the user config setting held by this command, as the raw string.
         *
         * @return the current value of the {@code userConfigString} field
         */
        public String getUserConfigString() {
            return userConfigString;
        }

        /**
         * Returns the parallelism setting held by this command, as the raw string.
         *
         * @return the current value of the {@code parallelism} field
         */
        public String getParallelism() {
            return parallelism;
        }

        /**
         * Returns the function configuration object held by this command.
         *
         * @return the current value of the {@code functionConfig} field
         */
        public FunctionConfig getFunctionConfig() {
            return functionConfig;
        }

        /**
         * Returns the user code file setting held by this command.
         *
         * @return the current value of the {@code userCodeFile} field
         */
        public String getUserCodeFile() {
            return userCodeFile;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that looks up a single function through the admin client and writes the
     * result to standard output as pretty-printed JSON.
     */
    @Parameters(commandDescription = "Fetch information about a Pulsar Function")
    public class GetFunction extends FunctionCommand {
        GetFunction() {
            super();
        }

        /**
         * Fetches the function identified by tenant, namespace and function name, then prints
         * its JSON form with indentation.
         *
         * @throws Exception if the lookup or the JSON handling fails
         */
        @Override
        void runCmd() throws Exception {
            String rawJson = Utils.printJson(
                    CmdFunctions.this.admin.functions().getFunction(tenant, namespace, functionName));
            Gson prettyPrinter = new GsonBuilder().setPrettyPrinting().create();
            System.out.println(prettyPrinter.toJson(new JsonParser().parse(rawJson)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that asks the admin client for the status of a single function and writes
     * it to standard output as pretty-printed JSON.
     */
    @Parameters(commandDescription = "Check the current status of a Pulsar Function")
    public class GetFunctionStatus extends FunctionCommand {
        GetFunctionStatus() {
            super();
        }

        /**
         * Fetches the status of the function identified by tenant, namespace and function
         * name, then prints its JSON form with indentation.
         *
         * @throws Exception if the lookup or the JSON handling fails
         */
        @Override
        void runCmd() throws Exception {
            String rawJson = Utils.printJson(
                    CmdFunctions.this.admin.functions().getFunctionStatus(tenant, namespace, functionName));
            Gson prettyPrinter = new GsonBuilder().setPrettyPrinting().create();
            System.out.println(prettyPrinter.toJson(new JsonParser().parse(rawJson)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that prints the functions the admin client reports for one tenant and
     * namespace.
     */
    @Parameters(commandDescription = "List all of the Pulsar Functions running under a specific tenant and namespace")
    public class ListFunctions extends NamespaceCommand {
        ListFunctions() {
            super();
        }

        /**
         * Queries the admin client with this command's tenant and namespace and hands the
         * result to {@code print}.
         *
         * @throws Exception if the query fails
         */
        @Override
        void runCmd() throws Exception {
            print(CmdFunctions.this.admin.functions().getFunctions(tenant, namespace));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that runs the configured function through locally spawned runtimes instead
     * of submitting it to a cluster.
     */
    @Parameters(commandDescription = "Run the Pulsar Function locally (rather than deploying it to the Pulsar cluster)")
    public class LocalRunner extends FunctionDetailsCommand {

        // Parsed from the command line; not read by runCmd() below.
        @Parameter(names = {"--stateStorageServiceUrl"}, description = "The URL for the state storage service (by default Apache BookKeeper)")
        protected String stateStorageServiceUrl;

        // When set, takes precedence over the service URL reported by the admin client.
        @Parameter(names = {"--brokerServiceUrl"}, description = "The URL for the Pulsar broker")
        protected String brokerServiceUrl;

        LocalRunner() {
            super();
        }

        /**
         * Validates the function configuration, starts one {@link RuntimeSpawner} per unit of
         * configured parallelism, registers a JVM shutdown hook that closes every spawner, and
         * then blocks until each spawner has been joined.
         *
         * <p>The service URL is resolved in this order: {@code --brokerServiceUrl}, the URL
         * reported by the admin client, then {@code CmdFunctions.DEFAULT_SERVICE_URL}.
         *
         * <p>The runtime factory is managed by try-with-resources, so it is closed exactly once
         * on every exit path and a failure in {@code close()} is attached as a suppressed
         * exception when the body has already failed.
         *
         * @throws Exception if validation, spawning, joining or closing the factory fails
         */
        @Override
        void runCmd() throws Exception {
            CmdFunctions.checkRequiredFields(this.functionConfig);

            String serviceUrl = CmdFunctions.this.admin.getServiceUrl();
            if (this.brokerServiceUrl != null) {
                serviceUrl = this.brokerServiceUrl;
            }
            if (serviceUrl == null) {
                serviceUrl = CmdFunctions.DEFAULT_SERVICE_URL;
            }

            try (ProcessRuntimeFactory processRuntimeFactory =
                    new ProcessRuntimeFactory(serviceUrl, (String) null, (String) null, (String) null)) {
                final List<RuntimeSpawner> spawners = new LinkedList<>();
                for (int i = 0; i < this.functionConfig.getParallelism(); i++) {
                    InstanceConfig instanceConfig = new InstanceConfig();
                    instanceConfig.setFunctionDetails(convertProto2(this.functionConfig));
                    instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                    instanceConfig.setFunctionId(UUID.randomUUID().toString());
                    instanceConfig.setInstanceId(Integer.toString(i));
                    instanceConfig.setMaxBufferedTuples(1024);
                    RuntimeSpawner spawner =
                            new RuntimeSpawner(instanceConfig, this.userCodeFile, processRuntimeFactory, 0L);
                    // Track the spawner before starting it so the shutdown hook can close it.
                    spawners.add(spawner);
                    spawner.start();
                }

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        CmdFunctions.log.info("Shutting down the localrun runtimeSpawner ...");
                        for (RuntimeSpawner spawner : spawners) {
                            spawner.close();
                        }
                    }
                });

                for (RuntimeSpawner spawner : spawners) {
                    spawner.join();
                    CmdFunctions.log.info("RuntimeSpawner quit because of {}", spawner.getRuntime().getDeathException());
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/admin/cli/CmdFunctions$NamespaceCommand.class */
    /**
     * Base class for commands that are addressed by a tenant and a namespace; both options
     * are declared as required.
     */
    abstract class NamespaceCommand extends BaseCommand {

        @Parameter(names = {"--tenant"}, description = "The function's tenant", required = true)
        protected String tenant;

        @Parameter(names = {"--namespace"}, description = "The function's namespace", required = true)
        protected String namespace;

        NamespaceCommand() {
            super();
        }

        /**
         * Returns the tenant option.
         *
         * @return the current value of the {@code tenant} field
         */
        public String getTenant() {
            return tenant;
        }

        /**
         * Returns the namespace option.
         *
         * @return the current value of the {@code namespace} field
         */
        public String getNamespace() {
            return namespace;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that reads the value stored under a key in a function's state table and
     * prints it, optionally polling for newer versions.
     */
    @Parameters(commandDescription = "Fetch the current state associated with a Pulsar Function running in cluster mode")
    public class StateGetter extends FunctionCommand {

        @Parameter(names = {"-k", "--key"}, description = "key")
        private String key;

        @Parameter(names = {"-u", "--storage-service-url"}, description = "The URL for the storage service used by the function")
        private String stateStorageServiceUrl;

        @Parameter(names = {"-w", "--watch"}, description = "Watch for changes in the value associated with a key for a Pulsar Function")
        private boolean watch;

        StateGetter() {
            super();
            this.key = null;
            this.stateStorageServiceUrl = null;
            this.watch = false;
        }

        /**
         * Opens the table named after the function inside the {@code <tenant>_<namespace>}
         * storage namespace, looks up {@code key}, and prints either the value or a
         * "doesn't exist" message.
         *
         * <p>With {@code --watch} the lookup repeats once per second and a value is printed
         * only when its version is greater than the last version printed.
         *
         * <p>The client, the table and each looked-up entry are managed by try-with-resources,
         * so each is closed exactly once on every exit path, including when printing fails.
         *
         * @throws NullPointerException if the storage service URL or the key was not supplied
         * @throws Exception if the storage lookup fails or the polling sleep is interrupted
         */
        @Override
        void runCmd() throws Exception {
            Preconditions.checkNotNull(this.stateStorageServiceUrl, "The state storage service URL is missing");
            // Fail before connecting: a missing key can only end in a NullPointerException below.
            Preconditions.checkNotNull(this.key, "The key is missing");

            String tableNamespace = String.format("%s_%s", this.tenant, this.namespace);
            String tableName = getFunctionName();

            // NOTE(review): Table and KeyValue stay raw, with explicit casts, as in the existing
            // code; parameterize them once the element types are confirmed.
            try (StorageClient client = StorageClientBuilder.newBuilder()
                         .withSettings(StorageClientSettings.newBuilder()
                                 .addEndpoints(NetUtils.parseEndpoint(this.stateStorageServiceUrl))
                                 .clientName("functions-admin")
                                 .build())
                         .withNamespace(tableNamespace)
                         .build();
                 Table table = (Table) FutureUtils.result(client.openTable(tableName))) {
                long lastVersion = -1;
                do {
                    // A null resource is permitted by try-with-resources and is simply not closed.
                    try (KeyValue keyValue = (KeyValue) FutureUtils.result(
                            table.getKv(Unpooled.wrappedBuffer(this.key.getBytes(StandardCharsets.UTF_8))))) {
                        if (null == keyValue) {
                            System.out.println("key '" + this.key + "' doesn't exist.");
                        } else if (keyValue.version() > lastVersion) {
                            if (keyValue.isNumber()) {
                                System.out.println("value = " + keyValue.numberValue());
                            } else {
                                System.out.println("value = " + new String(ByteBufUtil.getBytes((ByteBuf) keyValue.value()), StandardCharsets.UTF_8));
                            }
                            lastVersion = keyValue.version();
                        }
                    }
                    if (this.watch) {
                        Thread.sleep(1000L);
                    }
                } while (this.watch);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that triggers a function through the admin client, using either an inline
     * value or the contents of a file, and prints what the admin client returns.
     */
    @Parameters(commandDescription = "Triggers the specified Pulsar Function with a supplied value")
    public class TriggerFunction extends FunctionCommand {

        @Parameter(names = {"--triggerValue"}, description = "The value with which you want to trigger the function")
        protected String triggerValue;

        @Parameter(names = {"--triggerFile"}, description = "The path to the file that contains the data with which you'd like to trigger the function")
        protected String triggerFile;

        @Parameter(names = {"--topic"}, description = "The specific topic name that the function consumes from that you want to inject the data to")
        protected String topic;

        TriggerFunction() {
            super();
        }

        /**
         * Requires at least one of {@code --triggerValue} and {@code --triggerFile}, then
         * forwards all options to the admin client and prints the result.
         *
         * @throws RuntimeException if neither a trigger value nor a trigger file was given
         * @throws Exception if the trigger call fails
         */
        @Override
        void runCmd() throws Exception {
            boolean hasTriggerInput = triggerValue != null || triggerFile != null;
            if (!hasTriggerInput) {
                throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
            }
            System.out.println(CmdFunctions.this.admin.functions()
                    .triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * CLI command that validates the function configuration, submits it together with the
     * user code file to the admin client as an update, and prints a confirmation.
     */
    @Parameters(commandDescription = "Update a Pulsar Function that's been deployed to a Pulsar cluster")
    public class UpdateFunction extends FunctionDetailsCommand {
        UpdateFunction() {
            super();
        }

        /**
         * Checks the required configuration fields, sends the update, and prints
         * "Updated successfully".
         *
         * @throws IllegalArgumentException if a required configuration field is missing
         * @throws Exception if the update call fails
         */
        @Override
        void runCmd() throws Exception {
            CmdFunctions.checkRequiredFields(this.functionConfig);
            CmdFunctions.this.admin.functions().updateFunction(convert(this.functionConfig), this.userCodeFile);
            // The message is a plain String; casting it to UpdateFunction is an inconvertible
            // cast and does not compile.
            print("Updated successfully");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hidden CLI command that passes a source file and a destination path to the admin
     * client's upload call and prints a confirmation.
     */
    @Parameters(commandDescription = "Upload File Data to Pulsar", hidden = true)
    public class UploadFunction extends BaseCommand {

        @Parameter(names = {"--sourceFile"}, description = "The file whose contents need to be uploaded", listConverter = StringConverter.class, required = true)
        protected String sourceFile;

        @Parameter(names = {"--path"}, description = "Path where the contents need to be stored", listConverter = StringConverter.class, required = true)
        protected String path;

        UploadFunction() {
            super();
        }

        /**
         * Uploads {@code sourceFile} to {@code path} through the admin client and prints
         * "Uploaded successfully".
         *
         * @throws Exception if the upload call fails
         */
        @Override
        void runCmd() throws Exception {
            CmdFunctions.this.admin.functions().uploadFunction(this.sourceFile, this.path);
            // The message is a plain String; casting it to UploadFunction is an inconvertible
            // cast and does not compile.
            print("Uploaded successfully");
        }
    }

    /**
     * Creates the {@code functions} command group: instantiates every sub-command and then
     * registers each one with the command parser under its CLI name.
     *
     * @param pulsarAdmin the admin client passed to the superclass constructor
     * @throws PulsarClientException declared by this constructor
     */
    public CmdFunctions(PulsarAdmin pulsarAdmin) throws PulsarClientException {
        super("functions", pulsarAdmin);

        // Build all sub-command instances before any of them is registered.
        localRunner = new LocalRunner();
        creater = new CreateFunction();
        deleter = new DeleteFunction();
        updater = new UpdateFunction();
        getter = new GetFunction();
        statuser = new GetFunctionStatus();
        lister = new ListFunctions();
        stateGetter = new StateGetter();
        triggerer = new TriggerFunction();
        uploader = new UploadFunction();
        downloader = new DownloadFunction();

        // Register through the accessors, in the same order as the instances were created.
        jcommander.addCommand("localrun", getLocalRunner());
        jcommander.addCommand("create", getCreater());
        jcommander.addCommand("delete", getDeleter());
        jcommander.addCommand("update", getUpdater());
        jcommander.addCommand("get", getGetter());
        jcommander.addCommand("getstatus", getStatuser());
        jcommander.addCommand("list", getLister());
        jcommander.addCommand("querystate", getStateGetter());
        jcommander.addCommand("trigger", getTriggerer());
        jcommander.addCommand("upload", getUploader());
        jcommander.addCommand("download", getDownloader());
    }

    /**
     * Returns the sub-command instance registered as {@code localrun}.
     *
     * @return the current value of the {@code localRunner} field
     */
    @VisibleForTesting
    LocalRunner getLocalRunner() {
        return localRunner;
    }

    /**
     * Returns the sub-command instance registered as {@code create}.
     *
     * @return the current value of the {@code creater} field
     */
    @VisibleForTesting
    CreateFunction getCreater() {
        return creater;
    }

    /**
     * Returns the sub-command instance registered as {@code delete}.
     *
     * @return the current value of the {@code deleter} field
     */
    @VisibleForTesting
    DeleteFunction getDeleter() {
        return deleter;
    }

    /**
     * Returns the sub-command instance registered as {@code update}.
     *
     * @return the current value of the {@code updater} field
     */
    @VisibleForTesting
    UpdateFunction getUpdater() {
        return updater;
    }

    /**
     * Returns the sub-command instance registered as {@code get}.
     *
     * @return the current value of the {@code getter} field
     */
    @VisibleForTesting
    GetFunction getGetter() {
        return getter;
    }

    /**
     * Returns the sub-command instance registered as {@code getstatus}.
     *
     * @return the current value of the {@code statuser} field
     */
    @VisibleForTesting
    GetFunctionStatus getStatuser() {
        return statuser;
    }

    /**
     * Returns the sub-command instance registered as {@code list}.
     *
     * @return the current value of the {@code lister} field
     */
    @VisibleForTesting
    ListFunctions getLister() {
        return lister;
    }

    /**
     * Returns the sub-command instance registered as {@code querystate}.
     *
     * @return the current value of the {@code stateGetter} field
     */
    @VisibleForTesting
    StateGetter getStateGetter() {
        return stateGetter;
    }

    /**
     * Returns the sub-command instance registered as {@code trigger}.
     *
     * @return the current value of the {@code triggerer} field
     */
    @VisibleForTesting
    TriggerFunction getTriggerer() {
        return triggerer;
    }

    /**
     * Returns the sub-command instance registered as {@code upload}.
     *
     * @return the current value of the {@code uploader} field
     */
    @VisibleForTesting
    UploadFunction getUploader() {
        return uploader;
    }

    /**
     * Returns the sub-command instance registered as {@code download}.
     *
     * @return the current value of the {@code downloader} field
     */
    @VisibleForTesting
    DownloadFunction getDownloader() {
        return downloader;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads a {@link FunctionConfig} from a file using a YAML-backed {@link ObjectMapper}.
     *
     * @param file the file to read
     * @return the configuration deserialized from {@code file}
     * @throws IOException if the file cannot be read or mapped
     */
    public static FunctionConfig loadConfig(File file) throws IOException {
        ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
        return yamlMapper.readValue(file, FunctionConfig.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Rejects an output topic that also appears among the input topics.
     *
     * @param collection the input topic names
     * @param str the output topic name to check
     * @throws IllegalArgumentException if {@code collection} contains {@code str}
     */
    public static void verifyNoTopicClash(Collection<String> collection, String str) throws IllegalArgumentException {
        boolean clashes = collection.contains(str);
        if (!clashes) {
            return;
        }
        String message = String.format("Output topic %s is also being used as an input topic (topics must be one or the other)", str);
        throw new IllegalArgumentException(message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Verifies that a function configuration carries a tenant, a namespace, a name, a class
     * name, and at least one input (regular or custom-SerDe). Checks run in that order and
     * the first failure is reported.
     *
     * @param functionConfig the configuration to validate
     * @throws IllegalArgumentException if any of the required values is missing
     */
    public static void checkRequiredFields(FunctionConfig functionConfig) throws IllegalArgumentException {
        if (functionConfig.getTenant() == null) {
            throw new IllegalArgumentException("You must specify a tenant for the function");
        }
        if (functionConfig.getNamespace() == null) {
            throw new IllegalArgumentException("You must specify a namespace for the function");
        }
        if (functionConfig.getName() == null) {
            throw new IllegalArgumentException("You must specify a name for the function");
        }
        if (functionConfig.getClassName() == null) {
            throw new IllegalArgumentException("You must specify a class name for the function");
        }
        // The custom-SerDe inputs are consulted only when the regular inputs are empty.
        boolean hasNoInputs = functionConfig.getInputs().isEmpty()
                && functionConfig.getCustomSerdeInputs().isEmpty();
        if (hasNoInputs) {
            throw new IllegalArgumentException("You must specify one or more input topics for the function");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a {@code FunctionConfig.Runtime} constant to the
     * {@code Function.FunctionDetails.Runtime} constant with the same name.
     *
     * @param runtime the configuration-side constant
     * @return the first details-side constant whose name equals {@code runtime.name()}
     * @throws RuntimeException if no constant with that name exists
     */
    public static Function.FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
        final String wantedName = runtime.name();
        return Arrays.stream(Function.FunctionDetails.Runtime.values())
                .filter(candidate -> candidate.name().equals(wantedName))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Unrecognized runtime: " + wantedName));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a {@code FunctionConfig.SubscriptionType} constant to the
     * {@code Function.SubscriptionType} constant with the same name.
     *
     * @param subscriptionType the configuration-side constant
     * @return the first details-side constant whose name equals {@code subscriptionType.name()}
     * @throws RuntimeException if no constant with that name exists
     */
    public static Function.SubscriptionType convertSubscriptionType(FunctionConfig.SubscriptionType subscriptionType) {
        final String wantedName = subscriptionType.name();
        return Arrays.stream(Function.SubscriptionType.values())
                .filter(candidate -> candidate.name().equals(wantedName))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Unrecognized subscription type: " + wantedName));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a {@code FunctionConfig.ProcessingGuarantees} constant to the
     * {@code Function.ProcessingGuarantees} constant with the same name.
     *
     * @param processingGuarantees the configuration-side constant
     * @return the first details-side constant whose name equals
     *     {@code processingGuarantees.name()}
     * @throws RuntimeException if no constant with that name exists
     */
    public static Function.ProcessingGuarantees convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees processingGuarantees) {
        final String wantedName = processingGuarantees.name();
        return Arrays.stream(Function.ProcessingGuarantees.values())
                .filter(candidate -> candidate.name().equals(wantedName))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Unrecognized processing guarantee: " + wantedName));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Splits a fully qualified function name on {@code /} and copies its three parts into
     * the configuration as tenant, namespace and name, in that order.
     *
     * @param str the fully qualified function name, {@code tenant/namespace/name}
     * @param functionConfig the configuration that receives the three parts
     * @throws RuntimeException if splitting {@code str} does not yield exactly three parts
     */
    public void parseFullyQualifiedFunctionName(String str, FunctionConfig functionConfig) {
        String[] parts = str.split("/");
        if (parts.length == 3) {
            functionConfig.setTenant(parts[0]);
            functionConfig.setNamespace(parts[1]);
            functionConfig.setName(parts[2]);
            return;
        }
        throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
    }
}
