package org.apache.gobblin.yarn;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.Sets;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/yarn/YarnAutoScalingManager.class */
public class YarnAutoScalingManager extends AbstractIdleService {
    private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
    private final String defaultHelixInstanceTags;
    private final int defaultContainerMemoryMbs;
    private final int defaultContainerCores;
    public static final int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
    private final Config config;
    private final HelixManager helixManager;
    private final ScheduledExecutorService autoScalingExecutor;
    private final YarnService yarnService;
    private final int partitionsPerContainer;
    private final double overProvisionFactor;
    private final SlidingWindowReservoir slidingFixedSizeWindow;
    private static final Logger log = LoggerFactory.getLogger(YarnAutoScalingManager.class);
    private static int maxIdleTimeInMinutesBeforeScalingDown = 10;
    private static final HashSet<TaskPartitionState> UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(new TaskPartitionState[]{TaskPartitionState.ERROR, TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED, TaskPartitionState.TIMED_OUT});
    private final String AUTO_SCALING_PREFIX = "gobblin.yarn.autoScaling.";
    private final String AUTO_SCALING_POLLING_INTERVAL_SECS = "gobblin.yarn.autoScaling.pollingIntervalSeconds";
    private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
    private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = "gobblin.yarn.autoScaling.partitionsPerContainer";
    private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
    private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = "gobblin.yarn.autoScaling.overProvisionFactor";
    private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0d;
    private final String AUTO_SCALING_INITIAL_DELAY = "gobblin.yarn.autoScaling.initialDelay";
    private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
    private final String AUTO_SCALING_WINDOW_SIZE = "gobblin.yarn.autoScaling.windowSize";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/gobblin/yarn/YarnAutoScalingManager$SlidingWindowReservoir.class */
    public static class SlidingWindowReservoir {
        private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
        private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
        private int maxSize;
        private static final int DEFAULT_MAX_SIZE = 10;
        private int upperBound;

        public SlidingWindowReservoir(int i, int i2) {
            Preconditions.checkArgument(i > 0, "maxSize has to be a value larger than 0");
            this.maxSize = i;
            this.upperBound = i2;
            this.fifoQueue = new ArrayDeque<>(i);
            this.priorityQueue = new PriorityQueue<>(i, new Comparator<YarnContainerRequestBundle>() { // from class: org.apache.gobblin.yarn.YarnAutoScalingManager.SlidingWindowReservoir.1
                @Override // java.util.Comparator
                public int compare(YarnContainerRequestBundle yarnContainerRequestBundle, YarnContainerRequestBundle yarnContainerRequestBundle2) {
                    return Integer.valueOf(yarnContainerRequestBundle2.getTotalContainers()).compareTo(Integer.valueOf(yarnContainerRequestBundle.getTotalContainers()));
                }
            });
        }

        public SlidingWindowReservoir(int i) {
            this(10, i);
        }

        public void add(YarnContainerRequestBundle yarnContainerRequestBundle) {
            if (yarnContainerRequestBundle.getTotalContainers() > this.upperBound) {
                YarnAutoScalingManager.log.error(String.format("Request of getting %s containers seems to be excessive, rejected", yarnContainerRequestBundle));
                return;
            }
            if (this.fifoQueue.size() == this.maxSize) {
                this.priorityQueue.remove(this.fifoQueue.remove());
            }
            if (this.fifoQueue.size() != this.priorityQueue.size()) {
                throw new IllegalStateException("Queue has its internal data structure being inconsistent.");
            }
            this.fifoQueue.add(yarnContainerRequestBundle);
            this.priorityQueue.add(yarnContainerRequestBundle);
        }

