package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.class */
public class LagBasedAutoScaler implements SupervisorTaskAutoScaler {
    private final String dataSource;
    private final CircularFifoQueue<Long> lagMetricsQueue;
    private final ScheduledExecutorService lagComputationExec;
    private final ScheduledExecutorService allocationExec;
    private final SupervisorSpec spec;
    private final SeekableStreamSupervisor supervisor;
    private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
    private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
    private static final ReentrantLock LOCK = new ReentrantLock(true);

    public LagBasedAutoScaler(SeekableStreamSupervisor seekableStreamSupervisor, String str, LagBasedAutoScalerConfig lagBasedAutoScalerConfig, SupervisorSpec supervisorSpec) {
        this.lagBasedAutoScalerConfig = lagBasedAutoScalerConfig;
        String format = StringUtils.format("Supervisor-%s", str);
        this.dataSource = str;
        this.lagMetricsQueue = new CircularFifoQueue<>(((int) (this.lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / this.lagBasedAutoScalerConfig.getLagCollectionIntervalMillis())) + 1);
        this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(format) + "-Allocation-%d");
        this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(format) + "-Computation-%d");
        this.spec = supervisorSpec;
        this.supervisor = seekableStreamSupervisor;
    }

    @Override // org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler
    public void start() {
        Callable<Integer> callable = () -> {
            LOCK.lock();
            int i = -1;
            try {
                try {
                    i = computeDesiredTaskCount(new ArrayList(this.lagMetricsQueue));
                    if (i != -1) {
                        this.lagMetricsQueue.clear();
                    }
                    LOCK.unlock();
                } catch (Exception e) {
                    log.warn(e, "Exception while computing desired task count for [%s]", this.dataSource);
                    LOCK.unlock();
                }
                return Integer.valueOf(i);
            } catch (Throwable th) {
                LOCK.unlock();
                throw th;
            }
        };
        this.lagComputationExec.scheduleAtFixedRate(computeAndCollectLag(), this.lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), this.lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), TimeUnit.MILLISECONDS);
        this.allocationExec.scheduleAtFixedRate(this.supervisor.buildDynamicAllocationTask(callable), this.lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + this.lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), this.lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS);
        log.info("LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]", Long.valueOf(this.lagBasedAutoScalerConfig.getLagCollectionIntervalMillis()), Integer.valueOf(this.lagMetricsQueue.size()), Long.valueOf(this.lagBasedAutoScalerConfig.getLagCollectionRangeMillis()), this.dataSource);
    }

    @Override // org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler
    public void stop() {
        this.allocationExec.shutdownNow();
        this.lagComputationExec.shutdownNow();
    }

    @Override // org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler
    public void reset() {
        if (this.lagMetricsQueue != null) {
            try {
                try {
                    LOCK.lock();
                    this.lagMetricsQueue.clear();
                    LOCK.unlock();
                } catch (Exception e) {
                    log.warn(e, "Error,when clear queue in rest action", new Object[0]);
                    LOCK.unlock();
                }
            } catch (Throwable th) {
                LOCK.unlock();
                throw th;
            }
        }
    }

    private Runnable computeAndCollectLag() {
        return () -> {
            LOCK.lock();
            try {
                try {
                    if (this.spec.isSuspended()) {
                        log.warn("[%s] supervisor is suspended, skipping lag collection", this.dataSource);
                    } else {
                        LagStats computeLagStats = this.supervisor.computeLagStats();
                        if (computeLagStats == null) {
                            this.lagMetricsQueue.offer(0L);
                        } else {
                            long totalLag = computeLagStats.getTotalLag();
                            this.lagMetricsQueue.offer(Long.valueOf(totalLag > 0 ? totalLag : 0L));
                        }
                        log.debug("Current lags [%s] for dataSource [%s].", new ArrayList(this.lagMetricsQueue), this.dataSource);
                    }
                    LOCK.unlock();
                } catch (Exception e) {
                    log.error(e, "Error while collecting lags", new Object[0]);
                    LOCK.unlock();
                }
            } catch (Throwable th) {
                LOCK.unlock();
                throw th;
            }
        };
    }

    private int computeDesiredTaskCount(List<Long> list) {
        log.debug("Computing desired task count for [%s], based on following lags : [%s]", this.dataSource, list);
        int i = 0;
        int i2 = 0;
        int size = list.size();
        for (Long l : list) {
            if (l.longValue() >= this.lagBasedAutoScalerConfig.getScaleOutThreshold()) {
                i++;
            }
            if (l.longValue() <= this.lagBasedAutoScalerConfig.getScaleInThreshold()) {
                i2++;
            }
        }
        double d = (i * 1.0d) / size;
        double d2 = (i2 * 1.0d) / size;
        log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", Double.valueOf(d), Double.valueOf(d2), this.dataSource);
        int activeTaskGroupsCount = this.supervisor.getActiveTaskGroupsCount();
        if (d < this.lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
            if (d2 < this.lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
                return -1;
            }
            int scaleInStep = activeTaskGroupsCount - this.lagBasedAutoScalerConfig.getScaleInStep();
            if (activeTaskGroupsCount != this.lagBasedAutoScalerConfig.getTaskCountMin()) {
                return Math.max(scaleInStep, this.lagBasedAutoScalerConfig.getTaskCountMin());
            }
            log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", this.dataSource);
            return -1;
        }
        int scaleOutStep = activeTaskGroupsCount + this.lagBasedAutoScalerConfig.getScaleOutStep();
        int partitionCount = this.supervisor.getPartitionCount();
        if (partitionCount <= 0) {
            log.warn("Partition number for [%s] <= 0 ? how can it be?", this.dataSource);
            return -1;
        }
        int min = Math.min(this.lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
        if (activeTaskGroupsCount != min) {
            return Math.min(scaleOutStep, min);
        }
        log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", this.dataSource);
        return -1;
    }

    public LagBasedAutoScalerConfig getAutoScalerConfig() {
        return this.lagBasedAutoScalerConfig;
    }
}
