package org.apache.hyracks.control.cc;

import java.io.File;
import java.io.FileReader;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.cluster.NodeManager;
import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.apache.hyracks.control.cc.web.WebServer;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork;
import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
import org.apache.hyracks.control.cc.work.TriggerNCWork;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.logs.LogFile;
import org.apache.hyracks.control.common.shutdown.ShutdownRun;
import org.apache.hyracks.control.common.work.WorkQueue;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
import org.xml.sax.InputSource;

/* loaded from: input_file:org/apache/hyracks/control/cc/ClusterControllerService.class */
public class ClusterControllerService implements IControllerService {
    private static final Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
    // Controller configuration handed to the constructor.
    private final CCConfig ccConfig;
    // Obtained from ccConfig.getConfigManager(); also used to look up node names and per-node effective config.
    private final ConfigManager configManager;
    // IPC endpoints; both are created and started in start() and stopped in stop().
    private IPCSystem clusterIPC;
    private IPCSystem clientIPC;
    // Log file at <rootDir>/logs/jobs; opened in start(), closed in stop().
    private final LogFile jobLog;
    // Created in start().
    private ServerContext serverCtx;
    // Created and started in start(), stopped in stop().
    private WebServer webServer;
    // Built in start() once the web server is running (it records the web server's listening port).
    private ClusterControllerInfo info;
    // Created in startApplication().
    private CCServiceContext serviceCtx;
    private final PreDistributedJobStore preDistributedJobStore;
    // Started in start(), stopped in stop().
    private final WorkQueue workQueue;
    // Cached thread pool created in startApplication(); null until then.
    private ExecutorService executor;
    // Daemon timer that drives the dead-node sweeper.
    private final Timer timer;
    private final ICCContext ccContext;
    // Scheduled on the timer in start(); each run enqueues a RemoveDeadNodesWork on the work queue.
    private final DeadNodeSweeper sweeper;
    private final IDatasetDirectoryService datasetDirectoryService;
    // Plain HashMap; only touched through the synchronized *DeploymentRun methods.
    private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
    // Plain HashMap; only touched through the synchronized *StateDumpRun methods.
    private final Map<String, GatherStateDumpsWork.StateDumpRun> stateDumpRunMap;
    // Wrapped with Collections.synchronizedMap, so its accessor methods are not themselves synchronized.
    private final Map<String, GetThreadDumpWork.ThreadDumpRun> threadDumpRunMap;
    private final INodeManager nodeManager;
    private final IResourceManager resourceManager;
    // Never null: the constructor rejects a null application.
    private final ICCApplication application;
    private final JobIdFactory jobIdFactory;
    // Chosen in startApplication(): the configured implementation, or JobManager as a fallback.
    private IJobManager jobManager;
    // Read and written only through the synchronized setShutdownRun/getShutdownRun methods.
    private ShutdownRun shutdownCallback;

    /* loaded from: input_file:org/apache/hyracks/control/cc/ClusterControllerService$ClusterControllerContext.class */
    /**
     * {@link ICCContext} implementation backed by the enclosing service: it holds the cluster
     * topology it was constructed with and delegates everything else to the outer instance.
     */
    private final class ClusterControllerContext implements ICCContext {
        private final ClusterTopology topology;

        private ClusterControllerContext(ClusterTopology clusterTopology) {
            this.topology = clusterTopology;
        }

        /**
         * Runs a {@link GetIpAddressNodeNameMapWork} on the controller's work queue and waits for it.
         *
         * @param map the map handed to the work item
         * @throws HyracksDataException wrapping any exception raised while creating or running the work
         */
        public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
            final ClusterControllerService owner = ClusterControllerService.this;
            try {
                // Built inside the try so that a failure while creating the work item is wrapped as well.
                final GetIpAddressNodeNameMapWork lookup =
                        new GetIpAddressNodeNameMapWork(owner.getNodeManager(), map);
                owner.workQueue.scheduleAndSync(lookup);
            } catch (Exception failure) {
                throw new HyracksDataException(failure);
            }
        }

        /** @return the enclosing service's controller info (assigned in the service's {@code start()}) */
        public ClusterControllerInfo getClusterControllerInfo() {
            final ClusterControllerService owner = ClusterControllerService.this;
            return owner.info;
        }

