package org.apache.samza.autoscaling.deployer;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.samza.autoscaling.utils.YarnUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.job.JobRunner;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.util.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/autoscaling/deployer/ConfigManager.class */
public class ConfigManager {
    private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
    private SystemStreamPartitionIterator coordinatorStreamIterator;
    private static final Logger log = LoggerFactory.getLogger(ConfigManager.class);
    private final long interval;
    private final String jobName;
    private final int jobID;
    private Config config;
    private YarnUtil yarnUtil;
    private static final String SERVER_URL_OPT = "samza.autoscaling.server.url";
    private static final String YARN_CONTAINER_COUNT_OPT = "yarn.container.count";
    private final long defaultPollingInterval = 100;
    private final int defaultReadJobModelDelayMs = 100;
    private String coordinatorServerURL = null;
    private final String rmAddressOpt = "yarn.rm.address";
    private final String rmPortOpt = "yarn.rm.port";
    private final String pollingIntervalOpt = "configManager.polling.interval";

    public ConfigManager(Config config) {
        if (!config.containsKey("yarn.rm.address") || !config.containsKey("yarn.rm.port")) {
            throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port.");
        }
        String str = (String) config.get("yarn.rm.address");
        int i = config.getInt("yarn.rm.port");
        if (!config.containsKey(JobConfig.JOB_NAME())) {
            throw new IllegalArgumentException("Missing config: the config does not contain the job name");
        }
        this.jobName = (String) config.get(JobConfig.JOB_NAME());
        this.jobID = config.getInt(JobConfig.JOB_ID(), 1);
        if (config.containsKey("configManager.polling.interval")) {
            long j = config.getLong("configManager.polling.interval");
            if (j <= 0) {
                throw new IllegalArgumentException("polling interval cannot be a negative value");
            }
            this.interval = j;
        } else {
            this.interval = 100L;
        }
        this.config = config;
        this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
        this.yarnUtil = new YarnUtil(str, i);
    }

    public void run() {
        start();
        while (true) {
            try {
                try {
                    Thread.sleep(this.interval);
                    processConfigMessages();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.warn("Got interrupt in config manager thread, so shutting down");
                    Thread.currentThread().interrupt();
                    log.info("Stopping the config manager");
                    stop();
                    return;
                }
            } catch (Throwable th) {
                log.info("Stopping the config manager");
                stop();
                throw th;
            }
        }
    }

    public void start() {
        register();
        this.coordinatorStreamConsumer.start();
        this.coordinatorStreamIterator = this.coordinatorStreamConsumer.getStartIterator();
        bootstrap();
    }

    public void stop() {
        this.coordinatorStreamConsumer.stop();
        this.coordinatorServerURL = null;
        this.yarnUtil.stop();
    }

    private void register() {
        this.coordinatorStreamConsumer.register();
    }

    private void bootstrap() {
        LinkedList linkedList = new LinkedList();
        linkedList.add(SERVER_URL_OPT);
        processConfigMessages(linkedList);
        if (this.coordinatorServerURL == null) {
            throw new IllegalStateException("coordinator server url is null, while the bootstrap has finished ");
        }
        log.info("Config manager bootstrapped");
    }

    private void skipUnreadMessages() {
        processConfigMessages(new LinkedList());
        log.info("Config manager skipped messages");
    }

    public void processConfigMessages() {
        LinkedList linkedList = new LinkedList();
        linkedList.add(YARN_CONTAINER_COUNT_OPT);
        linkedList.add(SERVER_URL_OPT);
        processConfigMessages(linkedList);
    }

    private void processConfigMessages(List<String> list) {
        if (this.coordinatorStreamConsumer.hasNewMessages(this.coordinatorStreamIterator)) {
            if (list == null) {
                throw new IllegalArgumentException("The keys to process list is null");
            }
            Iterator it = this.coordinatorStreamConsumer.getUnreadMessages(this.coordinatorStreamIterator, "set-config").iterator();
            while (it.hasNext()) {
                String str = null;
                try {
                    SetConfig setConfig = new SetConfig((CoordinatorStreamMessage) it.next());
                    str = setConfig.getKey();
                    Map map = (Map) setConfig.getMessageMap().get("values");
                    String str2 = null;
                    if (map != null) {
                        str2 = (String) map.get("value");
                    }
                    log.debug("Received set-config message with key: " + str + " and value: " + str2);
                    if (list.contains(str)) {
                        if (str.equals(YARN_CONTAINER_COUNT_OPT)) {
                            handleYarnContainerChange(str2);
                        } else if (str.equals(SERVER_URL_OPT)) {
                            handleServerURLChange(str2);
                        } else {
                            log.info("Setting the " + str + " configuration is currently not supported, skipping the message");
                        }
                    }
                } catch (Exception e) {
                    log.debug("Error in reading a message, skipping message with key " + str);
                }
            }
        }
    }

    private void handleServerURLChange(String str) {
        this.coordinatorServerURL = str;
        log.info("Server URL being set to " + str);
    }

    private void handleYarnContainerChange(String str) throws IOException, YarnException {
        String runningAppId = this.yarnUtil.getRunningAppId(this.jobName, this.jobID);
        int intValue = Integer.valueOf(str).intValue();
        int currentNumTasks = getCurrentNumTasks();
        if (intValue == getCurrentNumContainers()) {
            log.error("The new number of containers is equal to the current number of containers, skipping this message");
            return;
        }
        if (intValue <= 0) {
            log.error("The number of containers cannot be zero or less, skipping this message");
            return;
        }
        if (intValue > currentNumTasks) {
            log.error("The number of containers cannot be more than the number of task, skipping this message");
            return;
        }
        log.info("Killing the current job");
        this.yarnUtil.killApplication(runningAppId);
        this.coordinatorServerURL = null;
        try {
            String applicationState = this.yarnUtil.getApplicationState(runningAppId);
            Thread.sleep(1000L);
            int i = 1;
            while (!applicationState.equals("KILLED")) {
                applicationState = this.yarnUtil.getApplicationState(runningAppId);
                log.info("Job kill signal sent, but job not killed yet for " + runningAppId + ". Sleeping for another 1000ms");
                Thread.sleep(1000L);
                i++;
                if (i > 10) {
                    throw new IllegalStateException("Job has not been killed after 10 attempts.");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        log.info("Killed the current job successfully");
        log.info("Staring the job again");
        skipUnreadMessages();
        new JobRunner(this.config).run(false);
    }

    public int getCurrentNumTasks() {
        int i = 0;
        Iterator it = SamzaContainer.readJobModel(this.coordinatorServerURL, 100).getContainers().values().iterator();
        while (it.hasNext()) {
            i += ((ContainerModel) it.next()).getTasks().size();
        }
        return i;
    }

    public int getCurrentNumContainers() {
        return SamzaContainer.readJobModel(this.coordinatorServerURL, 100).getContainers().values().size();
    }

    public String getCoordinatorServerURL() {
        return this.coordinatorServerURL;
    }

    public static void main(String[] strArr) {
        CommandLine commandLine = new CommandLine();
        new ConfigManager(commandLine.loadConfig(commandLine.parser().parse(strArr))).run();
    }
}