        public YarnContainerRequestBundle getMax() {
            if (this.priorityQueue.size() > 0) {
                return this.priorityQueue.peek();
            }
            throw new IllegalStateException("Queried before elements added into the queue.");
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/gobblin/yarn/YarnAutoScalingManager$YarnAutoScalingRunnable.class */
    static class YarnAutoScalingRunnable implements Runnable {
        private final TaskDriver taskDriver;
        private final YarnService yarnService;
        private final int partitionsPerContainer;
        private final double overProvisionFactor;
        private final SlidingWindowReservoir slidingWindowReservoir;
        private final HelixDataAccessor helixDataAccessor;
        private final String defaultHelixInstanceTags;
        private final int defaultContainerMemoryMbs;
        private final int defaultContainerCores;
        private static final Map<String, Long> instanceIdleSince = new HashMap();

        @Override // java.lang.Runnable
        public void run() {
            try {
                runInternal();
            } catch (Throwable th) {
                YarnAutoScalingManager.log.warn("Suppressing error from YarnAutoScalingRunnable.run()", th);
            }
        }

        private Set<String> getParticipants(String str) {
            return (Set) this.helixDataAccessor.getChildValuesMap(this.helixDataAccessor.keyBuilder().liveInstances()).keySet().stream().filter(str2 -> {
                return str.isEmpty() || str2.contains(str);
            }).collect(Collectors.toSet());
        }

        private String getInuseParticipantForHelixPartition(JobContext jobContext, int i) {
            if (jobContext.getPartitionNumAttempts(i) > YarnAutoScalingManager.THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
                YarnAutoScalingManager.log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better", jobContext.getTaskIdForPartition(i), Integer.valueOf(jobContext.getPartitionNumAttempts(i)));
            }
            if (!YarnAutoScalingManager.UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(i))) {
                return jobContext.getAssignedParticipant(i);
            }
            YarnAutoScalingManager.log.info("Helix task {} is in {} state which is unexpected, please watch out to see if this get recovered", jobContext.getTaskIdForPartition(i), jobContext.getPartitionState(i));
            return null;
        }

        @VisibleForTesting
        void runInternal() {
            HashSet hashSet = new HashSet();
            YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
            for (Map.Entry entry : this.taskDriver.getWorkflows().entrySet()) {
                WorkflowContext workflowContext = this.taskDriver.getWorkflowContext((String) entry.getKey());
                if (workflowContext != null && workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
                    YarnAutoScalingManager.log.debug("Workflow name {} config {} context {}", new Object[]{entry.getKey(), entry.getValue(), workflowContext});
                    for (String str : ((WorkflowConfig) entry.getValue()).getJobDag().getAllNodes()) {
                        JobContext jobContext = this.taskDriver.getJobContext(str);
                        JobConfig jobConfig = this.taskDriver.getJobConfig(str);
                        Resource newInstance = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
                        int i = 0;
                        String str2 = this.defaultHelixInstanceTags;
                        if (jobContext != null) {
                            YarnAutoScalingManager.log.debug("JobContext {} num partitions {}", jobContext, Integer.valueOf(jobContext.getPartitionSet().size()));
                            hashSet.addAll((Collection) jobContext.getPartitionSet().stream().map(num -> {
                                return getInuseParticipantForHelixPartition(jobContext, num.intValue());
                            }).filter((v0) -> {
                                return Objects.nonNull(v0);
                            }).collect(Collectors.toSet()));
                            i = jobContext.getPartitionSet().size();
                            if (jobConfig != null) {
                                if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
                                    str2 = jobConfig.getInstanceGroupTag();
                                }
                                Map jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
                                if (jobCommandConfigMap.containsKey("gobblin.cluster.job.container.memory.mbs")) {
                                    newInstance.setMemory(Integer.parseInt((String) jobCommandConfigMap.get("gobblin.cluster.job.container.memory.mbs")));
                                }
                                if (jobCommandConfigMap.containsKey("gobblin.cluster.job.container.cores")) {
                                    newInstance.setVirtualCores(Integer.parseInt((String) jobCommandConfigMap.get("gobblin.cluster.job.container.cores")));
                                }
                            }
                        }
                        int ceil = (int) Math.ceil((i / this.partitionsPerContainer) * this.overProvisionFactor);
                        yarnContainerRequestBundle.add(str2, ceil, newInstance);
                        YarnAutoScalingManager.log.info("jobName={}, jobTag={}, numPartitions={}, targetNumContainers={}", new Object[]{str, str2, Integer.valueOf(i), Integer.valueOf(ceil)});
                    }
                }
            }
            for (String str3 : getParticipants(GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX)) {
                if (hashSet.contains(str3)) {
                    instanceIdleSince.remove(str3);
                } else {
                    instanceIdleSince.putIfAbsent(str3, Long.valueOf(System.currentTimeMillis()));
                    if (!isInstanceUnused(str3)) {
                        hashSet.add(str3);
                    }
                }
            }
            this.slidingWindowReservoir.add(yarnContainerRequestBundle);
            YarnAutoScalingManager.log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}", new Object[]{Integer.valueOf(yarnContainerRequestBundle.getTotalContainers()), yarnContainerRequestBundle.getHelixTagContainerCountMap(), yarnContainerRequestBundle.getHelixTagResourceMap()});
            this.yarnService.requestTargetNumberOfContainers(this.slidingWindowReservoir.getMax(), hashSet);
        }

