package azkaban.executor;

import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:azkaban/executor/ExecutorHealthChecker.class */
public class ExecutorHealthChecker {
    private static final int DEFAULT_EXECUTOR_MAX_FAILURE_COUNT = 6;
    private final long healthCheckIntervalMin;
    private final int executorMaxFailureCount;
    private final List<String> alertEmails;
    private final ExecutorLoader executorLoader;
    private final ExecutorApiGateway apiGateway;
    private final AlerterHolder alerterHolder;
    private static final Logger logger = LoggerFactory.getLogger(ExecutorHealthChecker.class);
    private static final Duration DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL = Duration.ofMinutes(5);
    private final Map<Integer, Integer> executorFailureCount = new HashMap();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @Inject
    public ExecutorHealthChecker(Props props, ExecutorLoader executorLoader, ExecutorApiGateway executorApiGateway, AlerterHolder alerterHolder) {
        this.healthCheckIntervalMin = props.getLong("azkaban.executor.healthcheck.interval.min", DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL.toMinutes());
        this.executorMaxFailureCount = props.getInt("azkaban.executor.max.failurecount", DEFAULT_EXECUTOR_MAX_FAILURE_COUNT);
        this.alertEmails = props.getStringList("azkaban.admin.alert.email");
        this.executorLoader = executorLoader;
        this.apiGateway = executorApiGateway;
        this.alerterHolder = alerterHolder;
    }

    public void start() {
        logger.info("Starting executor health checker.");
        this.scheduler.scheduleAtFixedRate(this::checkExecutorHealthQuietly, 0L, this.healthCheckIntervalMin, TimeUnit.MINUTES);
    }

    public void shutdown() {
        logger.info("Shutting down executor health checker.");
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void checkExecutorHealthQuietly() {
        try {
            checkExecutorHealth();
        } catch (RuntimeException e) {
            logger.error("Unexepected error during executor healthcheck. Cause: " + ExceptionUtils.getStackTrace(e));
        }
    }

    @VisibleForTesting
    void checkExecutorHealth() {
        for (Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : getFlowToExecutorMap().entrySet()) {
            Optional<Executor> key = entry.getKey();
            if (key.isPresent()) {
                Executor executor = key.get();
                try {
                    Map<String, Object> callWithExecutionId = this.apiGateway.callWithExecutionId(executor.getHost(), executor.getPort(), ConnectorParams.PING_ACTION, null, null, new Pair[0]);
                    if (callWithExecutionId == null || callWithExecutionId.containsKey(ConnectorParams.RESPONSE_ERROR) || !callWithExecutionId.containsKey("status") || !callWithExecutionId.get("status").equals(ConnectorParams.RESPONSE_ALIVE)) {
                        throw new ExecutorManagerException("Status of executor " + executor.getId() + " is not alive.");
                        break;
                    } else if (this.executorFailureCount.containsKey(Integer.valueOf(executor.getId()))) {
                        this.executorFailureCount.put(Integer.valueOf(executor.getId()), 0);
                    }
                } catch (ExecutorManagerException e) {
                    handleExecutorNotAliveCase(executor, entry.getValue(), e);
                }
            } else {
                finalizeFlows(entry.getValue(), "Executor id of this execution doesn't exist.");
            }
        }
    }

    @VisibleForTesting
    void finalizeFlows(List<ExecutableFlow> list, String str) {
        for (ExecutableFlow executableFlow : list) {
            logger.warn(String.format("Finalizing execution %s, %s", Integer.valueOf(executableFlow.getExecutionId()), str));
            try {
                ExecutionControllerUtils.finalizeFlow(this.executorLoader, this.alerterHolder, executableFlow, str, null);
            } catch (RuntimeException e) {
                logger.error(String.format("Unchecked exception while finalizing execution: %d. Exception: %s", Integer.valueOf(executableFlow.getExecutionId()), ExceptionUtils.getStackTrace(e)));
            }
        }
    }

    private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
        HashMap hashMap = new HashMap();
        try {
            for (Pair<ExecutionReference, ExecutableFlow> pair : this.executorLoader.fetchActiveFlows().values()) {
                Optional<Executor> executor = pair.getFirst().getExecutor();
                List list = (List) hashMap.get(executor);
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(executor, list);
                }
                list.add(pair.getSecond());
            }
        } catch (ExecutorManagerException e) {
            logger.error("Failed to get flow to executor map. Exception reported: " + ExceptionUtils.getStackTrace(e));
        }
        return hashMap;
    }

    private void handleExecutorNotAliveCase(Executor executor, List<ExecutableFlow> list, ExecutorManagerException executorManagerException) {
        logger.error("Failed to get update from executor " + executor.getId(), executorManagerException);
        this.executorFailureCount.put(Integer.valueOf(executor.getId()), Integer.valueOf(this.executorFailureCount.getOrDefault(Integer.valueOf(executor.getId()), 0).intValue() + 1));
        if (this.executorFailureCount.get(Integer.valueOf(executor.getId())).intValue() % this.executorMaxFailureCount == 0) {
            if (!this.alertEmails.isEmpty()) {
                logger.info(String.format("Executor failure count is %d. Sending alert emails to %s.", this.executorFailureCount.get(Integer.valueOf(executor.getId())), this.alertEmails));
                this.alerterHolder.get("email").alertOnFailedExecutorHealthCheck(executor, list, executorManagerException, this.alertEmails);
            }
            cleanupForMissingExecutor(executor, list);
        }
    }

    private void cleanupForMissingExecutor(Executor executor, List<ExecutableFlow> list) {
        finalizeFlows(list, String.format("Executor was unreachable, executor-id: %s, executor-host: %s, executor-port: %d", Integer.valueOf(executor.getId()), executor.getHost(), Integer.valueOf(executor.getPort())));
    }
}
