/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.parseq.exec;

import com.linkedin.parseq.Exceptions;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Exec {
    private static final Logger LOGGER = LoggerFactory.getLogger(Exec.class);
    private final ConcurrentMap<Process, ProcessEntry> _runningProcesses = new ConcurrentHashMap<Process, ProcessEntry>();
    private final ConcurrentMap<Long, Process> _runningProcessesByTaskId = new ConcurrentHashMap<Long, Process>();
    private final ScheduledExecutorService _reaperExecutor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong _seqGenerator = new AtomicLong(0L);
    private final ConcurrentSkipListSet<ProcessRequest> _processRequestQueue = new ConcurrentSkipListSet<ProcessRequest>(Comparator.comparingLong(request -> request.getSeq()));
    private final int _parallelizationLevel;
    private final long _reaperDelayMs;
    private final int _maxProcessQueueSize;
    private final AtomicInteger _processQueueSize = new AtomicInteger(0);
    private volatile boolean _shutdownInitiated = false;

    public Exec(int parallelizationLevel, long reaperDelayMs, int maxProcessQueueSize) {
        this._parallelizationLevel = parallelizationLevel;
        this._reaperDelayMs = reaperDelayMs;
        this._maxProcessQueueSize = maxProcessQueueSize;
    }

    public Task<Result> command(String desc, long timeout, TimeUnit timeUnit, String ... command) {
        Task task = Task.async(desc, ctx -> {
            int queueSize = this._processQueueSize.get();
            if (this._shutdownInitiated) {
                throw new IllegalStateException("can't start new process because Exec has been shut down");
            }
            if (queueSize >= this._maxProcessQueueSize) {
                throw new RuntimeException("queue for processes to run is full, size: " + queueSize);
            }
            SettablePromise<Result> result = Promises.settable();
            ProcessBuilder builder = new ProcessBuilder(command);
            Path stderr = Files.createTempFile("parseq-Exec", ".stderr", new FileAttribute[0]);
            Path stdout = Files.createTempFile("parseq-Exec", ".stdout", new FileAttribute[0]);
            builder.redirectError(stderr.toFile());
            builder.redirectOutput(stdout.toFile());
            ProcessRequest request = new ProcessRequest(this._seqGenerator.getAndIncrement(), builder, new ProcessEntry(result, stdout, stderr, ctx.getTaskId()), timeout, timeUnit, ctx.getTaskId());
            this._processRequestQueue.add(request);
            this._processQueueSize.incrementAndGet();
            return result;
        });
        task.addListener(p -> {
            Process process;
            if (p.isFailed() && Exceptions.isCancellation(p.getError()) && (process = (Process)this._runningProcessesByTaskId.get(task.getId())) != null) {
                process.destroyForcibly();
            }
        });
        return task;
    }

    public void start() {
        this._reaperExecutor.scheduleWithFixedDelay(() -> {
            try {
                for (Map.Entry en : this._runningProcesses.entrySet()) {
                    Process process = (Process)en.getKey();
                    ProcessEntry entry = (ProcessEntry)en.getValue();
                    if (process.isAlive()) continue;
                    this._runningProcesses.remove(process);
                    this._runningProcessesByTaskId.remove(entry.getTaskId());
                    Result result = new Result(process.exitValue(), entry.getStdout(), entry.getStderr());
                    entry.getResultPromise().done(result);
                }
                while (this._runningProcesses.size() < this._parallelizationLevel && !this._processRequestQueue.isEmpty()) {
                    ProcessRequest request = this._processRequestQueue.pollFirst();
                    if (request == null) continue;
                    this._processQueueSize.decrementAndGet();
                    Process process = request.getBuilder().start();
                    this._runningProcesses.put(process, request.getEntry());
                    this._runningProcessesByTaskId.put(request.getTaskId(), process);
                    this._reaperExecutor.schedule(() -> {
                        if (process.isAlive()) {
                            process.destroyForcibly();
                        }
                    }, request.getTimeout(), request.getTimeUnit());
                }
            }
            catch (Exception e) {
                LOGGER.error("error while checking process status", e);
            }
        }, this._reaperDelayMs, this._reaperDelayMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this._shutdownInitiated = true;
        this._reaperExecutor.shutdown();
    }

    private static class ProcessRequest {
        private final long _seq;
        private final ProcessBuilder _builder;
        private final ProcessEntry _entry;
        private final long _timeout;
        private final TimeUnit _timeUnit;
        private final Long _taskId;

        public ProcessRequest(long seq, ProcessBuilder builder, ProcessEntry entry, long timeout, TimeUnit timeUnit, Long taskId) {
            this._seq = seq;
            this._builder = builder;
            this._entry = entry;
            this._timeout = timeout;
            this._timeUnit = timeUnit;
            this._taskId = taskId;
        }

        public long getSeq() {
            return this._seq;
        }

        public ProcessBuilder getBuilder() {
            return this._builder;
        }

        public ProcessEntry getEntry() {
            return this._entry;
        }

        public long getTimeout() {
            return this._timeout;
        }

        public TimeUnit getTimeUnit() {
            return this._timeUnit;
        }

        public Long getTaskId() {
            return this._taskId;
        }
    }

    private static class ProcessEntry {
        private final SettablePromise<Result> _resultPromise;
        private final Path _stdout;
        private final Path _stderr;
        private final Long _taskId;

        public ProcessEntry(SettablePromise<Result> resultPromise, Path stdout, Path stderr, Long taskId) {
            this._resultPromise = resultPromise;
            this._stderr = stderr;
            this._stdout = stdout;
            this._taskId = taskId;
        }

        public SettablePromise<Result> getResultPromise() {
            return this._resultPromise;
        }

        public Path getStdout() {
            return this._stdout;
        }

        public Path getStderr() {
            return this._stderr;
        }

        public Long getTaskId() {
            return this._taskId;
        }
    }

    public static class Result {
        private final Path _stdout;
        private final Path _stderr;
        private final int status;

        public Result(int status, Path stdout, Path stderr) {
            this.status = status;
            this._stdout = stdout;
            this._stderr = stderr;
        }

        public Path getStdout() {
            return this._stdout;
        }

        public Path getStderr() {
            return this._stderr;
        }

        public int getStatus() {
            return this.status;
        }
    }
}