        /** @return the topology given at construction time; may be {@code null} */
        public ClusterTopology getClusterTopology() {
            return topology;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hyracks/control/cc/ClusterControllerService$DeadNodeSweeper.class */
    /**
     * Timer task that, on every tick, enqueues a {@link RemoveDeadNodesWork} for the enclosing
     * service on its work queue.
     */
    public class DeadNodeSweeper extends TimerTask {
        private DeadNodeSweeper() {
        }

        @Override
        public void run() {
            final ClusterControllerService owner = ClusterControllerService.this;
            final RemoveDeadNodesWork sweep = new RemoveDeadNodesWork(owner);
            owner.workQueue.schedule(sweep);
        }
    }

    /**
     * Creates a controller whose application is resolved from the configuration: the class named by
     * {@code cCConfig.getAppClass()} is instantiated reflectively, or {@code BaseCCApplication.INSTANCE}
     * is used when no class is configured.
     *
     * @param cCConfig the cluster controller configuration
     * @throws Exception if the application cannot be instantiated or the delegated constructor fails
     */
    public ClusterControllerService(CCConfig cCConfig) throws Exception {
        this(cCConfig, getApplication(cCConfig));
    }

    /**
     * Creates a controller bound to the given configuration and application. Only in-memory
     * collaborators are built here; network endpoints, the web server, the executor and the job
     * manager are created later by {@link #start()}.
     *
     * @param cCConfig       the cluster controller configuration
     * @param iCCApplication the application hosted by this controller; must not be {@code null}
     * @throws IllegalArgumentException if {@code iCCApplication} is {@code null}
     * @throws Exception                if processing the configuration, reading the cluster topology
     *                                  or constructing a collaborator fails
     */
    public ClusterControllerService(CCConfig cCConfig, ICCApplication iCCApplication) throws Exception {
        this.preDistributedJobStore = new PreDistributedJobStore();
        this.resourceManager = new ResourceManager();
        this.ccConfig = cCConfig;
        this.configManager = this.ccConfig.getConfigManager();
        if (iCCApplication == null) {
            throw new IllegalArgumentException("ICCApplication cannot be null");
        }
        this.application = iCCApplication;
        this.configManager.processConfig();
        this.jobLog = new LogFile(new File(this.ccConfig.getRootDir(), "logs/jobs"));
        this.workQueue = new WorkQueue("ClusterController", 10);
        // Daemon timer: it must not keep the JVM alive on its own.
        this.timer = new Timer(true);
        this.ccContext = new ClusterControllerContext(computeClusterTopology(this.ccConfig));
        this.sweeper = new DeadNodeSweeper();
        this.datasetDirectoryService = new DatasetDirectoryService(this.ccConfig.getResultTTL(),
                this.ccConfig.getResultSweepThreshold(), this.preDistributedJobStore);
        // These two maps are guarded by the synchronized accessor methods of this class.
        this.deploymentRunMap = new HashMap<>();
        this.stateDumpRunMap = new HashMap<>();
        // This one is accessed from unsynchronized methods, hence the synchronized wrapper.
        this.threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
        this.nodeManager = new NodeManager(this, this.ccConfig, this.resourceManager);
        this.jobIdFactory = new JobIdFactory();
    }

    /**
     * Parses the cluster topology file named by the configuration, if any.
     *
     * @param cCConfig the cluster controller configuration
     * @return the parsed topology, or {@code null} when no topology file is configured
     * @throws Exception if the file cannot be opened, parsed or closed
     */
    private static ClusterTopology computeClusterTopology(CCConfig cCConfig) throws Exception {
        if (cCConfig.getClusterTopology() == null) {
            return null;
        }
        // try-with-resources closes the reader on every path and, unlike a manual close in a catch
        // block, keeps a parse failure as the primary exception if close() fails as well.
        try (FileReader topologyReader = new FileReader(cCConfig.getClusterTopology())) {
            return TopologyDefinitionParser.parse(new InputSource(topologyReader));
        }
    }

    /**
     * Brings the controller up: creates and starts the IPC endpoints and web server, schedules the
     * dead-node sweeper, opens the job log, starts the application, starts the work queue, triggers
     * the configured NC services and finally tells the application that startup has completed.
     *
     * @throws Exception if any of the startup steps fails
     */
    public void start() throws Exception {
        LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
        this.serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(this.ccConfig.getRootDir()));
        // The cluster endpoint is bound by port only; the client endpoint is bound to the configured address and port.
        this.clusterIPC = new IPCSystem(new InetSocketAddress(this.ccConfig.getClusterListenPort()), new ClusterControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
        this.clientIPC = new IPCSystem(new InetSocketAddress(this.ccConfig.getClientListenAddress(), this.ccConfig.getClientListenPort()), new ClientInterfaceIPCI(this, this.jobIdFactory), new JavaSerializationBasedPayloadSerializerDeserializer());
        this.webServer = new WebServer(this, this.ccConfig.getConsoleListenPort());
        this.clusterIPC.start();
        this.clientIPC.start();
        this.webServer.start();
        // Built after the web server has started because it records the web server's listening port.
        this.info = new ClusterControllerInfo(this.ccConfig.getClientListenAddress(), this.ccConfig.getClientListenPort(), this.webServer.getListeningPort());
        // First sweep is scheduled with no delay, then repeats every heartbeat period.
        this.timer.schedule(this.sweeper, 0L, this.ccConfig.getHeartbeatPeriod());
        this.jobLog.open();
        // startApplication() creates serviceCtx, executor and jobManager; the steps below rely on them.
        startApplication();
        this.datasetDirectoryService.init(this.executor);
        this.workQueue.start();
        connectNCs();
        LOGGER.log(Level.INFO, "Started ClusterControllerService");
        notifyApplication();
    }

    /**
     * Creates the service context and executor, starts the application, and then selects the job
     * manager: the class named by {@code ccConfig.getJobManagerClass()} is instantiated reflectively
     * through its {@code (CCConfig, ClusterControllerService, IJobCapacityController)} constructor;
     * if that fails for a reflective reason, a warning is logged and {@link JobManager} is used.
     *
     * @throws Exception if the application fails to start or the fallback job manager cannot be built
     */
    private void startApplication() throws Exception {
        this.serviceCtx = new CCServiceContext(this, this.serverCtx, this.ccContext, this.ccConfig.getAppConfig());
        this.serviceCtx.addJobLifecycleListener(this.datasetDirectoryService);
        this.executor = Executors.newCachedThreadPool(this.serviceCtx.getThreadFactory());
        this.application.start(this.serviceCtx, this.ccConfig.getAppArgsArray());

        final IJobCapacityController capacityController = this.application.getJobCapacityController();
        IJobManager selected;
        try {
            final Class<?> managerClass = getClass().getClassLoader().loadClass(this.ccConfig.getJobManagerClass());
            selected = (IJobManager) managerClass
                    .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class)
                    .newInstance(this.ccConfig, this, capacityController);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException
                | InvocationTargetException reflectionFailure) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "class " + this.ccConfig.getJobManagerClass() + " could not be used: ",
                        reflectionFailure);
            }
            // Fall back to the built-in job manager.
            selected = new JobManager(this.ccConfig, this, capacityController);
        }
        this.jobManager = selected;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Looks up the NC service endpoint configured for a node.
     *
     * @param str the node name
     * @return a pair of (NC service address, NC service port) read from the node's effective config
     */
    public Pair<String, Integer> getNCService(String str) {
        final IApplicationConfig nodeConfig = this.configManager.getNodeEffectiveConfig(str);
        final String address = nodeConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS);
        final Integer port = Integer.valueOf(nodeConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
        return Pair.of(address, port);
    }

    /**
     * Collects the NC service endpoint of every configured node whose NC service port is not
     * {@code -1}.
     *
     * @return a map, sorted by node name, from node name to (address, port); never {@code null}
     */
    private Map<String, Pair<String, Integer>> getNCServices() {
        final Map<String, Pair<String, Integer>> enabledServices = new TreeMap<>();
        for (String nodeName : this.configManager.getNodeNames()) {
            final Pair<String, Integer> ncService = getNCService(nodeName);
            // Nodes whose port is -1 are left out of the result.
            if (ncService.getRight().intValue() != -1) {
                enabledServices.put(nodeName, ncService);
            }
        }
        return enabledServices;
    }

    /**
     * Submits a {@link TriggerNCWork} to the executor for every node returned by
     * {@code getNCServices()}, then registers a cluster lifecycle listener that re-submits a
     * {@link TriggerNCWork} for each failed node whose NC service port is not {@code -1}.
     */
    private void connectNCs() {
        getNCServices().forEach((nodeName, ncService) -> {
            this.executor.submit((Runnable) new TriggerNCWork(this, ncService.getLeft(),
                    ncService.getRight().intValue(), nodeName));
        });
        this.serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
            public void notifyNodeJoin(String str, Map<IOption, Object> map) throws HyracksException {
                // Joins only get logged; no action is taken.
                ClusterControllerService.LOGGER.log(Level.WARNING, "Getting notified that node: " + str + " has joined. and we don't care");
            }

            public void notifyNodeFailure(Collection<String> collection) throws HyracksException {
                ClusterControllerService.LOGGER.log(Level.WARNING, "Getting notified that nodes: " + collection + " has failed");
                for (String failedNode : collection) {
                    final Pair<String, Integer> ncService = ClusterControllerService.this.getNCService(failedNode);
                    if (ncService.getRight().intValue() != -1) {
                        ClusterControllerService.this.executor.submit((Runnable) new TriggerNCWork(
                                ClusterControllerService.this, ncService.getLeft(),
                                ncService.getRight().intValue(), failedNode));
                    }
                }
            }
        });
    }

    /**
     * Schedules a {@link ShutdownNCServiceWork} on the work queue for every node returned by
     * {@code getNCServices()} and then waits for each of them in turn.
     *
     * @throws Exception if waiting on any of the shutdown work items fails
     */
    private void terminateNCServices() throws Exception {
        final Collection<ShutdownNCServiceWork> pendingShutdowns = new ArrayList<>();
        getNCServices().forEach((nodeName, ncService) -> {
            if (ncService.getRight().intValue() != -1) {
                final ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getLeft(),
                        ncService.getRight().intValue(), nodeName);
                this.workQueue.schedule(shutdownWork);
                pendingShutdowns.add(shutdownWork);
            }
        });
        // All work items are scheduled first and only then awaited, in scheduling order.
        for (ShutdownNCServiceWork shutdownWork : pendingShutdowns) {
            shutdownWork.sync();
        }
    }

    /**
     * Tells the hosted application that startup has completed; called as the last step of start().
     *
     * @throws Exception if the application's callback fails
     */
    private void notifyApplication() throws Exception {
        this.application.startupCompleted();
    }

    /**
     * Stops the controller, optionally shutting down the configured NC services first.
     *
     * @param z when {@code true}, terminateNCServices() is run before the controller itself is stopped
     * @throws Exception if terminating the NC services or stopping the controller fails
     */
    public void stop(boolean z) throws Exception {
        if (z) {
            terminateNCServices();
        }
        stop();
    }

    /**
     * Shuts the controller down in this order: application, web server, dead-node sweeper and its
     * timer, work queue, executor, cluster IPC, job log, client IPC.
     *
     * <p>Fields populated by {@link #start()} are dereferenced unconditionally, so this is meant to
     * be called only after a successful {@code start()}.
     *
     * @throws Exception if any of the shutdown steps fails; later steps are then not executed
     */
    public void stop() throws Exception {
        LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
        stopApplication();
        this.webServer.stop();
        this.sweeper.cancel();
        // Cancelling the task alone leaves the timer's background thread alive; the timer is private
        // to this service and only ever runs the sweeper, so it is terminated here as well.
        this.timer.cancel();
        this.workQueue.stop();
        this.executor.shutdownNow();
        this.clusterIPC.stop();
        this.jobLog.close();
        this.clientIPC.stop();
        LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
    }

    /**
     * Stops the hosted application; called as the first step of stop().
     *
     * @throws Exception if the application fails to stop
     */
    private void stopApplication() throws Exception {
        this.application.stop();
    }

    /** @return the server context created in start(); {@code null} before start() has run */
    public ServerContext getServerContext() {
        return this.serverCtx;
    }

    /** @return the controller context created in the constructor */
    public ICCContext getCCContext() {
        return this.ccContext;
    }

    /** @return the job manager selected in startApplication(); {@code null} before that */
    public IJobManager getJobManager() {
        return this.jobManager;
    }

    /** @return the node manager created in the constructor */
    public INodeManager getNodeManager() {
        return this.nodeManager;
    }

    /**
     * @return the pre-distributed job store created in the constructor
     * @throws HyracksException declared in the signature, but this implementation does not throw it
     */
    public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
        return this.preDistributedJobStore;
    }

    /** @return the resource manager created in the constructor */
    public IResourceManager getResourceManager() {
        return this.resourceManager;
    }

    /** @return the job log file (opened in start(), closed in stop()) */
    public LogFile getJobLogFile() {
        return this.jobLog;
    }

    /** @return the controller's work queue */
    public WorkQueue getWorkQueue() {
        return this.workQueue;
    }

    /** @return the executor created in startApplication(); {@code null} before that */
    public ExecutorService getExecutor() {
        return this.executor;
    }

    /** @return the controller configuration; same object as returned by getCCConfig() */
    public CCConfig getConfig() {
        return this.ccConfig;
    }

    /* renamed from: getContext, reason: merged with bridge method [inline-methods] */
    /** @return the service context created in startApplication(); {@code null} before that */
    public CCServiceContext m7getContext() {
        return this.serviceCtx;
    }

    /** @return the controller info built in start(); {@code null} before start() has run */
    public ClusterControllerInfo getClusterControllerInfo() {
        return this.info;
    }

    /** @return the controller configuration; same object as returned by getConfig() */
    public CCConfig getCCConfig() {
        return this.ccConfig;
    }

    /** @return the cluster IPC system created in start(); {@code null} before start() has run */
    public IPCSystem getClusterIPC() {
        return this.clusterIPC;
    }

    /** @return a new NetworkAddress built from the configured client listen address and port */
    public NetworkAddress getDatasetDirectoryServiceInfo() {
        return new NetworkAddress(this.ccConfig.getClientListenAddress(), this.ccConfig.getClientListenPort());
    }

    /** @return the job id factory created in the constructor */
    public JobIdFactory getJobIdFactory() {
        return this.jobIdFactory;
    }

    /** @return the dataset directory service created in the constructor */
    public IDatasetDirectoryService getDatasetDirectoryService() {
        return this.datasetDirectoryService;
    }

    // State-dump, deployment and shutdown bookkeeping below is guarded by this instance's monitor
    // (the methods are synchronized); the backing maps are plain HashMaps.

    /** Registers {@code stateDumpRun} under the key {@code str}, replacing any previous entry. */
    public synchronized void addStateDumpRun(String str, GatherStateDumpsWork.StateDumpRun stateDumpRun) {
        this.stateDumpRunMap.put(str, stateDumpRun);
    }

    /** @return the state dump run registered under {@code str}, or {@code null} if there is none */
    public synchronized GatherStateDumpsWork.StateDumpRun getStateDumpRun(String str) {
        return this.stateDumpRunMap.get(str);
    }

    /** Removes the state dump run registered under {@code str}, if any. */
    public synchronized void removeStateDumpRun(String str) {
        this.stateDumpRunMap.remove(str);
    }

    /** Registers {@code deploymentRun} under {@code deploymentId}, replacing any previous entry. */
    public synchronized void addDeploymentRun(DeploymentId deploymentId, DeploymentRun deploymentRun) {
        this.deploymentRunMap.put(deploymentId, deploymentRun);
    }

    /** @return the deployment run registered under {@code deploymentId}, or {@code null} if there is none */
    public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentId) {
        return this.deploymentRunMap.get(deploymentId);
    }

    /** Removes the deployment run registered under {@code deploymentId}, if any. */
    public synchronized void removeDeploymentRun(DeploymentId deploymentId) {
        this.deploymentRunMap.remove(deploymentId);
    }

    /** Stores the shutdown run; a later call overwrites the earlier value. */
    public synchronized void setShutdownRun(ShutdownRun shutdownRun) {
        this.shutdownCallback = shutdownRun;
    }

    /** @return the shutdown run last stored with setShutdownRun, or {@code null} if none was set */
    public synchronized ShutdownRun getShutdownRun() {
        return this.shutdownCallback;
    }

    // Thread-dump bookkeeping is not synchronized at method level: the backing map is created
    // with Collections.synchronizedMap in the constructor.

    /** Registers {@code threadDumpRun} under the key {@code str}, replacing any previous entry. */
    public void addThreadDumpRun(String str, GetThreadDumpWork.ThreadDumpRun threadDumpRun) {
        this.threadDumpRunMap.put(str, threadDumpRun);
    }

    /** @return the thread dump run that was registered under {@code str} and is now removed, or {@code null} */
    public GetThreadDumpWork.ThreadDumpRun removeThreadDumpRun(String str) {
        return this.threadDumpRunMap.remove(str);
    }

    /**
     * Resolves the application for a configuration: instantiates the class named by
     * {@code cCConfig.getAppClass()} through its no-argument constructor, or returns
     * {@code BaseCCApplication.INSTANCE} when no application class is configured.
     *
     * @param cCConfig the cluster controller configuration
     * @return the application to host
     * @throws ClassNotFoundException if the configured class cannot be found
     * @throws IllegalAccessException if the class or its no-argument constructor is not accessible
     * @throws InstantiationException if the class cannot be instantiated
     */
    private static ICCApplication getApplication(CCConfig cCConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        if (cCConfig.getAppClass() == null) {
            return BaseCCApplication.INSTANCE;
        }
        return (ICCApplication) Class.forName(cCConfig.getAppClass()).newInstance();
    }

    /** @return the application hosted by this controller; never {@code null} (the constructor rejects null) */
    public ICCApplication getApplication() {
        return this.application;
    }

    /** @return whatever the hosted application reports as its application context */
    public Object getApplicationContext() {
        return this.application.getApplicationContext();
    }
}
