package org.apache.storm.localizer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.storm.Config;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.utils.ShellUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/localizer/Localizer.class */
public class Localizer {
    public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);

    /** Configuration map handed to the constructor; read for cache and thread-pool settings. */
    private Map _conf;
    /** Number of threads in each of the two download executors. */
    private int _threadPoolSize;
    /** Executor used by {@link #getBlobs} for initial downloads. */
    private ExecutorService _execService;
    /** Executor used by {@link #updateBlobs} for refresh downloads. */
    private ExecutorService _updateExecService;
    /** Value of the "download max retries" setting read in the constructor. */
    private int _blobDownloadRetries;
    /** Per-user resource sets, keyed by user name. */
    private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc =
            new ConcurrentHashMap<String, LocalizedResourceSet>();
    /** Root directory under which the {@link #USERCACHE} directory lives. */
    private String _localBaseDir;

    /** Directory name, directly under the base dir, that holds one sub-directory per user. */
    public static final String USERCACHE = "usercache";
    /** Directory name, under a user's directory, that holds that user's cached blobs. */
    public static final String FILECACHE = "filecache";
    /** Directory name, under a file cache dir, for blobs kept as plain files. */
    public static final String FILESDIR = "files";
    /** Directory name, under a file cache dir, for blobs that are uncompressed archives. */
    public static final String ARCHIVESDIR = "archives";
    private static final String TO_UNCOMPRESS = "_tmp_";

    /** Target size of the cache in bytes, passed to the retention set during cleanup. */
    private long _cacheTargetSize;
    /** Initial delay and period, in milliseconds, of the scheduled cache cleanup. */
    private long _cacheCleanupPeriod;
    /** Scheduler created by {@link #startCleaner()}; {@code null} until then. */
    private ScheduledExecutorService _cacheCleanupService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/localizer/Localizer$DownloadBlob.class */
    /**
     * Task that downloads a single blob by delegating to
     * {@link Localizer#downloadBlob(Map, String, File, String, boolean, boolean)}.
     * All arguments are captured at construction time and passed through untouched.
     */
    public static class DownloadBlob implements Callable<LocalizedResource> {
        private final Localizer _localizer;
        private final Map _conf;
        private final String _key;
        private final File _localFile;
        private final String _user;
        private final boolean _uncompress;
        private final boolean _isUpdate;

        /**
         * @param localizer localizer whose {@code downloadBlob} performs the work
         * @param map       configuration forwarded to {@code downloadBlob}
         * @param str       blob key
         * @param file      local file the blob is downloaded to
         * @param str2      user the blob is downloaded for
         * @param z         "uncompress" flag forwarded to {@code downloadBlob}
         * @param z2        "is update" flag forwarded to {@code downloadBlob}
         */
        public DownloadBlob(Localizer localizer, Map map, String str, File file, String str2, boolean z, boolean z2) {
            _localizer = localizer;
            _conf = map;
            _key = str;
            _localFile = file;
            _user = str2;
            _uncompress = z;
            _isUpdate = z2;
        }

        /**
         * Runs the download on the calling thread.
         *
         * @return whatever the localizer's {@code downloadBlob} returns
         */
        @Override
        public LocalizedResource call() throws AuthorizationException, KeyNotFoundException, IOException {
            LocalizedResource downloaded =
                    _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress, _isUpdate);
            return downloaded;
        }
    }

    public Localizer(Map map, String str) {
        this._conf = map;
        this._localBaseDir = str;
        this._cacheTargetSize = Utils.getInt(this._conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB), 10240).longValue() << 20;
        this._cacheCleanupPeriod = Utils.getInt(this._conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 600000).longValue();
        this._threadPoolSize = Utils.getInt(this._conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5).intValue();
        this._blobDownloadRetries = Utils.getInt(this._conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3).intValue();
        this._execService = Executors.newFixedThreadPool(this._threadPoolSize);
        this._updateExecService = Executors.newFixedThreadPool(this._threadPoolSize);
        reconstructLocalizedResources();
    }

    /**
     * Overrides the cache target size that was read from the configuration.
     *
     * @param j new target size, stored as-is
     */
    protected void setTargetCacheSize(long j) {
        _cacheTargetSize = j;
    }

    /**
     * @return the live (not copied) map of user name to that user's resource set
     */
    ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
        ConcurrentMap<String, LocalizedResourceSet> perUser = _userRsrc;
        return perUser;
    }

    /**
     * Creates a single-threaded scheduler named "Localizer Cache Cleanup" and schedules
     * {@link #handleCacheCleanup()} on it with a fixed delay equal to the configured
     * cleanup period (also used as the initial delay).
     */
    public void startCleaner() {
        Runnable cleanupTask = new Runnable() {
            @Override
            public void run() {
                Localizer.this.handleCacheCleanup();
            }
        };
        ThreadFactoryBuilder namedThreads = new ThreadFactoryBuilder().setNameFormat("Localizer Cache Cleanup");
        ScheduledExecutorService cleaner = new ScheduledThreadPoolExecutor(1, namedThreads.build());
        _cacheCleanupService = cleaner;
        cleaner.scheduleWithFixedDelay(cleanupTask, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Requests an orderly shutdown of the cleanup scheduler and both download pools,
     * skipping any that were never created.
     */
    public void shutdown() {
        ExecutorService[] services = {_cacheCleanupService, _execService, _updateExecService};
        for (ExecutorService service : services) {
            if (service != null) {
                service.shutdown();
            }
        }
    }

    /**
     * @return {@code <localBaseDir>/usercache}
     */
    protected File getUserCacheDir() {
        File userCache = new File(_localBaseDir, USERCACHE);
        return userCache;
    }

    /**
     * @param str user name
     * @return the directory named after the user inside the user cache directory
     */
    protected File getLocalUserDir(String str) {
        File userCache = getUserCacheDir();
        return new File(userCache, str);
    }

    /**
     * @param str user name
     * @return the {@code filecache} directory inside the user's local directory
     */
    public File getLocalUserFileCacheDir(String str) {
        File userDir = getLocalUserDir(str);
        return new File(userDir, FILECACHE);
    }

    /**
     * @param file parent cache directory
     * @return the {@code files} sub-directory of {@code file}
     */
    protected File getCacheDirForFiles(File file) {
        File filesDir = new File(file, FILESDIR);
        return filesDir;
    }

    /**
     * @param file parent cache directory
     * @return the {@code archives} sub-directory of {@code file}
     */
    protected File getCacheDirForArchives(File file) {
        File archivesDir = new File(file, ARCHIVESDIR);
        return archivesDir;
    }

    /**
     * Registers every "current blob" entry found in a directory with the given resource set.
     * The resource path is the entry's path with everything from the last '.' onward removed,
     * and the resource key is the final name component of that path.
     *
     * @param str                  directory to scan
     * @param localizedResourceSet set the discovered resources are added to
     * @param z                    "uncompressed" flag passed to each resource and to the set
     */
    protected void addLocalizedResourceInDir(String str, LocalizedResourceSet localizedResourceSet, boolean z) {
        File[] currentBlobs = readCurrentBlobs(str);
        if (currentBlobs == null) {
            return;
        }
        for (File blobEntry : currentBlobs) {
            LOG.info("add localized in dir found: " + blobEntry);
            String entryPath = blobEntry.getPath();
            int lastDot = entryPath.lastIndexOf('.');
            // A dot at index 0 (or no dot at all) leaves the path unchanged.
            String resourcePath = lastDot > 0 ? entryPath.substring(0, lastDot) : entryPath;
            LOG.debug("local file is: {} path is: {}", blobEntry.getPath(), resourcePath);
            String resourceKey = new File(resourcePath).getName();
            LocalizedResource resource = new LocalizedResource(resourceKey, resourcePath, z);
            localizedResourceSet.addResource(resource.getKey(), resource, z);
        }
    }

    /**
     * Lists the entries of a directory.
     *
     * @param str path of the directory
     * @return the result of {@link File#listFiles()}, or {@code null} when the path does not exist
     */
    protected File[] readDirContents(String str) {
        File dir = new File(str);
        return dir.exists() ? dir.listFiles() : null;
    }

    /**
     * Lists the entries of a directory whose lower-cased name ends with
     * {@code Utils.DEFAULT_CURRENT_BLOB_SUFFIX}.
     *
     * @param str path of the directory
     * @return the matching entries, or {@code null} when the path does not exist
     */
    protected File[] readCurrentBlobs(String str) {
        File dir = new File(str);
        if (!dir.exists()) {
            return null;
        }
        FilenameFilter currentBlobsOnly = new FilenameFilter() {
            @Override
            public boolean accept(File parent, String name) {
                String lowered = name.toLowerCase();
                return lowered.endsWith(Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
            }
        };
        return dir.listFiles(currentBlobsOnly);
    }

    /**
     * Rebuilds the per-user resource sets from what is on disk. Every entry of the user cache
     * directory is treated as a user name; that user's {@code files} and {@code archives}
     * directories are scanned and their current blobs registered. Any exception is logged and
     * swallowed, so this method never throws.
     */
    protected void reconstructLocalizedResources() {
        try {
            LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
            File[] userDirs = readDirContents(getUserCacheDir().getPath());
            if (userDirs == null) {
                LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
                return;
            }
            for (File userDir : userDirs) {
                String user = userDir.getName();
                LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);

                // Reuse a set that is already registered for this user, otherwise keep the new one.
                LocalizedResourceSet fresh = new LocalizedResourceSet(user);
                LocalizedResourceSet registered = _userRsrc.putIfAbsent(user, fresh);
                LocalizedResourceSet target = registered != null ? registered : fresh;

                String filesPath = getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath();
                addLocalizedResourceInDir(filesPath, target, false);
                String archivesPath = getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath();
                addLocalizedResourceInDir(archivesPath, target, true);
            }
        } catch (Exception failure) {
            LOG.error("ERROR reconstructing localized resources", failure);
        }
    }

    /**
     * Drops a topology's reference to a localized blob. A missing user set or a missing blob
     * is only logged as a warning.
     *
     * @param str  blob key
     * @param str2 user owning the blob
     * @param str3 topology whose reference is removed
     * @param z    "uncompressed" flag used to look the blob up in the user's set
     */
    public synchronized void removeBlobReference(String str, String str2, String str3, boolean z) throws AuthorizationException, KeyNotFoundException {
        LocalizedResourceSet userSet = _userRsrc.get(str2);
        if (userSet == null) {
            LOG.warn("trying to remove blob for non-existent resource set for user: " + str2 + " key: " + str + " topo: " + str3);
            return;
        }
        LocalizedResource resource = userSet.get(str, z);
        if (resource == null) {
            LOG.warn("trying to remove non-existent blob, key: " + str + " for user: " + str2 + " topo: " + str3);
            return;
        }
        LOG.debug("removing blob reference to: {} for topo: {}", str, str3);
        resource.removeReference(str3);
    }

    /**
     * Adds a topology reference to each listed blob that is already localized for the user.
     * A missing user set, or any blob that is not localized, is only logged as a warning.
     *
     * @param list blobs to reference
     * @param str  user owning the blobs
     * @param str2 topology that references them
     */
    public synchronized void addReferences(List<LocalResource> list, String str, String str2) {
        LocalizedResourceSet userSet = _userRsrc.get(str);
        if (userSet == null) {
            LOG.warn("trying to add reference to non-existent local resource set, user: " + str + " topo: " + str2);
            return;
        }
        for (LocalResource requested : list) {
            LocalizedResource resource = userSet.get(requested.getBlobName(), requested.shouldUncompress());
            if (resource == null) {
                LOG.warn("trying to add reference to non-existent blob, key: " + requested + " topo: " + str2);
                continue;
            }
            resource.addReference(str2);
            LOG.debug("added reference for topo: {} key: {}", str2, requested);
        }
    }

    /**
     * Convenience wrapper around {@link #getBlobs} for a single blob.
     *
     * @param localResource blob to fetch
     * @param str           user the blob is fetched for
     * @param str2          topology that references the blob
     * @param file          user file cache directory passed on to {@code getBlobs}
     * @return the single localized resource
     * @throws IOException if {@code getBlobs} does not return exactly one resource
     */
    public LocalizedResource getBlob(LocalResource localResource, String str, String str2, File file) throws AuthorizationException, KeyNotFoundException, IOException {
        List<LocalResource> request = new ArrayList<LocalResource>();
        request.add(localResource);
        List<LocalizedResource> fetched = getBlobs(request, str, str2, file);
        if (fetched.size() == 1) {
            return fetched.get(0);
        }
        throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + str + ", topo: " + str2);
    }

    /**
     * Checks that all three on-disk artifacts of a resource exist: the versioned file,
     * the "current" symlink and the version file (checked in that order).
     *
     * @param localizedResource resource to check
     * @return {@code true} only if all three paths exist
     */
    protected boolean isLocalizedResourceDownloaded(LocalizedResource localizedResource) {
        if (!new File(localizedResource.getFilePathWithVersion()).exists()) {
            return false;
        }
        if (!new File(localizedResource.getCurrentSymlinkPath()).exists()) {
            return false;
        }
        return new File(localizedResource.getVersionFilePath()).exists();
    }

    /**
     * Compares the blob version reported through the client blob store with the version
     * recorded for the local copy.
     *
     * @param localizedResource resource to check
     * @param clientBlobStore   client used to look up the remote version
     * @return {@code true} when both versions are equal
     */
    protected boolean isLocalizedResourceUpToDate(LocalizedResource localizedResource, ClientBlobStore clientBlobStore) throws AuthorizationException, KeyNotFoundException {
        long remoteVersion = Utils.nimbusVersionOfBlob(localizedResource.getKey(), clientBlobStore);
        long localVersion = Utils.localVersionOfBlob(localizedResource.getFilePath());
        return remoteVersion == localVersion;
    }

    /**
     * @return a blob store client obtained from
     *         {@code Utils.getClientBlobStoreForSupervisor} with this localizer's configuration
     */
    protected ClientBlobStore getClientBlobStore() {
        ClientBlobStore client = Utils.getClientBlobStoreForSupervisor(_conf);
        return client;
    }

    /**
     * Re-downloads every listed blob that is already localized for the user but is either out
     * of date or no longer fully present on disk.
     *
     * <p>Blobs that are not localized for the user are skipped with a warning. A download that
     * fails with anything other than an authorization or key-not-found error is logged and
     * left out of the result; the remaining downloads are still processed.
     *
     * @param list blobs to check
     * @param str  user owning the blobs
     * @return the resources that were successfully re-downloaded; empty when the user has no
     *         resource set
     * @throws AuthorizationException if a version check or a download fails with it
     * @throws KeyNotFoundException   if a version check or a download fails with it
     * @throws IOException            if the calling thread is interrupted while waiting
     */
    public List<LocalizedResource> updateBlobs(List<LocalResource> list, String str) throws AuthorizationException, KeyNotFoundException, IOException {
        LocalizedResourceSet userSet = this._userRsrc.get(str);
        List<LocalizedResource> results = new ArrayList<LocalizedResource>();
        if (userSet == null) {
            return results;
        }

        // Phase 1: decide which blobs need a refresh. The blob store client is only needed
        // here, so it is closed exactly once, before any download starts.
        List<DownloadBlob> updates = new ArrayList<DownloadBlob>();
        ClientBlobStore blobStore = null;
        try {
            blobStore = getClientBlobStore();
            for (LocalResource requested : list) {
                String key = requested.getBlobName();
                LocalizedResource existing = userSet.get(key, requested.shouldUncompress());
                if (existing == null) {
                    LOG.warn("blob requested for update doesn't exist: {}", key);
                } else if (!isLocalizedResourceUpToDate(existing, blobStore) || !isLocalizedResourceDownloaded(existing)) {
                    LOG.debug("updating blob: {}", key);
                    updates.add(new DownloadBlob(this, this._conf, key, new File(existing.getFilePath()), str, existing.isUncompressed(), true));
                }
            }
        } finally {
            if (blobStore != null) {
                blobStore.shutdown();
            }
        }

        // Phase 2: run the downloads and record each one that succeeded.
        try {
            for (Future<LocalizedResource> future : this._updateExecService.invokeAll(updates)) {
                try {
                    LocalizedResource updated = future.get();
                    // The user's set may have been removed by cache cleanup in the meantime.
                    LocalizedResourceSet fresh = new LocalizedResourceSet(str);
                    LocalizedResourceSet target = this._userRsrc.putIfAbsent(str, fresh);
                    if (target == null) {
                        target = fresh;
                    }
                    target.updateResource(updated.getKey(), updated, updated.isUncompressed());
                    results.add(updated);
                } catch (ExecutionException e) {
                    LOG.error("Error updating blob: ", e);
                    Throwable cause = e.getCause();
                    if (cause instanceof AuthorizationException) {
                        throw (AuthorizationException) cause;
                    }
                    if (cause instanceof KeyNotFoundException) {
                        throw (KeyNotFoundException) cause;
                    }
                    // Any other failure is best-effort: already logged, move on to the next blob.
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted Exception", e);
        } catch (RejectedExecutionException e) {
            LOG.error("Error updating blobs : ", e);
        }
        return results;
    }

    /**
     * Makes the listed blobs available locally for a user and adds a reference from the given
     * topology to each of them.
     *
     * <p>A blob that is already localized with the same "uncompress" setting, fully present on
     * disk and up to date is reused as-is. Every other blob is downloaded: as an update when a
     * stale local copy exists, otherwise as a fresh download.
     *
     * @param list blobs to localize
     * @param str  user the blobs are localized for
     * @param str2 topology that references the blobs
     * @param file user file cache directory; downloads go to its {@code files} or
     *             {@code archives} sub-directory depending on the "uncompress" setting
     * @return reused resources first, followed by downloaded resources in download order
     * @throws AuthorizationException if a version check or a download fails with it
     * @throws KeyNotFoundException   if a version check or a download fails with it
     * @throws IOException            if a download fails for another reason, the download is
     *                                rejected by the executor, or the thread is interrupted
     */
    public synchronized List<LocalizedResource> getBlobs(List<LocalResource> list, String str, String str2, File file) throws AuthorizationException, KeyNotFoundException, IOException {
        LocalizedResourceSet fresh = new LocalizedResourceSet(str);
        LocalizedResourceSet userSet = this._userRsrc.putIfAbsent(str, fresh);
        if (userSet == null) {
            userSet = fresh;
        }

        List<LocalizedResource> results = new ArrayList<LocalizedResource>();
        List<DownloadBlob> downloads = new ArrayList<DownloadBlob>();

        // Phase 1: sort blobs into "reuse" and "download". The blob store client is only
        // needed here, so it is closed exactly once, before any download starts.
        ClientBlobStore blobStore = null;
        try {
            blobStore = getClientBlobStore();
            for (LocalResource requested : list) {
                String key = requested.getBlobName();
                boolean uncompress = requested.shouldUncompress();
                LocalizedResource existing = userSet.get(key, uncompress);
                boolean isUpdate = false;
                if (existing != null && existing.isUncompressed() == uncompress && isLocalizedResourceDownloaded(existing)) {
                    if (isLocalizedResourceUpToDate(existing, blobStore)) {
                        LOG.debug("blob already exists: {}", key);
                        existing.addReference(str2);
                        results.add(existing);
                        // Nothing to fetch; without this the blob would be downloaded again
                        // and show up twice in the result.
                        continue;
                    }
                    LOG.debug("blob exists but isn't up to date: {}", key);
                    isUpdate = true;
                }
                LOG.debug("fetching blob: {}", key);
                File downloadDir = uncompress ? getCacheDirForArchives(file) : getCacheDirForFiles(file);
                // Best effort: the directory may already exist.
                downloadDir.mkdir();
                downloads.add(new DownloadBlob(this, this._conf, key, new File(downloadDir, key), str, uncompress, isUpdate));
            }
        } finally {
            if (blobStore != null) {
                blobStore.shutdown();
            }
        }

        // Phase 2: run the downloads; the first failure aborts the whole call.
        try {
            for (Future<LocalizedResource> future : this._execService.invokeAll(downloads)) {
                LocalizedResource downloaded = future.get();
                downloaded.addReference(str2);
                userSet.addResource(downloaded.getKey(), downloaded, downloaded.isUncompressed());
                results.add(downloaded);
            }
            return results;
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted Exception", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof AuthorizationException) {
                throw (AuthorizationException) cause;
            }
            if (cause instanceof KeyNotFoundException) {
                throw (KeyNotFoundException) cause;
            }
            throw new IOException("Error getting blobs", e);
        } catch (RejectedExecutionException e) {
            throw new IOException("RejectedExecutionException: ", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x0375, code lost:
    
        r0 = new org.apache.storm.localizer.LocalizedResource(r11, r0, r14);
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x0385, code lost:
    
        if (r16 == null) goto L66;
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x0388, code lost:
    
        r16.shutdown();
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x038f, code lost:
    
        return r0;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Downloads one blob to the local cache.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method. The body below is a
     * placeholder that always throws {@link UnsupportedOperationException}; the real logic has
     * to be recovered from the original source or from the bytecode.
     *
     * <p>{@code DownloadBlob.call()} invokes this method with, in order: the configuration map,
     * the blob key, the local target file, the user, the "uncompress" flag and the
     * "is update" flag.
     *
     * @param r10 configuration map
     * @param r11 blob key
     * @param r12 local target file
     * @param r13 user the blob is downloaded for
     * @param r14 "uncompress" flag
     * @param r15 "is update" flag
     * @return never returns normally in this decompiled form
     */
    public org.apache.storm.localizer.LocalizedResource downloadBlob(java.util.Map r10, java.lang.String r11, java.io.File r12, java.lang.String r13, boolean r14, boolean r15) throws org.apache.storm.generated.AuthorizationException, org.apache.storm.generated.KeyNotFoundException, java.io.IOException {
        /*
            Method dump skipped, instructions count: 927
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.storm.localizer.Localizer.downloadBlob(java.util.Map, java.lang.String, java.io.File, java.lang.String, boolean, boolean):org.apache.storm.localizer.LocalizedResource");
    }

    /**
     * Runs the worker-launcher as {@code <launcher> <str> blob <str2>} when the configuration
     * enables running workers as the submitting user; otherwise does nothing.
     *
     * <p>The launcher path comes from the configuration and falls back to
     * {@code <storm.home>/bin/worker-launcher} when it is not set.
     *
     * @param map  configuration map
     * @param str  first launcher argument
     * @param str2 launcher argument following the literal {@code "blob"}
     * @throws IOException if the launcher exits with a non-zero exit code
     */
    public void setBlobPermissions(Map map, String str, String str2) throws IOException {
        if (!Utils.getBoolean(map.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
            return;
        }

        String launcher = Utils.getString(map.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
        if (launcher.isEmpty()) {
            launcher = System.getProperty("storm.home") + "/bin/worker-launcher";
        }

        String[] command = {launcher, str, "blob", str2};
        ShellUtils.ShellCommandExecutor executor = new ShellUtils.ShellCommandExecutor(command);
        LOG.info("Setting blob permissions, command: {}", Arrays.toString(command));
        try {
            executor.execute();
            LOG.debug("output: {}", executor.getOutput());
        } catch (ShellUtils.ExitCodeException failure) {
            int exitCode = executor.getExitCode();
            LOG.warn("Exit code from worker-launcher is : " + exitCode, failure);
            LOG.debug("output: {}", executor.getOutput());
            throw new IOException("Setting blob permissions failed (exitCode=" + exitCode + ") with output: " + executor.getOutput(), failure);
        }
    }

    /**
     * Runs one cache cleanup pass: feeds every user's resources into a retention set sized by
     * the cache target size, lets it clean up, then removes the directories and map entry of
     * every user whose resource set is empty afterwards.
     */
    public synchronized void handleCacheCleanup() {
        LocalizedResourceRetentionSet retention = new LocalizedResourceRetentionSet(_cacheTargetSize);
        for (LocalizedResourceSet userSet : _userRsrc.values()) {
            retention.addResources(userSet);
            LOG.debug("Resources to be cleaned after adding {} : {}", userSet.getUser(), retention);
        }
        retention.cleanup();
        LOG.debug("Resource cleanup: {}", retention);

        for (LocalizedResourceSet userSet : _userRsrc.values()) {
            if (userSet.getSize() != 0) {
                continue;
            }
            String user = userSet.getUser();
            LOG.debug("removing empty set: {}", user);

            // Delete bottom-up; File.delete() on a non-empty directory simply returns false.
            File fileCacheDir = getLocalUserFileCacheDir(user);
            getCacheDirForFiles(fileCacheDir).delete();
            getCacheDirForArchives(fileCacheDir).delete();
            getLocalUserFileCacheDir(user).delete();

            // Forget the user only once the user's directory is really gone.
            boolean userDirRemoved = getLocalUserDir(user).delete();
            if (userDirRemoved) {
                _userRsrc.remove(user);
            }
        }
    }
}
