/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.parseq.internal;

import com.linkedin.parseq.internal.Clock;
import com.linkedin.parseq.internal.ThreadDumper;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class ExecutionMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionMonitor.class);
    private static final ThreadDumper THREAD_DUMPER = new ThreadDumper();
    private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.###");
    private final ThreadLocal<ExecutionMonitorState> LOCAL_MONITOR = new ThreadLocal<ExecutionMonitorState>(){

        @Override
        protected ExecutionMonitorState initialValue() {
            ExecutionMonitorState state = new ExecutionMonitorState(Thread.currentThread().getId());
            return state;
        }
    };
    private final ConcurrentLinkedQueue<ExecutionMonitorState> _addedMonitors = new ConcurrentLinkedQueue();
    private final int _maxMonitors;
    private final long _durationThresholdNano;
    private final long _checkIntervalNano;
    private final long _idleDurationNano;
    private final long _loggingIntervalNano;
    private final long _minStallNano;
    private final int _stallsHistorySize;
    private final Consumer<String> _logger;
    private final Clock _clock;
    private volatile boolean _stopped = false;
    private final Set<ExecutionMonitorState> _monitors = new HashSet<ExecutionMonitorState>();
    private final TreeMap<Long, Long> _stalls = new TreeMap();
    private final Thread _monitoringThread;
    private long _lastMonitoringStep;
    private long _nextAllowedLogging;
    private long _shortestObservedDelta = Long.MAX_VALUE;

    public ExecutionMonitor(int maxMonitors, long durationThresholdNano, long checkIntervalNano, long idleDurationNano, long loggingIntervalNano, long minStallNano, int stallsHistorySize, Level level, Clock clock) {
        this._maxMonitors = maxMonitors;
        this._durationThresholdNano = durationThresholdNano;
        this._checkIntervalNano = checkIntervalNano;
        this._idleDurationNano = idleDurationNano;
        this._loggingIntervalNano = loggingIntervalNano;
        this._minStallNano = minStallNano;
        this._stallsHistorySize = stallsHistorySize;
        this._clock = clock;
        switch (level) {
            case INFO: {
                this._logger = LOG::info;
                break;
            }
            case DEBUG: {
                this._logger = LOG::debug;
                break;
            }
            case ERROR: {
                this._logger = LOG::error;
                break;
            }
            case TRACE: {
                this._logger = LOG::trace;
                break;
            }
            case WARN: {
                this._logger = LOG::warn;
                break;
            }
            default: {
                this._logger = LOG::warn;
            }
        }
        this._monitoringThread = new Thread(this::monitor);
        this._monitoringThread.setDaemon(true);
        this._monitoringThread.setName("ParSeqExecutionMonitor");
        this._monitoringThread.start();
    }

    ExecutionMonitorState getLocalMonitorState() {
        return this.LOCAL_MONITOR.get();
    }

    void shutdown() {
        this._stopped = true;
    }

    private void monitor() {
        this._nextAllowedLogging = this._lastMonitoringStep = this._clock.nanoTime();
        while (!this._stopped) {
            try {
                this._clock.sleepNano(this._checkIntervalNano);
            }
            catch (InterruptedException e) {
                break;
            }
            this.monitorStep();
        }
    }

    private void monitorStep() {
        long currentTime = this._clock.nanoTime();
        this.checkForStall(currentTime);
        this.drainAddedMonitorsQueue();
        ArrayList<ExecutionMonitorState> toRemove = new ArrayList<ExecutionMonitorState>();
        long oldestTimestamp = currentTime;
        boolean thereAreLongRunningThreads = false;
        for (ExecutionMonitorState m : this._monitors) {
            if (m._isActive) {
                if (m._lastUpdate - oldestTimestamp < 0L) {
                    oldestTimestamp = m._lastUpdate;
                }
                long stallTime = this.getStallsSince(m._lastUpdate);
                long activeTime = currentTime - m._lastUpdate - stallTime;
                if (activeTime <= this._durationThresholdNano) continue;
                thereAreLongRunningThreads = true;
                continue;
            }
            if (currentTime - m._lastUpdate <= this._idleDurationNano) continue;
            toRemove.add(m);
        }
        for (ExecutionMonitorState m : toRemove) {
            this._monitors.remove(m);
        }
        if (thereAreLongRunningThreads && currentTime - this._nextAllowedLogging >= 0L) {
            this._nextAllowedLogging = currentTime + this._loggingIntervalNano;
            this.logMonitoredThreads(this._monitors);
        }
        this.trimStalls(oldestTimestamp);
        this._lastMonitoringStep = this._clock.nanoTime();
    }

    private void checkForStall(long currentTime) {
        long stall;
        long delta = currentTime - this._lastMonitoringStep;
        if (delta < this._shortestObservedDelta) {
            this._shortestObservedDelta = delta;
        }
        if ((stall = Math.max(0L, delta - this._shortestObservedDelta)) > this._minStallNano) {
            this._stalls.put(this._lastMonitoringStep, stall);
            if (this._stalls.size() > this._stallsHistorySize) {
                this._stalls.pollFirstEntry();
            }
        }
    }

    private void drainAddedMonitorsQueue() {
        ExecutionMonitorState monitor = null;
        do {
            if ((monitor = this._addedMonitors.poll()) == null) continue;
            if (this._monitors.size() < this._maxMonitors) {
                this._monitors.add(monitor);
                continue;
            }
            LOG.warn("Exceeded number of maximum monitored threads, thread with Id=" + monitor._threadId + " will not be monitored");
        } while (monitor != null);
    }

    private void logMonitoredThreads(Set<ExecutionMonitorState> monitoredThreads) {
        StringBuilder sb = new StringBuilder();
        sb.append("Found ParSeq threads running longer than ").append(DECIMAL_FORMAT.format((double)this._durationThresholdNano / 1000000.0)).append("ms.\n\nMonitored ParSeq threads before thread dump: \n");
        this.logMonitoredThreads(monitoredThreads, this._clock.nanoTime(), sb);
        sb.append("\nThread dump:\n\n");
        THREAD_DUMPER.threadDump(sb);
        sb.append("Monitored ParSeq threads after thread dump: \n");
        this.logMonitoredThreads(monitoredThreads, this._clock.nanoTime(), sb);
        this._logger.accept(sb.toString());
    }

    private void logMonitoredThreads(Set<ExecutionMonitorState> monitoredThreads, long currentTime, StringBuilder sb) {
        for (ExecutionMonitorState m : monitoredThreads) {
            long runTime = Math.max(0L, currentTime - m._lastUpdate);
            if (runTime > this._durationThresholdNano && m._isActive) {
                sb.append("(!) ");
            } else {
                sb.append("    ");
            }
            sb.append("Thread Id=").append(m._threadId).append(m._isActive ? " busy for " : " idle for ").append(DECIMAL_FORMAT.format((double)runTime / 1000000.0)).append("ms\n");
        }
    }

    private void trimStalls(long oldestTimestamp) {
        while (!this._stalls.isEmpty() && this._stalls.firstKey() < oldestTimestamp) {
            this._stalls.remove(this._stalls.firstKey());
        }
    }

    long getStallsSince(long lastUpdate) {
        long stall = 0L;
        Map.Entry<Long, Long> entry = this._stalls.ceilingEntry(lastUpdate);
        while (entry != null) {
            stall += entry.getValue().longValue();
            entry = this._stalls.higherEntry(entry.getKey());
        }
        return stall;
    }

    class ExecutionMonitorState {
        private final long _threadId;
        private volatile long _lastUpdate = 0L;
        private volatile boolean _isActive = false;
        private volatile boolean _isMonitored = false;

        public ExecutionMonitorState(long threadId) {
            this._threadId = threadId;
        }

        public void activate() {
            this._lastUpdate = ExecutionMonitor.this._clock.nanoTime();
            this._isActive = true;
            if (!this._isMonitored) {
                this._isMonitored = true;
                ExecutionMonitor.this._addedMonitors.add(this);
            }
        }

        public void deactivate() {
            this._lastUpdate = ExecutionMonitor.this._clock.nanoTime();
            this._isActive = false;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (int)(this._threadId ^ this._threadId >>> 32);
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            ExecutionMonitorState other = (ExecutionMonitorState)obj;
            return this._threadId == other._threadId;
        }
    }
}