        @VisibleForTesting
        boolean isInstanceUnused(String str) {
            return System.currentTimeMillis() - instanceIdleSince.get(str).longValue() > TimeUnit.MINUTES.toMillis((long) YarnAutoScalingManager.maxIdleTimeInMinutesBeforeScalingDown);
        }

        public YarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int i, double d, SlidingWindowReservoir slidingWindowReservoir, HelixDataAccessor helixDataAccessor, String str, int i2, int i3) {
            this.taskDriver = taskDriver;
            this.yarnService = yarnService;
            this.partitionsPerContainer = i;
            this.overProvisionFactor = d;
            this.slidingWindowReservoir = slidingWindowReservoir;
            this.helixDataAccessor = helixDataAccessor;
            this.defaultHelixInstanceTags = str;
            this.defaultContainerMemoryMbs = i2;
            this.defaultContainerCores = i3;
        }
    }

    public YarnAutoScalingManager(GobblinApplicationMaster gobblinApplicationMaster) {
        this.config = gobblinApplicationMaster.getConfig();
        this.helixManager = gobblinApplicationMaster.getMultiManager().getJobClusterHelixManager();
        this.yarnService = gobblinApplicationMaster.getYarnService();
        this.partitionsPerContainer = ConfigUtils.getInt(this.config, "gobblin.yarn.autoScaling.partitionsPerContainer", 1).intValue();
        Preconditions.checkArgument(this.partitionsPerContainer > 0, "gobblin.yarn.autoScaling.partitionsPerContainer needs to be greater than 0");
        this.overProvisionFactor = ConfigUtils.getDouble(this.config, "gobblin.yarn.autoScaling.overProvisionFactor", 1.0d);
        this.slidingFixedSizeWindow = this.config.hasPath("gobblin.yarn.autoScaling.windowSize") ? new SlidingWindowReservoir(this.config.getInt("gobblin.yarn.autoScaling.windowSize"), Integer.MAX_VALUE) : new SlidingWindowReservoir(Integer.MAX_VALUE);
        this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));
        this.defaultHelixInstanceTags = ConfigUtils.getString(this.config, "gobblin.cluster.helixInstanceTags", "GobblinHelixDefaultTag");
        this.defaultContainerMemoryMbs = this.config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
        this.defaultContainerCores = this.config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
    }

    protected void startUp() {
        int intValue = ConfigUtils.getInt(this.config, "gobblin.yarn.autoScaling.pollingIntervalSeconds", 60).intValue();
        int intValue2 = ConfigUtils.getInt(this.config, "gobblin.yarn.autoScaling.initialDelay", 60).intValue();
        log.info("Starting the " + YarnAutoScalingManager.class.getSimpleName());
        log.info("Scheduling the auto scaling task with an interval of {} seconds", Integer.valueOf(intValue));
        this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager), this.yarnService, this.partitionsPerContainer, this.overProvisionFactor, this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags, this.defaultContainerMemoryMbs, this.defaultContainerCores), intValue2, intValue, TimeUnit.SECONDS);
    }

    protected void shutDown() {
        log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName());
        ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, Optional.of(log));
    }
}
