package org.apache.linkis.engineplugin.cache.refresh;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.linkis.engineplugin.cache.config.EngineConnPluginCacheConfig;
import org.apache.linkis.engineplugin.cache.refresh.RefreshableEngineConnPluginCache;
import org.apache.linkis.manager.engineplugin.common.loader.entity.EngineConnPluginInfo;
import org.apache.linkis.manager.engineplugin.common.loader.entity.EngineConnPluginInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineplugin/cache/refresh/DefaultRefreshPluginCacheContainer.class */
public class DefaultRefreshPluginCacheContainer implements RefreshPluginCacheContainer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultRefreshPluginCacheContainer.class);
    private static final String REFRESH_SCHEDULE_THREAD = "Refresh-plugin-cache-scheduler-";
    private static final String REFRESH_WORKER_THREAD = "Refresh-plugin-cache-worker-";
    private PluginCacheRefresher refresher;
    private ExecutorService scheduleService;
    private ExecutorService workService;
    private RefreshableEngineConnPluginCache pluginCache;
    private ConcurrentHashMap<String, RefreshPluginCacheOperation> pluginRefreshOps = new ConcurrentHashMap<>();
    private List<RefreshableEngineConnPluginCache.RefreshListener> refreshListeners = new ArrayList();
    private DelayQueue<RefreshPluginCacheOperation> refreshDelayQueue = new DelayQueue<>();
    private volatile boolean isRunning = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/linkis/engineplugin/cache/refresh/DefaultRefreshPluginCacheContainer$RefreshThreadFactory.class */
    public static class RefreshThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        RefreshThreadFactory(String str) {
            SecurityManager securityManager = System.getSecurityManager();
            this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = str + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != 5) {
                thread.setPriority(5);
            }
            return thread;
        }
    }

    public DefaultRefreshPluginCacheContainer(RefreshableEngineConnPluginCache refreshableEngineConnPluginCache) {
        this.pluginCache = refreshableEngineConnPluginCache;
    }

    @Override // org.apache.linkis.engineplugin.cache.refresh.RefreshPluginCacheContainer
    public synchronized void start(PluginCacheRefresher pluginCacheRefresher) {
        if (this.isRunning) {
            LOG.trace("This container has been started");
            return;
        }
        LOG.info("Starting container of refreshing plugin cache...");
        checkRefresher(pluginCacheRefresher);
        this.refresher = pluginCacheRefresher;
        startExecutors();
        this.isRunning = true;
        startConsumer();
    }

    @Override // org.apache.linkis.engineplugin.cache.refresh.RefreshPluginCacheContainer
    public synchronized void stop() {
        LOG.info("Stopping container of refreshing plugin cache...");
        stopExecutors();
        this.isRunning = false;
        LOG.info("Success to stop container of refreshing plugin cache");
    }

    @Override // org.apache.linkis.engineplugin.cache.refresh.RefreshPluginCacheContainer
    public void addRefreshOperation(EngineConnPluginInfo engineConnPluginInfo, RefreshPluginCacheOperation refreshPluginCacheOperation) {
        if (null != this.refresher) {
            refreshPluginCacheOperation.setTimeUnit(this.refresher.timeUnit());
            refreshPluginCacheOperation.setDuration(this.refresher.interval());
            refreshPluginCacheOperation.setPluginInfo(engineConnPluginInfo);
            refreshPluginCacheOperation.nextTime();
            this.pluginRefreshOps.computeIfAbsent(engineConnPluginInfo.toString(), str -> {
                this.refreshDelayQueue.put((DelayQueue<RefreshPluginCacheOperation>) refreshPluginCacheOperation);
                return refreshPluginCacheOperation;
            });
        }
    }

    @Override // org.apache.linkis.engineplugin.cache.refresh.RefreshPluginCacheContainer
    public void removeRefreshOperation(EngineConnPluginInfo engineConnPluginInfo) {
        RefreshPluginCacheOperation remove = this.pluginRefreshOps.remove(engineConnPluginInfo.toString());
        if (null != remove) {
            LOG.trace("Remove refresh-cache operation in queue for plugin:[ " + engineConnPluginInfo + " ]");
            this.refreshDelayQueue.remove(remove);
        }
    }

    @Override // org.apache.linkis.engineplugin.cache.refresh.RefreshPluginCacheContainer
    public void addRefreshListener(RefreshableEngineConnPluginCache.RefreshListener refreshListener) {
        this.refreshListeners.add(refreshListener);
    }

    private void startExecutors() {
        LOG.info("Start executors: [ schedule, worker ] in container");
        this.scheduleService = Executors.newSingleThreadExecutor(new RefreshThreadFactory(REFRESH_SCHEDULE_THREAD));
        this.workService = Executors.newFixedThreadPool(((Integer) EngineConnPluginCacheConfig.PLUGIN_CACHE_REFRESH_WORKERS.getValue()).intValue(), new RefreshThreadFactory(REFRESH_WORKER_THREAD));
    }

    private void stopExecutors() {
        LOG.info("Stop executors: [ schedule, worker ] in container");
        if (null != this.scheduleService) {
            this.scheduleService.shutdownNow();
            this.scheduleService = null;
        }
        if (null != this.workService) {
            this.workService.shutdownNow();
            this.workService = null;
        }
    }

    private void onRefresh(EngineConnPluginInfo engineConnPluginInfo) {
        this.refreshListeners.forEach(refreshListener -> {
            refreshListener.onRefresh(engineConnPluginInfo);
        });
    }

    private void checkRefresher(PluginCacheRefresher pluginCacheRefresher) {
        long interval = pluginCacheRefresher.interval();
        if (interval <= 0) {
            throw new IllegalArgumentException("interval value [" + interval + "]of refresher cannot be <= 0 ");
        }
    }

    private void startConsumer() {
        this.scheduleService.submit(() -> {
            while (this.isRunning) {
                try {
                    RefreshPluginCacheOperation take = this.refreshDelayQueue.take();
                    if (take == this.pluginRefreshOps.get(take.cacheStringKey())) {
                        try {
                            try {
                                EngineConnPluginInstance engineConnPluginInstance = (EngineConnPluginInstance) this.workService.submit(() -> {
                                    return take.getOperation().apply(take.pluginInfo());
                                }).get(12L, TimeUnit.HOURS);
                                if (null != engineConnPluginInstance) {
                                    this.pluginCache.refresh(engineConnPluginInstance.info(), engineConnPluginInstance);
                                    take.setPluginInfo(engineConnPluginInstance.info());
                                    onRefresh(engineConnPluginInstance.info());
                                }
                            } catch (Throwable th) {
                                take.nextTime();
                                take.setDuration(this.refresher.interval());
                                take.setTimeUnit(this.refresher.timeUnit());
                                this.refreshDelayQueue.add((DelayQueue<RefreshPluginCacheOperation>) take);
                                throw th;
                                break;
                            }
                        } catch (Exception e) {
                            LOG.info("Unable to refresh plugin: [ " + this.pluginCache.toString() + " ]", e);
                        }
                        take.nextTime();
                        take.setDuration(this.refresher.interval());
                        take.setTimeUnit(this.refresher.timeUnit());
                        this.refreshDelayQueue.add((DelayQueue<RefreshPluginCacheOperation>) take);
                    }
                } catch (InterruptedException e2) {
                    LOG.info("Error in consuming delay queue of refresh-plugin-cache operation, message:[" + e2.getMessage() + "]", e2);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e3) {
                    }
                }
            }
        });
    }
}
