package org.apache.storm;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.storm.dependency.DependencyPropertiesParser;
import org.apache.storm.dependency.DependencyUploader;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.hooks.SubmitterHookException;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.shade.net.minidev.json.JSONValue;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.BufferFileInputStream;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedInvalidTopologyException;
import org.apache.storm.validation.ConfigValidation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/StormSubmitter.class */
public class StormSubmitter {
    public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
    public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;

    /**
     * Callback interface notified while a file is being uploaded.
     *
     * <p>Within this class the callbacks are driven by
     * {@code submitJarAs(Map, String, ProgressListener, NimbusClient)}: {@code onStart} once before
     * the first chunk is read, {@code onProgress} after every read (including the final empty read),
     * and {@code onCompleted} once after the upload has been finished.
     */
    public interface ProgressListener {
        /**
         * Called before the upload begins.
         *
         * @param str  path of the local file being uploaded
         * @param str2 upload location the file is being sent to
         * @param j    size of the local file in bytes
         */
        void onStart(String str, String str2, long j);

        /**
         * Called after each chunk has been read from the local file.
         *
         * @param str  path of the local file being uploaded
         * @param str2 upload location the file is being sent to
         * @param j    number of bytes read so far
         * @param j2   size of the local file in bytes
         */
        void onProgress(String str, String str2, long j, long j2);

        /**
         * Called once the upload has been finished.
         *
         * @param str  path of the local file that was uploaded
         * @param str2 upload location the file was sent to
         * @param j    size of the local file in bytes
         */
        void onCompleted(String str, String str2, long j);
    }

    /**
     * Builds a fresh ZooKeeper digest payload of the form {@code <id>:<secret>}.
     *
     * <p>The two halves come from two independent calls to {@code Utils.secureRandomLong()}.
     * Reusing one value for both halves would make the secret derivable from the id.
     *
     * @return the generated payload; it always satisfies {@link #validateZKDigestPayload(String)}
     */
    private static String generateZookeeperDigestSecretPayload() {
        long id = Utils.secureRandomLong();
        long secret = Utils.secureRandomLong();
        return id + ":" + secret;
    }

    /**
     * Checks whether a string is a well-formed ZooKeeper digest payload.
     *
     * @param str candidate payload; may be {@code null}
     * @return {@code true} only when {@code str} is non-null and matches {@link #zkDigestPattern}
     *         in full
     */
    public static boolean validateZKDigestPayload(String str) {
        return str != null && zkDigestPattern.matcher(str).matches();
    }

