package azkaban.executor;

import azkaban.event.EventHandler;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.FlowUtils;
import azkaban.flow.SpecialJobTypes;
import azkaban.jobcallback.JobCallbackConstants;
import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ExecutorManagerAdapter} implementation that holds no in-memory execution state.
 *
 * <p>Every query method reads the current state from the {@link ExecutorLoader} at call time,
 * and every control action (cancel, pause, resume, modify) is forwarded to the executor that
 * owns the execution through the {@link ExecutorApiGateway}. The queue-processor related methods
 * of the adapter are no-ops here and the thread-state getters return fixed values.
 *
 * <p>Most read-only queries are best-effort: an {@link ExecutorManagerException} raised by the
 * loader is logged and an empty/default result is returned instead of propagating the failure.
 */
@Singleton
public class ExecutionController extends EventHandler implements ExecutorManagerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ExecutionController.class);

    /** Look-back window handed to the loader when fetching recently finished flows. */
    private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);

    /** Length requested for each job-log chunk while scanning a log for application ids. */
    private static final int APPLICATION_ID_LOG_CHUNK_LENGTH = 50000;

    /**
     * Separator used both to build and to split "host&lt;sep&gt;port" strings.
     * NOTE(review): the decompiler resolved this to a job-callback constant; it is presumably
     * just ":" -- confirm before replacing it with a literal.
     */
    private static final String HOST_PORT_DELIMITER =
            JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER;

    private final ExecutorLoader executorLoader;
    private final ExecutorApiGateway apiGateway;
    private final AlerterHolder alerterHolder;
    private final ExecutorHealthChecker executorHealthChecker;
    private final int maxConcurrentRunsOneFlow;
    private final Map<Pair<String, String>, Integer> maxConcurrentRunsPerFlowMap;
    private final CommonMetrics commonMetrics;
    private final Props azkProps;

    /**
     * Wires the collaborators and resolves the concurrent-run limits from {@code props}.
     */
    @Inject
    ExecutionController(Props props, ExecutorLoader executorLoader, CommonMetrics commonMetrics, ExecutorApiGateway executorApiGateway, AlerterHolder alerterHolder, ExecutorHealthChecker executorHealthChecker) {
        this.azkProps = props;
        this.executorLoader = executorLoader;
        this.commonMetrics = commonMetrics;
        this.apiGateway = executorApiGateway;
        this.alerterHolder = alerterHolder;
        this.executorHealthChecker = executorHealthChecker;
        this.maxConcurrentRunsOneFlow = ExecutorUtils.getMaxConcurrentRunsOneFlow(props);
        this.maxConcurrentRunsPerFlowMap = ExecutorUtils.getMaxConcurentRunsPerFlowMap(props);
    }

    /** No-op: this implementation has no executors to set up. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void setupExecutors() throws ExecutorManagerException {
    }

    /** No-op: this implementation has no queue processor thread. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void disableQueueProcessorThread() {
    }

    /** No-op: this implementation has no queue processor thread. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void enableQueueProcessorThread() {
    }

    /** Always reports {@link Thread.State#RUNNABLE}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Thread.State getExecutorManagerThreadState() {
        return Thread.State.RUNNABLE;
    }

    /** Always reports {@code true}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public boolean isExecutorManagerThreadActive() {
        return true;
    }

    /** Always returns the fixed placeholder value {@code 1}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public long getLastExecutorManagerThreadCheckTime() {
        return 1L;
    }

    /**
     * Fetches the active executors from the loader.
     *
     * @return the active executors, or an empty list when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Collection<Executor> getAllActiveExecutors() {
        try {
            return this.executorLoader.fetchActiveExecutors();
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get all active executors.", e);
            return new ArrayList<>();
        }
    }

    /**
     * Fetches a single executor from the loader.
     *
     * @param i identifier passed straight to {@link ExecutorLoader#fetchExecutor}
     * @throws ExecutorManagerException if the loader fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Executor fetchExecutor(int i) throws ExecutorManagerException {
        return this.executorLoader.fetchExecutor(i);
    }

    /**
     * Builds the "host&lt;sep&gt;port" strings of all active executors.
     *
     * @return a mutable set; empty or partially filled when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Set<String> getPrimaryServerHosts() {
        final Set<String> hosts = new HashSet<>();
        try {
            for (final Executor executor : this.executorLoader.fetchActiveExecutors()) {
                hosts.add(hostPort(executor));
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get primary server hosts.", e);
        }
        return hosts;
    }

    /**
     * Builds the "host&lt;sep&gt;port" strings of all active executors plus those of every executor
     * that is referenced by a currently active flow.
     *
     * @return a mutable set; loader failures are logged and whatever was collected is returned
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Set<String> getAllActiveExecutorServerHosts() {
        final Set<String> hosts = getPrimaryServerHosts();
        try {
            for (final Pair<ExecutionReference, ExecutableFlow> active : this.executorLoader.fetchActiveFlows().values()) {
                // A flow without an assigned executor contributes nothing.
                final Optional<Executor> executor = active.getFirst().getExecutor();
                if (executor.isPresent()) {
                    hosts.add(hostPort(executor.get()));
                }
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get all active executor server hosts.", e);
        }
        return hosts;
    }

    /** Formats an executor as host, delimiter, port. */
    private static String hostPort(Executor executor) {
        return executor.getHost() + HOST_PORT_DELIMITER + executor.getPort();
    }

    /**
     * Lists the execution ids of unfinished executions of one flow.
     *
     * @param i   project id the flow must belong to
     * @param str flow id to match
     * @return a mutable list of execution ids; empty when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<Integer> getRunningFlows(int i, String str) {
        try {
            return getRunningFlowsHelper(i, str, this.executorLoader.fetchUnfinishedFlows().values());
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get running flows for project {}, flow {}", i, str, e);
            return new ArrayList<>();
        }
    }

    /** Collects the execution ids of the given flows that match the project id and flow id. */
    private List<Integer> getRunningFlowsHelper(int projectId, String flowId, Collection<Pair<ExecutionReference, ExecutableFlow>> flows) {
        final List<Integer> execIds = new ArrayList<>();
        for (final Pair<ExecutionReference, ExecutableFlow> candidate : flows) {
            final ExecutableFlow flow = candidate.getSecond();
            if (flow.getFlowId().equals(flowId) && flow.getProjectId() == projectId) {
                execIds.add(candidate.getFirst().getExecId());
            }
        }
        return execIds;
    }

    /**
     * Pairs every unfinished flow with the (possibly absent) executor of its execution reference.
     *
     * @return a mutable list; empty when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor() throws IOException {
        final List<Pair<ExecutableFlow, Optional<Executor>>> flows = new ArrayList<>();
        try {
            for (final Pair<ExecutionReference, ExecutableFlow> unfinished : this.executorLoader.fetchUnfinishedFlows().values()) {
                flows.add(new Pair<>(unfinished.getSecond(), unfinished.getFirst().getExecutor()));
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get active flows with executor.", e);
        }
        return flows;
    }

    /**
     * Tells whether at least one unfinished execution exists for the given flow.
     *
     * @param i   project id the flow must belong to
     * @param str flow id to match
     * @return {@code false} when nothing matches or when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public boolean isFlowRunning(int i, String str) {
        try {
            return isFlowRunningHelper(i, str, this.executorLoader.fetchUnfinishedFlows().values());
        } catch (ExecutorManagerException e) {
            logger.error("Failed to check if the flow is running for project {}, flow {}", i, str, e);
            return false;
        }
    }

    /** Returns {@code true} as soon as one of the flows matches the project id and flow id. */
    private boolean isFlowRunningHelper(int projectId, String flowId, Collection<Pair<ExecutionReference, ExecutableFlow>> flows) {
        for (final Pair<ExecutionReference, ExecutableFlow> candidate : flows) {
            final ExecutableFlow flow = candidate.getSecond();
            if (flow.getProjectId() == projectId && flow.getFlowId().equals(flowId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Fetches one executable flow from the loader.
     *
     * @param i identifier passed straight to {@link ExecutorLoader#fetchExecutableFlow}
     * @throws ExecutorManagerException if the loader fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public ExecutableFlow getExecutableFlow(int i) throws ExecutorManagerException {
        return this.executorLoader.fetchExecutableFlow(i);
    }

    /**
     * Lists all unfinished flows.
     *
     * @return a mutable list; empty when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getRunningFlows() {
        final List<ExecutableFlow> flows = new ArrayList<>();
        try {
            for (final Pair<ExecutionReference, ExecutableFlow> unfinished : this.executorLoader.fetchUnfinishedFlows().values()) {
                flows.add(unfinished.getSecond());
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get running flows.", e);
        }
        return flows;
    }

    /**
     * Lists the execution ids of all unfinished flows in ascending order.
     *
     * @return a mutable list; empty when the loader fails (failure is logged)
     */
    public List<Integer> getRunningFlowIds() {
        try {
            return sortedExecutionIds(this.executorLoader.fetchUnfinishedFlows().values());
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get running flow ids.", e);
            return new ArrayList<>();
        }
    }

    /**
     * Lists the execution ids of all queued flows in ascending order.
     *
     * @return a mutable list; empty when the loader fails (failure is logged)
     */
    public List<Integer> getQueuedFlowIds() {
        try {
            return sortedExecutionIds(this.executorLoader.fetchQueuedFlows());
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get queued flow ids.", e);
            return new ArrayList<>();
        }
    }

    /** Extracts the execution id of every flow and returns them sorted ascending. */
    private List<Integer> sortedExecutionIds(Collection<Pair<ExecutionReference, ExecutableFlow>> flows) {
        final List<Integer> execIds = new ArrayList<>(flows.size());
        for (final Pair<ExecutionReference, ExecutableFlow> entry : flows) {
            execIds.add(entry.getSecond().getExecutionId());
        }
        Collections.sort(execIds);
        return execIds;
    }

    /**
     * Counts the queued flows reported by the loader.
     *
     * @return the count, or {@code 0} when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public long getQueuedFlowSize() {
        try {
            return this.executorLoader.fetchQueuedFlows().size();
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get queued flow size.", e);
            return 0L;
        }
    }

    /**
     * Fetches the flows the loader considers finished within {@link #RECENTLY_FINISHED_LIFETIME}.
     *
     * @return the flows, or an empty list when the loader fails (failure is logged)
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getRecentlyFinishedFlows() {
        try {
            return this.executorLoader.fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
        } catch (ExecutorManagerException e) {
            logger.error("Failed to fetch recently finished flows.", e);
            return new ArrayList<>();
        }
    }

    /**
     * Delegates to {@link ExecutorLoader#fetchFlowHistory} with both arguments unchanged.
     * NOTE(review): {@code i}/{@code i2} are presumably paging offset and page size -- confirm
     * against the loader.
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getExecutableFlows(int i, int i2) throws ExecutorManagerException {
        return this.executorLoader.fetchFlowHistory(i, i2);
    }

    /**
     * Searches the flow history with {@code str} wrapped in {@code %} wildcards as the second
     * filter and every other filter left at its "unset" value ({@code null}, {@code 0},
     * {@code -1}); {@code i} and {@code i2} are forwarded as the last two arguments.
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getExecutableFlows(String str, int i, int i2) throws ExecutorManagerException {
        return this.executorLoader.fetchFlowHistory(null, '%' + str + '%', null, 0, -1L, -1L, i, i2);
    }

    /** Delegates to {@link ExecutorLoader#fetchFlowHistory} with all arguments unchanged. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getExecutableFlows(String str, String str2, String str3, int i, long j, long j2, int i2, int i3) throws ExecutorManagerException {
        return this.executorLoader.fetchFlowHistory(str, str2, str3, i, j, j2, i2, i3);
    }

    /** Delegates to {@link ExecutorLoader#fetchJobHistory} using the id of {@code project}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableJobInfo> getExecutableJobs(Project project, String str, int i, int i2) throws ExecutorManagerException {
        return this.executorLoader.fetchJobHistory(project.getId(), str, i, i2);
    }

    /** Delegates to {@link ExecutorLoader#fetchNumExecutableNodes} using the id of {@code project}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public int getNumberOfJobExecutions(Project project, String str) throws ExecutorManagerException {
        return this.executorLoader.fetchNumExecutableNodes(project.getId(), str);
    }

    /**
     * Reads a slice of the flow-level log of an execution.
     *
     * <p>When the execution is still active the slice is requested from its executor; otherwise
     * it is read from the loader.
     *
     * @param executableFlow execution whose log is wanted
     * @param i              offset of the slice
     * @param i2             length of the slice
     * @throws ExecutorManagerException if the loader or the executor call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public FileIOUtils.LogData getExecutableFlowLog(ExecutableFlow executableFlow, int i, int i2) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> active = this.executorLoader.fetchActiveFlowByExecId(executableFlow.getExecutionId());
        if (active == null) {
            return this.executorLoader.fetchLogs(executableFlow.getExecutionId(), "", 0, i, i2);
        }
        final Map<String, Object> response = this.apiGateway.callWithReference(active.getFirst(), ConnectorParams.LOG_ACTION,
                new Pair<>("type", SpecialJobTypes.EMBEDDED_FLOW_TYPE),
                new Pair<>("offset", String.valueOf(i)),
                new Pair<>("length", String.valueOf(i2)));
        return FileIOUtils.LogData.createLogDataFromObject(response);
    }

    /**
     * Reads a slice of the log of one job attempt.
     *
     * <p>When the execution is still active the slice is requested from its executor; otherwise
     * it is read from the loader.
     *
     * @param executableFlow execution the job belongs to
     * @param str            job id
     * @param i              offset of the slice
     * @param i2             length of the slice
     * @param i3             attempt number
     * @throws ExecutorManagerException if the loader or the executor call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public FileIOUtils.LogData getExecutionJobLog(ExecutableFlow executableFlow, String str, int i, int i2, int i3) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> active = this.executorLoader.fetchActiveFlowByExecId(executableFlow.getExecutionId());
        if (active == null) {
            return this.executorLoader.fetchLogs(executableFlow.getExecutionId(), str, i3, i, i2);
        }
        final Map<String, Object> response = this.apiGateway.callWithReference(active.getFirst(), ConnectorParams.LOG_ACTION,
                new Pair<>("type", "job"),
                new Pair<>(ConnectorParams.UPDATE_MAP_JOBID, str),
                new Pair<>("offset", String.valueOf(i)),
                new Pair<>("length", String.valueOf(i2)),
                new Pair<>("attempt", String.valueOf(i3)));
        return FileIOUtils.LogData.createLogDataFromObject(response);
    }

    /**
     * Fetches the attachments of one job attempt, from the executor while the execution is
     * active and from the loader otherwise.
     *
     * @param executableFlow execution the job belongs to
     * @param str            job id
     * @param i              attempt number
     * @throws ExecutorManagerException if the loader or the executor call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<Object> getExecutionJobStats(ExecutableFlow executableFlow, String str, int i) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> active = this.executorLoader.fetchActiveFlowByExecId(executableFlow.getExecutionId());
        if (active == null) {
            return this.executorLoader.fetchAttachments(executableFlow.getExecutionId(), str, i);
        }
        final Map<String, Object> response = this.apiGateway.callWithReference(active.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
                new Pair<>(ConnectorParams.UPDATE_MAP_JOBID, str),
                new Pair<>("attempt", String.valueOf(i)));
        // The response is an untyped JSON map; the entry is trusted to hold a list.
        @SuppressWarnings("unchecked")
        final List<Object> attachments = (List<Object>) response.get(ConnectorParams.ATTACHMENTS_ACTION);
        return attachments;
    }

    /**
     * Maps every application id found in a job log to its external job link.
     *
     * <p>Returns an empty map unless all three external-URL properties (resource manager,
     * history server, Spark history server) are configured. Application ids for which no link
     * can be created are left out.
     *
     * @param executableFlow execution the job belongs to
     * @param str            job id
     * @param i              attempt number
     * @return insertion-ordered map from application id to link URL
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Map<String, String> getExternalJobLogUrls(ExecutableFlow executableFlow, String str, int i) {
        final Map<String, String> urlsByApplicationId = new LinkedHashMap<>();
        final boolean externalLinksConfigured =
                this.azkProps.containsKey("azkaban.server.external.resource_manager_job_url")
                        && this.azkProps.containsKey("azkaban.server.external.history_server_job_url")
                        && this.azkProps.containsKey("azkaban.server.external.spark_history_server_job_url");
        if (!externalLinksConfigured) {
            return urlsByApplicationId;
        }
        for (final String applicationId : getApplicationIds(executableFlow, str, i)) {
            final String url = ExecutionControllerUtils.createJobLinkUrl(executableFlow, str, applicationId, this.azkProps);
            if (url != null) {
                urlsByApplicationId.put(applicationId, url);
            }
        }
        return urlsByApplicationId;
    }

    /**
     * Scans the log of one job attempt chunk by chunk and collects the application ids in it.
     *
     * <p>Each chunk is cut back to its last whitespace character (newline, space or tab) so that
     * a token split across two chunks is re-read whole with the next chunk. A failure while
     * reading the log is logged and the ids collected so far are returned.
     *
     * @param executableFlow execution the job belongs to
     * @param str            job id
     * @param i              attempt number
     * @return insertion-ordered set of application ids
     */
    Set<String> getApplicationIds(ExecutableFlow executableFlow, String str, int i) {
        final Set<String> applicationIds = new LinkedHashSet<>();
        int offset = 0;
        try {
            FileIOUtils.LogData chunk = getExecutionJobLog(executableFlow, str, offset, APPLICATION_ID_LOG_CHUNK_LENGTH, i);
            while (chunk != null && chunk.getLength() > 0) {
                logger.info("Get application ID for execution {}, job {}, attempt {}, data offset {}",
                        executableFlow.getExecutionId(), str, i, offset);
                String text = chunk.getData();
                final int lastDelimiter = Math.max(text.lastIndexOf('\n'),
                        Math.max(text.lastIndexOf(' '), text.lastIndexOf('\t')));
                if (lastDelimiter > -1) {
                    // Keep the delimiter itself (+1) so the offset always advances, even when
                    // the only delimiter is the first character of the chunk.
                    text = text.substring(0, lastDelimiter + 1);
                }
                applicationIds.addAll(ExecutionControllerUtils.findApplicationIdsFromLog(text));
                offset = chunk.getOffset() + text.length();
                chunk = getExecutionJobLog(executableFlow, str, offset, APPLICATION_ID_LOG_CHUNK_LENGTH, i);
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get application ID for execution {}, job {}, attempt {}, data offset {}",
                    executableFlow.getExecutionId(), str, i, offset, e);
        }
        return applicationIds;
    }

    /**
     * Cancels an unfinished execution.
     *
     * <p>If the execution already has an executor, the cancel request is sent to it; otherwise
     * the flow is finalized locally with the reason "Cancelled before dispatching to executor".
     *
     * @param executableFlow execution to cancel
     * @param str            user on whose behalf the action is performed
     * @throws ExecutorManagerException if the execution is not among the unfinished flows, or
     *                                  the loader/executor call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void cancelFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            final Pair<ExecutionReference, ExecutableFlow> unfinished =
                    this.executorLoader.fetchUnfinishedFlows().get(executableFlow.getExecutionId());
            if (unfinished == null) {
                throw notRunning(executableFlow);
            }
            final ExecutionReference reference = unfinished.getFirst();
            if (reference.getExecutor().isPresent()) {
                this.apiGateway.callWithReferenceByUser(reference, ConnectorParams.CANCEL_ACTION, str);
            } else {
                ExecutionControllerUtils.finalizeFlow(this.executorLoader, this.alerterHolder, executableFlow, "Cancelled before dispatching to executor", null);
            }
        }
    }

    /**
     * Sends a resume request to the executor of an active execution.
     *
     * @param executableFlow execution to resume
     * @param str            user on whose behalf the action is performed
     * @throws ExecutorManagerException if the execution is not active or the call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void resumeFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            this.apiGateway.callWithReferenceByUser(requireActiveReference(executableFlow), ConnectorParams.RESUME_ACTION, str);
        }
    }

    /**
     * Sends a pause request to the executor of an active execution.
     *
     * @param executableFlow execution to pause
     * @param str            user on whose behalf the action is performed
     * @throws ExecutorManagerException if the execution is not active or the call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void pauseFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            this.apiGateway.callWithReferenceByUser(requireActiveReference(executableFlow), ConnectorParams.PAUSE_ACTION, str);
        }
    }

    /**
     * Sends a "retry failures" modification to the executor of an active execution.
     *
     * @param executableFlow execution to modify
     * @param str            user on whose behalf the action is performed
     * @throws ExecutorManagerException if the execution is not active or the call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void retryFailures(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        modifyExecutingJobs(executableFlow, ConnectorParams.MODIFY_RETRY_FAILURES, str);
    }

    /**
     * Sends a modify-execution request, optionally restricted to a list of jobs.
     *
     * @param exFlow  execution to modify; must be active
     * @param command value sent as the modification type
     * @param userId  user on whose behalf the action is performed
     * @param jobIds  jobs to target; when {@code null} or empty no job list is sent. Every
     *                non-empty id must exist in {@code exFlow}
     * @return the executor's response
     * @throws ExecutorManagerException if the execution is not active, a job id is unknown, or
     *                                  the call fails
     */
    private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String... jobIds) throws ExecutorManagerException {
        synchronized (exFlow) {
            final ExecutionReference reference = requireActiveReference(exFlow);
            final Pair<String, String> modificationType = new Pair<>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command);
            if (jobIds == null || jobIds.length == 0) {
                return this.apiGateway.callWithReferenceByUser(reference, ConnectorParams.MODIFY_EXECUTION_ACTION, userId, modificationType);
            }
            for (final String jobId : jobIds) {
                if (!jobId.isEmpty() && exFlow.getExecutableNode(jobId) == null) {
                    throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
                }
            }
            final Pair<String, String> jobList = new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, StringUtils.join(jobIds, ','));
            return this.apiGateway.callWithReferenceByUser(reference, ConnectorParams.MODIFY_EXECUTION_ACTION, userId, modificationType, jobList);
        }
    }

    /**
     * Looks up the execution reference of an active execution.
     *
     * @throws ExecutorManagerException if the loader has no active flow for the execution id
     */
    private ExecutionReference requireActiveReference(ExecutableFlow exFlow) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> active = this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
        if (active == null) {
            throw notRunning(exFlow);
        }
        return active.getFirst();
    }

    /** Builds the exception reported when an action targets an execution that is not running. */
    private static ExecutorManagerException notRunning(ExecutableFlow exFlow) {
        return new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
    }

    /** Delegates to {@link ExecutorLoader#doRampActions}. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Map<String, String> doRampActions(List<Map<String, Object>> list) throws ExecutorManagerException {
        return this.executorLoader.doRampActions(list);
    }

    /**
     * Queues a flow for execution by uploading it through the loader.
     *
     * <p>A locked flow is not submitted; a message saying so is returned instead. Otherwise the
     * flow is stamped with submit user, {@code PREPARING} status and submit time, disabled jobs
     * are applied, the concurrent-run policy is evaluated against the executions of the same
     * flow that are still unfinished, and the flow is uploaded.
     *
     * @param executableFlow flow to submit
     * @param str            submitting user
     * @return a human-readable status message
     * @throws ExecutorManagerException with reason {@code SkippedExecution} when the concurrent
     *                                  run limit is exceeded or the "skip" option applies; also
     *                                  when the upload fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public String submitExecutableFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        if (executableFlow.isLocked()) {
            final String lockedMessage = String.format("Flow %s for project %s is locked.", executableFlow.getId(), executableFlow.getProjectName());
            logger.info(lockedMessage);
            return lockedMessage;
        }
        // Interned so that equal keys resolve to the same monitor object, serializing
        // submissions that share a project name and flow id.
        // NOTE(review): interned strings are JVM-global monitors; any other code locking on an
        // equal string would contend here.
        final String submitLock = (executableFlow.getProjectName() + "." + executableFlow.getId() + ".submitFlow").intern();
        synchronized (submitLock) {
            final String flowId = executableFlow.getFlowId();
            logger.info("Submitting execution flow {} by {}", flowId, str);
            final int projectId = executableFlow.getProjectId();
            executableFlow.setSubmitUser(str);
            executableFlow.setStatus(Status.PREPARING);
            executableFlow.setSubmitTime(System.currentTimeMillis());
            final List<Integer> running = getRunningFlows(projectId, flowId);

            ExecutionOptions options = executableFlow.getExecutionOptions();
            if (options == null) {
                // NOTE(review): this fresh instance is never set back on the flow, so the
                // settings applied to it below only live for the duration of this call.
                options = new ExecutionOptions();
            }
            if (options.getDisabledJobs() != null) {
                FlowUtils.applyDisabledJobs(options.getDisabledJobs(), executableFlow);
            }

            final String concurrencyNote = running.isEmpty() ? "" : applyConcurrentRunPolicy(executableFlow, running, options);

            options.setMemoryCheck(!ProjectWhitelist.isProjectWhitelisted(executableFlow.getProjectId(), ProjectWhitelist.WhitelistType.MemoryCheck));
            this.executorLoader.uploadExecutableFlow(executableFlow);
            this.commonMetrics.markSubmitFlowSuccess();
            return concurrencyNote + "Execution queued successfully with exec id " + executableFlow.getExecutionId();
        }
    }

    /**
     * Applies the concurrent-run policy for a flow that already has unfinished executions.
     *
     * <p>Rejects the submission when the number of unfinished executions exceeds the configured
     * maximum or when the flow's concurrent option is "skip"; for the "pipeline" option the
     * highest running execution id is recorded on {@code options}.
     *
     * @param exFlow  flow being submitted
     * @param running non-empty, mutable list of execution ids of unfinished runs; may be sorted
     *                in place
     * @param options execution options in effect for the submission
     * @return note to prepend to the submission message
     * @throws ExecutorManagerException with reason {@code SkippedExecution} when rejected
     */
    private String applyConcurrentRunPolicy(ExecutableFlow exFlow, List<Integer> running, ExecutionOptions options) throws ExecutorManagerException {
        final String flowId = exFlow.getFlowId();
        final int maxConcurrentRuns = ExecutorUtils.getMaxConcurrentRunsForFlow(exFlow.getProjectName(), flowId, this.maxConcurrentRunsOneFlow, this.maxConcurrentRunsPerFlowMap);
        if (running.size() > maxConcurrentRuns) {
            this.commonMetrics.markSubmitFlowSkip();
            throw new ExecutorManagerException("Flow " + flowId + " has more than " + maxConcurrentRuns + " concurrent runs. Skipping", ExecutorManagerException.Reason.SkippedExecution);
        }
        if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
            Collections.sort(running);
            final Integer latestExecId = running.get(running.size() - 1);
            options.setPipelineExecutionId(latestExecId);
            return "Flow " + flowId + " is already running with exec id " + latestExecId + ". Pipelining level " + options.getPipelineLevel() + ". \n";
        }
        if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
            this.commonMetrics.markSubmitFlowSkip();
            throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
        }
        return "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") + ". Will execute concurrently. \n";
    }

    /**
     * Calls the {@code /stats} endpoint of one executor.
     *
     * @param i        id of the executor to call
     * @param str      action, appended to the request parameters after {@code pairArr}
     * @param pairArr  additional request parameters; may be {@code null}
     * @return the executor's JSON response as a map
     * @throws IOException              if the HTTP call fails
     * @throws ExecutorManagerException if the executor cannot be fetched
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Map<String, Object> callExecutorStats(int i, String str, Pair<String, String>... pairArr) throws IOException, ExecutorManagerException {
        final Executor executor = fetchExecutor(i);
        final List<Pair<String, String>> parameters = new ArrayList<>();
        if (pairArr != null) {
            parameters.addAll(Arrays.asList(pairArr));
        }
        parameters.add(new Pair<>(ConnectorParams.ACTION_PARAM, str));
        return this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(), "/stats", parameters);
    }

    /**
     * Calls the {@code /jmx} endpoint of an executor addressed by a "host&lt;sep&gt;port" string.
     *
     * @param str  executor address; split on the host/port delimiter, the second part must be
     *             an integer port
     * @param str2 name of the request parameter to send with an empty value
     * @param str3 MBean name sent as an extra parameter; skipped when {@code null}
     * @return the executor's JSON response as a map
     * @throws IOException if the HTTP call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public Map<String, Object> callExecutorJMX(String str, String str2, String str3) throws IOException {
        final List<Pair<String, String>> parameters = new ArrayList<>();
        parameters.add(new Pair<>(str2, ""));
        if (str3 != null) {
            parameters.add(new Pair<>(ConnectorParams.JMX_MBEAN, str3));
        }
        final String[] hostAndPort = str.split(HOST_PORT_DELIMITER);
        return this.apiGateway.callForJsonObjectMap(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "/jmx", parameters);
    }

    /** Starts the executor health checker. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void start() {
        this.executorHealthChecker.start();
    }

    /** Shuts the executor health checker down. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public void shutdown() {
        this.executorHealthChecker.shutdown();
    }

    /**
     * Appends a slice of a flow's history to {@code list} and returns the count reported by
     * {@link ExecutorLoader#fetchNumExecutableFlows} for the same project and flow.
     *
     * @param i    project id
     * @param str  flow id
     * @param i2   forwarded to {@code fetchFlowHistory} (presumably the paging offset -- confirm)
     * @param i3   forwarded to {@code fetchFlowHistory} (presumably the page size -- confirm)
     * @param list receives the fetched flows
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public int getExecutableFlows(int i, String str, int i2, int i3, List<ExecutableFlow> list) throws ExecutorManagerException {
        list.addAll(this.executorLoader.fetchFlowHistory(i, str, i2, i3));
        return this.executorLoader.fetchNumExecutableFlows(i, str);
    }

    /** Delegates to {@link ExecutorLoader#fetchFlowHistory} with all arguments unchanged. */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getExecutableFlows(int i, String str, int i2, int i3, Status status) throws ExecutorManagerException {
        return this.executorLoader.fetchFlowHistory(i, str, i2, i3, status);
    }
}