    /**
     * Produces the ZooKeeper authentication settings to attach to a topology configuration.
     *
     * <p>If {@code map} already carries a valid digest payload under
     * {@code Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD} it is reused; otherwise (key missing,
     * value {@code null}, or value not matching {@link #zkDigestPattern}) a new payload is generated.
     *
     * @param map the configuration to inspect; it is not modified
     * @return a new map holding exactly the auth payload and the {@code "digest"} auth scheme
     * @throws ClassCastException if the configured payload is present but not a {@code String}
     */
    public static Map<String, Object> prepareZookeeperAuthentication(Map<String, Object> map) {
        Map<String, Object> authConf = new HashMap<>();
        // A missing key and a null value both surface as null here, and
        // validateZKDigestPayload(null) is false, so a single check covers every fallback case.
        String payload = (String) map.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
        if (!validateZKDigestPayload(payload)) {
            payload = generateZookeeperDigestSecretPayload();
            // The payload is a credential: record that one was generated, never its value.
            LOG.info("Generated ZooKeeper secret payload for MD5-digest");
        }
        authConf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, payload);
        authConf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
        return authConf;
    }

    /**
     * Collects the credentials to ship with a topology.
     *
     * <p>Every {@link IAutoCredentials} plugin returned by
     * {@code ClientAuthUtils.getAutoCredentials(conf)} is asked to fill the result first; the
     * explicitly supplied credentials are applied afterwards, so they win on key collisions.
     *
     * @param conf     configuration handed to {@code ClientAuthUtils.getAutoCredentials}
     * @param explicit caller-supplied credentials, or {@code null} for none
     * @return a new mutable map with the merged credentials
     */
    private static Map<String, String> populateCredentials(Map<String, Object> conf, Map<String, String> explicit) {
        Map<String, String> merged = new HashMap<>();
        for (IAutoCredentials plugin : ClientAuthUtils.getAutoCredentials(conf)) {
            LOG.info("Running " + plugin);
            plugin.populateCredentials(merged);
        }
        if (explicit == null) {
            return merged;
        }
        merged.putAll(explicit);
        return merged;
    }

    /**
     * Pushes credentials to a topology without naming an explicit topology owner.
     *
     * <p>Equivalent to calling the four-argument overload with a {@code null} owner.
     *
     * @param str  name of the target topology
     * @param map  topology configuration overrides
     * @param map2 explicit credentials to push, or {@code null}
     * @return {@code true} if credentials were uploaded, {@code false} if there were none to push
     */
    public static boolean pushCredentials(String str, Map<String, Object> map, Map<String, String> map2) throws AuthorizationException, NotAliveException, InvalidTopologyException {
        final String noExplicitOwner = null;
        return pushCredentials(str, map, map2, noExplicitOwner);
    }

    /**
     * Pushes a new set of credentials to a topology.
     *
     * <p>The effective configuration is the Storm config overlaid with {@code map} and the
     * command-line options. Credentials are gathered by {@code populateCredentials} from that
     * configuration and {@code map2}.
     *
     * @param str  name of the target topology
     * @param map  topology configuration overrides; it is copied, not modified
     * @param map2 explicit credentials to push, or {@code null}
     * @param str2 topology owner to record on the credentials, or {@code null} to leave it unset
     * @return {@code true} if credentials were uploaded, {@code false} if there were none to push
     * @throws RuntimeException wrapping any {@link TException} raised while talking to Nimbus
     */
    public static boolean pushCredentials(String str, Map<String, Object> map, Map<String, String> map2, String str2) throws AuthorizationException, NotAliveException, InvalidTopologyException {
        Map<String, Object> overrides = new HashMap<>(map);
        overrides.putAll(Utils.readCommandLineOpts());
        Map<String, Object> fullConf = Utils.readStormConfig();
        fullConf.putAll(overrides);

        Map<String, String> creds = populateCredentials(fullConf, map2);
        if (creds.isEmpty()) {
            LOG.warn("No credentials were found to push to " + str);
            return false;
        }

        // try-with-resources guarantees the Nimbus connection is released even when the upload
        // throws; previously it was closed on the success path only.
        try (NimbusClient client = NimbusClient.getConfiguredClient(fullConf)) {
            LOG.info("Uploading new credentials to {}", str);
            Credentials payload = new Credentials(creds);
            if (str2 != null) {
                payload.set_topoOwner(str2);
            }
            client.getClient().uploadNewCredentials(str, payload);
        } catch (TException e) {
            throw new RuntimeException(e);
        }
        LOG.info("Finished pushing creds to topology: {}", str);
        return true;
    }

    /**
     * Submits a topology with no submit options and no progress listener.
     *
     * @param str           topology name
     * @param map           topology-specific configuration
     * @param stormTopology the topology to run
     */
    public static void submitTopology(String str, Map<String, Object> map, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        final SubmitOptions noOptions = null;
        final ProgressListener noListener = null;
        submitTopology(str, map, stormTopology, noOptions, noListener);
    }

    /**
     * Submits a topology with the given submit options and no progress listener.
     *
     * @param str           topology name
     * @param map           topology-specific configuration
     * @param stormTopology the topology to run
     * @param submitOptions options for the submission, or {@code null}
     */
    public static void submitTopology(String str, Map<String, Object> map, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        final ProgressListener noListener = null;
        submitTopology(str, map, stormTopology, submitOptions, noListener);
    }

    /**
     * Submits a topology as the current user.
     *
     * <p>Forwards to {@code submitTopologyAs} with a {@code null} user.
     *
     * @param str              topology name
     * @param map              topology-specific configuration
     * @param stormTopology    the topology to run
     * @param submitOptions    options for the submission, or {@code null}
     * @param progressListener listener notified about the jar upload, or {@code null}
     */
    public static void submitTopology(String str, Map<String, Object> map, StormTopology stormTopology, SubmitOptions submitOptions, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        final String noImpersonation = null;
        submitTopologyAs(str, map, stormTopology, submitOptions, progressListener, noImpersonation);
    }

    /**
     * Validates, prepares and submits a topology to the cluster, optionally as another user.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>validate the topology name, the JSON-serializability of {@code map}, and that the
     *       topology has at least one spout;</li>
     *   <li>build the topology conf ({@code map} + command-line options + ZooKeeper auth) and the
     *       full conf (Storm config overlaid with the topology conf) and validate the latter;</li>
     *   <li>merge auto-populated and caller-supplied credentials into {@code submitOptions}
     *       (a new {@code SubmitOptions} is created when none was passed and credentials exist);</li>
     *   <li>upload dependency jars/artifacts, then submit the topology through Nimbus;</li>
     *   <li>invoke the configured submitter hook, if any.</li>
     * </ol>
     *
     * <p>A cycle reported by {@code Utils.validateCycleFree} is only logged as a warning.
     *
     * @param str              topology name
     * @param map              topology-specific configuration; it is copied, not modified
     * @param stormTopology    the topology to run
     * @param submitOptions    options for the submission, or {@code null}; when non-null its
     *                         credentials may be replaced with the merged set
     * @param progressListener listener notified about the jar upload, or {@code null}
     * @param str2             user to submit as, passed to {@code NimbusClient.getConfiguredClientAs}
     * @throws IllegalArgumentException if {@code map} is not JSON-serializable
     * @throws InvalidTopologyException if the topology has no spouts
     * @throws RuntimeException         if the topology name is not allowed, or wrapping any
     *                                  {@link TException} raised during submission
     */
    public static void submitTopologyAs(String str, Map<String, Object> map, StormTopology stormTopology, SubmitOptions submitOptions, ProgressListener progressListener, String str2) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        Utils.validateTopologyName(str);
        if (!Utils.isValidConf(map)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        if (stormTopology.get_spouts_size() == 0) {
            throw new WrappedInvalidTopologyException("Topology " + str + " does not have any spout");
        }

        Map<String, Object> topoConf = new HashMap<>(map);
        topoConf.putAll(Utils.readCommandLineOpts());
        Map<String, Object> fullConf = Utils.readStormConfig();
        fullConf.putAll(topoConf);
        topoConf.putAll(prepareZookeeperAuthentication(fullConf));
        validateConfs(fullConf);
        try {
            Utils.validateCycleFree(stormTopology, str);
        } catch (InvalidTopologyException e) {
            // A cycle is reported but deliberately does not block the submission.
            LOG.warn("", e);
        }

        Map<String, String> suppliedCreds = new HashMap<>();
        if (submitOptions != null) {
            Credentials supplied = submitOptions.get_creds();
            if (supplied != null) {
                suppliedCreds = supplied.get_creds();
            }
        }
        Map<String, String> allCreds = populateCredentials(fullConf, suppliedCreds);
        if (!allCreds.isEmpty()) {
            if (submitOptions == null) {
                submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            submitOptions.set_creds(new Credentials(allCreds));
        }

        try {
            String serializedConf = JSONValue.toJSONString(topoConf);
            // try-with-resources releases the Nimbus connection on every exit path, including the
            // "name not allowed" failure and any upload/submission error.
            try (NimbusClient client = NimbusClient.getConfiguredClientAs(fullConf, str2)) {
                if (!isTopologyNameAllowed(str, client)) {
                    throw new RuntimeException("Topology name " + str + " is either not allowed or it already exists on the cluster");
                }

                List<String> jarBlobKeys = Collections.emptyList();
                List<String> artifactBlobKeys;
                DependencyUploader uploader = new DependencyUploader();

                // Phase 1: upload dependencies. On any failure remove the jar blobs uploaded so far
                // and release the uploader exactly once.
                try {
                    uploader.init();
                    jarBlobKeys = uploadDependencyJarsToBlobStore(uploader);
                    artifactBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                } catch (Throwable t) {
                    uploader.deleteBlobs(jarBlobKeys);
                    uploader.shutdown();
                    throw t;
                }

                // Phase 2: submit. Jar blobs are removed only when Nimbus definitively rejected the
                // topology; for any other failure the outcome is unknown, and leaving stray blobs is
                // safer than deleting blobs a running topology may depend on.
                try {
                    setDependencyBlobsToTopology(stormTopology, jarBlobKeys, artifactBlobKeys);
                    submitTopologyInDistributeMode(str, stormTopology, submitOptions, progressListener, str2, fullConf, serializedConf, client);
                } catch (AlreadyAliveException | AuthorizationException | InvalidTopologyException e) {
                    uploader.deleteBlobs(jarBlobKeys);
                    throw e;
                } finally {
                    uploader.shutdown();
                }
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }

        // Runs after all submission resources are released and outside the cleanup handlers, so a
        // failing hook can no longer delete the blobs of a topology that was already submitted.
        invokeSubmitterHook(str, str2, fullConf, stormTopology);
    }

    /**
     * Uploads the dependency jars named by the {@code storm.dependency.jars} system property.
     *
     * @param uploader the uploader used to push the files
     * @return the value returned by {@code uploader.uploadFiles} for the parsed jar list
     * @throws RuntimeException wrapping anything thrown while parsing or uploading
     */
    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - jars...");
        try {
            DependencyPropertiesParser parser = new DependencyPropertiesParser();
            String jarsProperty = System.getProperty("storm.dependency.jars", "");
            return uploader.uploadFiles(parser.parseJarsProperties(jarsProperty), true);
        } catch (Throwable failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Uploads the dependency artifacts named by the {@code storm.dependency.artifacts} system
     * property (defaulting to {@code "{}"} when the property is unset).
     *
     * @param uploader the uploader used to push the artifacts
     * @return the value returned by {@code uploader.uploadArtifacts} for the parsed artifacts
     * @throws RuntimeException wrapping anything thrown while parsing or uploading
     */
    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - artifacts...");
        try {
            DependencyPropertiesParser parser = new DependencyPropertiesParser();
            String artifactsProperty = System.getProperty("storm.dependency.artifacts", "{}");
            return uploader.uploadArtifacts(parser.parseArtifactsProperties(artifactsProperty));
        } catch (Throwable failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Records the uploaded dependency blob keys on the topology and logs them.
     *
     * @param topology         topology to update
     * @param jarBlobKeys      blob keys of the uploaded dependency jars
     * @param artifactBlobKeys blob keys of the uploaded dependency artifacts
     */
    private static void setDependencyBlobsToTopology(StormTopology topology, List<String> jarBlobKeys, List<String> artifactBlobKeys) {
        LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", jarBlobKeys, artifactBlobKeys);
        topology.set_dependency_jars(jarBlobKeys);
        topology.set_dependency_artifacts(artifactBlobKeys);
    }

    /**
     * Uploads the topology jar named by the {@code storm.jar} system property and submits the
     * topology through the given Nimbus client.
     *
     * <p>{@code submitTopologyWithOpts} is used when options are present, {@code submitTopology}
     * otherwise. {@link AlreadyAliveException} and {@link InvalidTopologyException} are logged
     * and rethrown unchanged.
     *
     * @param topologyName   topology name
     * @param topology       the topology to submit
     * @param options        submit options, or {@code null}
     * @param listener       listener notified about the jar upload, or {@code null}
     * @param asUser         not used by this method
     * @param conf           configuration passed to the jar upload
     * @param serializedConf JSON form of the topology configuration sent to Nimbus
     * @param client         open Nimbus client; not closed here
     * @throws TException if the Nimbus call fails
     */
    private static void submitTopologyInDistributeMode(String topologyName, StormTopology topology, SubmitOptions options, ProgressListener listener, String asUser, Map<String, Object> conf, String serializedConf, NimbusClient client) throws TException {
        try {
            String uploadedJarLocation = submitJarAs(conf, System.getProperty("storm.jar"), listener, client);
            LOG.info("Submitting topology {} in distributed mode with conf {}", topologyName, serializedConf);
            Utils.addVersions(topology);
            if (options == null) {
                client.getClient().submitTopology(topologyName, uploadedJarLocation, serializedConf, topology);
            } else {
                client.getClient().submitTopologyWithOpts(topologyName, uploadedJarLocation, serializedConf, topology, options);
            }
            LOG.info("Finished submitting topology: {}", topologyName);
        } catch (AlreadyAliveException alreadyAlive) {
            LOG.error("Topology already alive exception", alreadyAlive);
            throw alreadyAlive;
        } catch (InvalidTopologyException invalidTopology) {
            LOG.error("Topology submission exception: {}", invalidTopology.get_msg());
            throw invalidTopology;
        }
    }

    /**
     * Instantiates and notifies the submitter hook configured under
     * {@code Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN}, if that key is present.
     *
     * <p>Does nothing when the key is absent. The hook class is loaded by name, created through
     * its no-argument constructor and passed the topology info, the configuration and the topology.
     *
     * @param topologyName name of the submitted topology
     * @param asUser       user passed to {@code Utils.getTopologyInfo}
     * @param conf         configuration to read the hook class from and to pass to the hook
     * @param topology     the submitted topology
     * @throws SubmitterHookException wrapping any exception raised while resolving, creating or
     *                                invoking the hook, including a null or empty class name
     */
    private static void invokeSubmitterHook(String topologyName, String asUser, Map<String, Object> conf, StormTopology topology) {
        // Declared outside the try block so the failure log can name the hook that was attempted.
        String hookClassName = null;
        try {
            if (conf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
                // The key may be present with a null value; treat that like an empty name instead
                // of failing with a NullPointerException on toString().
                Object configured = conf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
                hookClassName = configured == null ? null : configured.toString();
                LOG.info("Initializing the registered ISubmitterHook [{}]", hookClassName);
                if (hookClassName == null || hookClassName.isEmpty()) {
                    throw new IllegalArgumentException("storm.topology.submission.notifier.plugin.class property must be a non empty string.");
                }
                ISubmitterHook hook = (ISubmitterHook) Class.forName(hookClassName).getDeclaredConstructor().newInstance();
                TopologyInfo topologyInfo = Utils.getTopologyInfo(topologyName, asUser, conf);
                LOG.info("Invoking the registered ISubmitterHook [{}]", hookClassName);
                hook.notify(topologyInfo, conf, topology);
            }
        } catch (Exception e) {
            LOG.warn("Error occurred in invoking submitter hook:[{}] ", hookClassName, e);
            throw new SubmitterHookException(e);
        }
    }

    /**
     * Submits a topology while printing upload progress to standard output, without submit options.
     *
     * @param str           topology name
     * @param map           topology-specific configuration
     * @param stormTopology the topology to run
     */
    public static void submitTopologyWithProgressBar(String str, Map<String, Object> map, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        final SubmitOptions noOptions = null;
        submitTopologyWithProgressBar(str, map, stormTopology, noOptions);
    }

    /**
     * Submits a topology while printing a textual progress bar for the jar upload to standard
     * output.
     *
     * <p>The bar is 50 characters wide. An upload whose total size is zero is rendered as a full
     * bar, and the filled portion is clamped to the bar width, so the listener never fails on
     * unusual byte counts.
     *
     * @param str           topology name
     * @param map           topology-specific configuration
     * @param stormTopology the topology to run
     * @param submitOptions options for the submission, or {@code null}
     */
    public static void submitTopologyWithProgressBar(String str, Map<String, Object> map, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        submitTopology(str, map, stormTopology, submitOptions, new ProgressListener() {
            private static final int BAR_WIDTH = 50;

            @Override
            public void onStart(String str2, String str3, long j) {
                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", str2, str3, j);
            }

            @Override
            public void onProgress(String str2, String str3, long j, long j2) {
                // A zero-byte file would otherwise divide by zero; show it as already complete.
                long scaled = j2 > 0 ? (BAR_WIDTH * j) / j2 : BAR_WIDTH;
                int filled = (int) Math.max(0L, Math.min((long) BAR_WIDTH, scaled));
                System.out.printf("\r[%s%s] %d / %d", StringUtils.repeat("=", filled), StringUtils.repeat(" ", BAR_WIDTH - filled), j, j2);
            }

            @Override
            public void onCompleted(String str2, String str3, long j) {
                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", str2, str3, j);
            }
        });
    }

    /**
     * Asks Nimbus whether a topology name may be used.
     *
     * @param topologyName the candidate topology name
     * @param client       open Nimbus client used for the query
     * @return the answer returned by Nimbus
     * @throws RuntimeException wrapping any exception raised by the query
     */
    private static boolean isTopologyNameAllowed(String topologyName, NimbusClient client) {
        boolean allowed;
        try {
            allowed = client.getClient().isTopologyNameAllowed(topologyName);
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
        return allowed;
    }

    /**
     * Uploads a jar without progress reporting.
     *
     * @param map configuration used to reach Nimbus
     * @param str path of the local jar to upload
     * @return the upload location of the jar
     */
    public static String submitJar(Map<String, Object> map, String str) {
        final ProgressListener noListener = null;
        return submitJar(map, str, noListener);
    }

    /**
     * Uploads a jar as the current user, reporting progress to the given listener.
     *
     * @param map              configuration used to reach Nimbus
     * @param str              path of the local jar to upload
     * @param progressListener listener notified about the upload, or {@code null}
     * @return the upload location of the jar
     */
    public static String submitJar(Map<String, Object> map, String str, ProgressListener progressListener) {
        // Typed as String so the call resolves to the user-name overload of submitJarAs.
        final String noImpersonation = null;
        return submitJarAs(map, str, progressListener, noImpersonation);
    }

    /**
     * Uploads a local jar through an already-open Nimbus client, in chunks of
     * {@code THRIFT_CHUNK_SIZE_BYTES} bytes.
     *
     * <p>The listener, when present, receives {@code onStart} before the first read,
     * {@code onProgress} after every read (including the final empty one) and
     * {@code onCompleted} after the upload has been finished.
     *
     * @param map              not used by this overload
     * @param str              path of the local jar to upload; must not be {@code null}
     * @param progressListener listener notified about the upload, or {@code null}
     * @param nimbusClient     open Nimbus client; it is not closed by this method
     * @return the upload location assigned by Nimbus
     * @throws RuntimeException if {@code str} is {@code null}, or wrapping any exception raised
     *                          while reading the file or talking to Nimbus
     */
    public static String submitJarAs(Map<String, Object> map, String str, ProgressListener progressListener, NimbusClient nimbusClient) {
        if (str == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        try {
            String uploadLocation = nimbusClient.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + str + " to assigned location: " + uploadLocation);
            BufferFileInputStream jarStream = new BufferFileInputStream(str, THRIFT_CHUNK_SIZE_BYTES);
            long totalBytes;
            try {
                totalBytes = new File(str).length();
                if (progressListener != null) {
                    progressListener.onStart(str, uploadLocation, totalBytes);
                }
                long uploadedBytes = 0;
                while (true) {
                    byte[] chunk = jarStream.read();
                    uploadedBytes += chunk.length;
                    if (progressListener != null) {
                        progressListener.onProgress(str, uploadLocation, uploadedBytes, totalBytes);
                    }
                    // An empty chunk signals end of file.
                    if (chunk.length == 0) {
                        break;
                    }
                    nimbusClient.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(chunk));
                }
            } finally {
                // Release the file handle even when a chunk upload or listener callback fails.
                // NOTE(review): assumes BufferFileInputStream.close() is safe to call after EOF.
                jarStream.close();
            }
            nimbusClient.getClient().finishFileUpload(uploadLocation);
            if (progressListener != null) {
                progressListener.onCompleted(str, uploadLocation, totalBytes);
            }
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Uploads a local jar using a Nimbus client opened for the given user and closed afterwards.
     *
     * @param map              configuration used to open the Nimbus client
     * @param str              path of the local jar to upload; must not be {@code null}
     * @param progressListener listener notified about the upload, or {@code null}
     * @param str2             user passed to {@code NimbusClient.getConfiguredClientAs}
     * @return the upload location assigned by Nimbus
     * @throws RuntimeException if {@code str} is {@code null} or the upload fails
     */
    public static String submitJarAs(Map<String, Object> map, String str, ProgressListener progressListener, String str2) {
        if (str == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        // The client is closed on both the normal and the exceptional path; a failure while
        // closing after an upload error is attached to that error as a suppressed exception.
        try (NimbusClient client = NimbusClient.getConfiguredClientAs(map, str2)) {
            return submitJarAs(map, str, progressListener, client);
        }
    }

    /**
     * Runs the configuration checks required before a topology is submitted: first
     * {@code ConfigValidation.validateTopoConf}, then {@code Utils.validateTopologyBlobStoreMap}.
     *
     * @param topoConf the configuration to validate
     */
    private static void validateConfs(Map<String, Object> topoConf) throws IllegalArgumentException, InvalidTopologyException, AuthorizationException {
        ConfigValidation.validateTopoConf(topoConf);
        Utils.validateTopologyBlobStoreMap(topoConf);
    }
}
