package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.tasklogs.LogUtils;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/overlord/ForkingTaskRunner.class */
public class ForkingTaskRunner extends BaseRestorableTaskRunner<ForkingTaskRunnerWorkItem> implements TaskLogStreamer {
    // Logger for this runner; also used to emit alerts via makeAlert(...).
    private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
    // Runtime properties and task-context keys starting with this prefix are passed to the
    // child JVM as -D system properties with the prefix stripped.
    private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
    // Supplies the java command, classpath, java opts, allowed property prefixes and port settings.
    private final ForkingTaskRunnerConfig config;
    // Runtime properties scanned when building the child command line.
    private final Properties props;
    // Receives the task log file (and the report file, when present) once the child process ends.
    private final TaskLogPusher taskLogPusher;
    // Describes this node (host, service name, plaintext/TLS port settings).
    private final DruidNode node;
    // Executor on which the per-task callables run; sized from WorkerConfig.getCapacity().
    private final ListeningExecutorService exec;
    // Hands out ports for child processes and takes them back when a task finishes.
    private final PortFinder portFinder;
    // Supplies the list of property-name fragments to mask when logging the child command.
    private final StartupLoggingConfig startupLoggingConfig;
    // Set to true by stop(); while true, task cleanup skips saveRunningTasks() and task-directory deletion.
    private volatile boolean stopping;

    /**
     * Work item that pairs a {@link Task} with the future of its result and, once the child
     * process has been launched, with the {@link ProcessHolder} describing that process.
     *
     * <p>NOTE: the decompiler reported that the access modifier of this class was changed from
     * {@code protected}; it is kept {@code public} here exactly as in the decompiled source.
     */
    public static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem {
        private final Task task;
        // Flipped to true when shutdown(taskId, reason) is requested for this task.
        private volatile boolean shutdown = false;
        // Null until the child process has been started.
        private volatile ProcessHolder processHolder = null;

        private ForkingTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> listenableFuture) {
            super(task.getId(), listenableFuture);
            this.task = task;
        }

        /** Returns the task this work item was created for. */
        public Task getTask() {
            return this.task;
        }

        /**
         * Returns the location of the child process, or {@link TaskLocation#unknown()} while no
         * process holder has been attached yet.
         */
        @Override // org.apache.druid.indexing.overlord.TaskRunnerWorkItem
        public TaskLocation getLocation() {
            final ProcessHolder holder = this.processHolder;
            if (holder == null) {
                return TaskLocation.unknown();
            }
            return TaskLocation.create(holder.host, holder.port, holder.tlsPort);
        }

        /** Returns the type reported by the wrapped task. */
        @Override // org.apache.druid.indexing.overlord.TaskRunnerWorkItem
        public String getTaskType() {
            return this.task.getType();
        }

        /** Returns the data source reported by the wrapped task. */
        @Override // org.apache.druid.indexing.overlord.TaskRunnerWorkItem
        public String getDataSource() {
            return this.task.getDataSource();
        }
    }

    /**
     * Immutable bundle of a launched child {@link Process}, the file its output is logged to,
     * and the host/port/TLS-port triple it was started with.
     *
     * <p>NOTE: the decompiler reported that the access modifier of this class was changed from
     * package-private; it is kept {@code public} here exactly as in the decompiled source.
     */
    @VisibleForTesting
    public static class ProcessHolder {
        private final Process process;
        private final File logFile;
        private final String host;
        private final int port;
        private final int tlsPort;

        private ProcessHolder(Process childProcess, File childLogFile, String childHost, int childPort, int childTlsPort) {
            this.process = childProcess;
            this.logFile = childLogFile;
            this.host = childHost;
            this.port = childPort;
            this.tlsPort = childTlsPort;
        }

        /**
         * Registers the process's input stream and then its output stream with {@code closer},
         * so that closing the closer closes both.
         */
        @VisibleForTesting
        void registerWithCloser(Closer closer) {
            final Process child = this.process;
            closer.register(child.getInputStream());
            closer.register(child.getOutputStream());
        }

        /** Destroys the child process. */
        @VisibleForTesting
        void shutdown() {
            final Process child = this.process;
            child.destroy();
        }
    }

    @Inject
    public ForkingTaskRunner(ForkingTaskRunnerConfig forkingTaskRunnerConfig, TaskConfig taskConfig, WorkerConfig workerConfig, Properties properties, TaskLogPusher taskLogPusher, ObjectMapper objectMapper, @Self DruidNode druidNode, StartupLoggingConfig startupLoggingConfig) {
        super(objectMapper, taskConfig);
        this.stopping = false;
        this.config = forkingTaskRunnerConfig;
        this.props = properties;
        this.taskLogPusher = taskLogPusher;
        this.node = druidNode;
        this.portFinder = new PortFinder(forkingTaskRunnerConfig.getStartPort(), forkingTaskRunnerConfig.getEndPort(), forkingTaskRunnerConfig.getPorts());
        this.startupLoggingConfig = startupLoggingConfig;
        this.exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d"));
    }

    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public ListenableFuture<TaskStatus> run(Task task) {
        ListenableFuture<TaskStatus> result;
        synchronized (this.tasks) {
            this.tasks.computeIfAbsent(task.getId(), str -> {
                return new ForkingTaskRunnerWorkItem(task, this.exec.submit((Callable) new Callable<TaskStatus>() { // from class: org.apache.druid.indexing.overlord.ForkingTaskRunner.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
                    java.lang.NullPointerException
                     */
                    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
                     */
                    /* JADX WARN: Not initialized variable reg: 18, insn: 0x07fd: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:157:0x07fd */
                    /* JADX WARN: Type inference failed for: r18v0, types: [org.apache.druid.java.util.common.io.Closer] */
                    @Override // java.util.concurrent.Callable
                    public TaskStatus call() {
                        ?? r18;
                        ProcessHolder processHolder;
                        TaskStatus failure;
                        String uuid = UUID.randomUUID().toString();
                        File taskDir = ForkingTaskRunner.this.taskConfig.getTaskDir(task.getId());
                        File file = new File(taskDir, uuid);
                        String host = ForkingTaskRunner.this.node.getHost();
                        int findUnusedPort = ForkingTaskRunner.this.node.isEnablePlaintextPort() ? ForkingTaskRunner.this.portFinder.findUnusedPort() : -1;
                        int findUnusedPort2 = ForkingTaskRunner.this.node.isEnableTlsPort() ? ForkingTaskRunner.this.portFinder.findUnusedPort() : -1;
                        TaskLocation create = TaskLocation.create(host, findUnusedPort, findUnusedPort2);
                        try {
                            try {
                                try {
                                    Closer create2 = Closer.create();
                                    try {
                                        if (!file.mkdirs()) {
                                            throw new IOE("Could not create directories: %s", file);
                                        }
                                        File file2 = new File(taskDir, "task.json");
                                        File file3 = new File(file, "status.json");
                                        File file4 = new File(taskDir, "log");
                                        File file5 = new File(file, "report.json");
                                        synchronized (ForkingTaskRunner.this.tasks) {
                                            ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem = (ForkingTaskRunnerWorkItem) ForkingTaskRunner.this.tasks.get(task.getId());
                                            if (forkingTaskRunnerWorkItem == null) {
                                                ForkingTaskRunner.LOGGER.makeAlert("TaskInfo disappeared!", new Object[0]).addData(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, task.getId()).emit();
                                                throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
                                            }
                                            if (forkingTaskRunnerWorkItem.shutdown) {
                                                throw new IllegalStateException("Task has been shut down!");
                                            }
                                            if (forkingTaskRunnerWorkItem.processHolder != null) {
                                                ForkingTaskRunner.LOGGER.makeAlert("TaskInfo already has a processHolder", new Object[0]).addData(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, task.getId()).emit();
                                                throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
                                            }
                                            ArrayList arrayList = new ArrayList();
                                            String classpath = (task.getClasspathPrefix() == null || task.getClasspathPrefix().isEmpty()) ? ForkingTaskRunner.this.config.getClasspath() : Joiner.on(File.pathSeparator).join(task.getClasspathPrefix(), ForkingTaskRunner.this.config.getClasspath(), new Object[0]);
                                            arrayList.add(ForkingTaskRunner.this.config.getJavaCommand());
                                            arrayList.add("-cp");
                                            arrayList.add(classpath);
                                            Iterables.addAll(arrayList, new QuotableWhiteSpaceSplitter(ForkingTaskRunner.this.config.getJavaOpts()));
                                            Iterables.addAll(arrayList, ForkingTaskRunner.this.config.getJavaOptsArray());
                                            Object contextValue = task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY);
                                            if (contextValue != null) {
                                                Iterables.addAll(arrayList, new QuotableWhiteSpaceSplitter((String) contextValue));
                                            }
                                            for (String str : ForkingTaskRunner.this.props.stringPropertyNames()) {
                                                Iterator<String> it2 = ForkingTaskRunner.this.config.getAllowedPrefixes().iterator();
                                                while (it2.hasNext()) {
                                                    if (str.startsWith(it2.next()) && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(str) && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(str)) {
                                                        arrayList.add(StringUtils.format("-D%s=%s", str, ForkingTaskRunner.this.props.getProperty(str)));
                                                    }
                                                }
                                            }
                                            for (String str2 : ForkingTaskRunner.this.props.stringPropertyNames()) {
                                                if (str2.startsWith(ForkingTaskRunner.CHILD_PROPERTY_PREFIX)) {
                                                    arrayList.add(StringUtils.format("-D%s=%s", str2.substring(ForkingTaskRunner.CHILD_PROPERTY_PREFIX.length()), ForkingTaskRunner.this.props.getProperty(str2)));
                                                }
                                            }
                                            Map<String, Object> context = task.getContext();
                                            if (context != null) {
                                                for (String str3 : context.keySet()) {
                                                    if (str3.startsWith(ForkingTaskRunner.CHILD_PROPERTY_PREFIX)) {
                                                        arrayList.add(StringUtils.format("-D%s=%s", str3.substring(ForkingTaskRunner.CHILD_PROPERTY_PREFIX.length()), task.getContextValue(str3)));
                                                    }
                                                }
                                            }
                                            arrayList.add(StringUtils.format("-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, "dataSource", task.getDataSource()));
                                            arrayList.add(StringUtils.format("-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_ID, task.getId()));
                                            arrayList.add(StringUtils.format("-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_TYPE, task.getType()));
                                            arrayList.add(StringUtils.format("-Ddruid.host=%s", host));
                                            arrayList.add(StringUtils.format("-Ddruid.plaintextPort=%d", Integer.valueOf(findUnusedPort)));
                                            arrayList.add(StringUtils.format("-Ddruid.tlsPort=%d", Integer.valueOf(findUnusedPort2)));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.service=%s", ForkingTaskRunner.this.node.getServiceName()));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.host=%s", ForkingTaskRunner.this.node.getHost()));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.plaintextPort=%d", Integer.valueOf(ForkingTaskRunner.this.node.getPlaintextPort())));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.enablePlaintextPort=%s", Boolean.valueOf(ForkingTaskRunner.this.node.isEnablePlaintextPort())));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d", Integer.valueOf(ForkingTaskRunner.this.node.getTlsPort())));
                                            arrayList.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", Boolean.valueOf(ForkingTaskRunner.this.node.isEnableTlsPort())));
                                            arrayList.add("org.apache.druid.cli.Main");
                                            arrayList.add("internal");
                                            arrayList.add("peon");
                                            arrayList.add(file2.toString());
                                            arrayList.add(file3.toString());
                                            arrayList.add(file5.toString());
                                            String nodeType = task.getNodeType();
                                            if (nodeType != null) {
                                                arrayList.add("--nodeType");
                                                arrayList.add(nodeType);
                                            }
                                            if (task.supportsQueries()) {
                                                arrayList.add("--loadBroadcastSegments");
                                                arrayList.add("true");
                                            }
                                            if (!file2.exists()) {
                                                ForkingTaskRunner.this.jsonMapper.writeValue(file2, task);
                                            }
                                            ForkingTaskRunner.LOGGER.info("Running command: %s", ForkingTaskRunner.this.getMaskedCommand(ForkingTaskRunner.this.startupLoggingConfig.getMaskProperties(), arrayList));
                                            forkingTaskRunnerWorkItem.processHolder = ForkingTaskRunner.this.runTaskProcess(arrayList, file4, create);
                                            processHolder = forkingTaskRunnerWorkItem.processHolder;
                                            processHolder.registerWithCloser(create2);
                                        }
                                        TaskRunnerUtils.notifyLocationChanged(ForkingTaskRunner.this.listeners, task.getId(), create);
                                        TaskRunnerUtils.notifyStatusChanged(ForkingTaskRunner.this.listeners, task.getId(), TaskStatus.running(task.getId()));
                                        ForkingTaskRunner.LOGGER.info("Logging task %s output to: %s", task.getId(), file4);
                                        int waitForTaskProcessToComplete = ForkingTaskRunner.this.waitForTaskProcessToComplete(task, processHolder, file4, file5);
                                        if (waitForTaskProcessToComplete == 0) {
                                            ForkingTaskRunner.LOGGER.info("Process exited successfully for task: %s", task.getId());
                                            failure = (TaskStatus) ForkingTaskRunner.this.jsonMapper.readValue(file3, TaskStatus.class);
                                        } else {
                                            ForkingTaskRunner.LOGGER.error("Process exited with code[%d] for task: %s", Integer.valueOf(waitForTaskProcessToComplete), task.getId());
                                            failure = TaskStatus.failure(task.getId(), StringUtils.format("Task execution process exited unsuccessfully with code[%s]. See middleManager logs for more details.", Integer.valueOf(waitForTaskProcessToComplete)));
                                        }
                                        TaskRunnerUtils.notifyStatusChanged(ForkingTaskRunner.this.listeners, task.getId(), failure);
                                        TaskStatus taskStatus = failure;
                                        create2.close();
                                        try {
                                            synchronized (ForkingTaskRunner.this.tasks) {
                                                ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem2 = (ForkingTaskRunnerWorkItem) ForkingTaskRunner.this.tasks.remove(task.getId());
                                                if (forkingTaskRunnerWorkItem2 != null && forkingTaskRunnerWorkItem2.processHolder != null) {
                                                    forkingTaskRunnerWorkItem2.processHolder.shutdown();
                                                }
                                                if (!ForkingTaskRunner.this.stopping) {
                                                    ForkingTaskRunner.this.saveRunningTasks();
                                                }
                                            }
                                            if (ForkingTaskRunner.this.node.isEnablePlaintextPort()) {
                                                ForkingTaskRunner.this.portFinder.markPortUnused(findUnusedPort);
                                            }
                                            if (ForkingTaskRunner.this.node.isEnableTlsPort()) {
                                                ForkingTaskRunner.this.portFinder.markPortUnused(findUnusedPort2);
                                            }
                                            try {
                                                if (!ForkingTaskRunner.this.stopping && taskDir.exists()) {
                                                    FileUtils.deleteDirectory(taskDir);
                                                    ForkingTaskRunner.LOGGER.info("Removing task directory: %s", taskDir);
                                                }
                                            } catch (Exception e) {
                                                ForkingTaskRunner.LOGGER.makeAlert(e, "Failed to delete task directory", new Object[0]).addData("taskDir", taskDir.toString()).addData(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, task.getId()).emit();
                                            }
                                        } catch (Exception e2) {
                                            ForkingTaskRunner.LOGGER.error(e2, "Suppressing exception caught while cleaning up task", new Object[0]);
                                        }
                                        return taskStatus;
                                    } catch (Throwable th) {
                                        throw create2.rethrow(th);
                                    }
                                } catch (Throwable th2) {
                                    ForkingTaskRunner.LOGGER.info(th2, "Exception caught during execution", new Object[0]);
                                    throw new RuntimeException(th2);
                                }
                            } catch (Throwable th3) {
                                r18.close();
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            try {
                            } catch (Exception e3) {
                                ForkingTaskRunner.LOGGER.error(e3, "Suppressing exception caught while cleaning up task", new Object[0]);
                            }
                            synchronized (ForkingTaskRunner.this.tasks) {
                                ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem3 = (ForkingTaskRunnerWorkItem) ForkingTaskRunner.this.tasks.remove(task.getId());
                                if (forkingTaskRunnerWorkItem3 != null && forkingTaskRunnerWorkItem3.processHolder != null) {
                                    forkingTaskRunnerWorkItem3.processHolder.shutdown();
                                }
                                if (!ForkingTaskRunner.this.stopping) {
                                    ForkingTaskRunner.this.saveRunningTasks();
                                }
                                if (ForkingTaskRunner.this.node.isEnablePlaintextPort()) {
                                    ForkingTaskRunner.this.portFinder.markPortUnused(findUnusedPort);
                                }
                                if (ForkingTaskRunner.this.node.isEnableTlsPort()) {
                                    ForkingTaskRunner.this.portFinder.markPortUnused(findUnusedPort2);
                                }
                                try {
                                    if (!ForkingTaskRunner.this.stopping && taskDir.exists()) {
                                        FileUtils.deleteDirectory(taskDir);
                                        ForkingTaskRunner.LOGGER.info("Removing task directory: %s", taskDir);
                                    }
                                } catch (Exception e4) {
                                    ForkingTaskRunner.LOGGER.makeAlert(e4, "Failed to delete task directory", new Object[0]).addData("taskDir", taskDir.toString()).addData(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, task.getId()).emit();
                                }
                                throw th4;
                            }
                        }
                    }
                }));
            });
            saveRunningTasks();
            result = ((ForkingTaskRunnerWorkItem) this.tasks.get(task.getId())).getResult();
        }
        return result;
    }

    /**
     * Starts a child process for the given command line, with its error stream merged into its
     * standard output, and wraps it in a {@link ProcessHolder}.
     *
     * @param list         the full command line (executable followed by its arguments)
     * @param file         the log file recorded in the returned holder
     * @param taskLocation supplies the host, port and TLS port recorded in the returned holder
     * @return holder for the started process
     * @throws IOException if the process cannot be started
     */
    @VisibleForTesting
    ProcessHolder runTaskProcess(List<String> list, File file, TaskLocation taskLocation) throws IOException {
        final ProcessBuilder builder = new ProcessBuilder(ImmutableList.copyOf(list));
        builder.redirectErrorStream(true);
        final Process child = builder.start();
        return new ProcessHolder(
            child,
            file,
            taskLocation.getHost(),
            taskLocation.getPort(),
            taskLocation.getTlsPort()
        );
    }

    /**
     * Copies the child process's output into the log file until the stream ends, then waits
     * for the process to exit.
     *
     * <p>While blocked, the current thread's name is suffixed with the task id; the previous
     * name is restored afterwards. Whether or not the wait succeeds, the log file is pushed
     * through the task log pusher exactly once, followed by the report file if it exists.
     * The log output stream is closed on every path.
     *
     * @param task          the task whose process is being awaited
     * @param processHolder holder of the child process
     * @param file          the log file; process output is appended to it
     * @param file2         the report file; pushed only if it exists
     * @return the exit code of the child process
     * @throws IOException          if copying the output or pushing the log/report fails
     * @throws InterruptedException if the thread is interrupted while waiting for the process
     */
    @VisibleForTesting
    int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File file, File file2) throws IOException, InterruptedException {
        final ByteSink logSink = Files.asByteSink(file, FileWriteMode.APPEND);

        // This call blocks for the lifetime of the task, so make the thread easy to identify.
        final String priorThreadName = Thread.currentThread().getName();
        Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));

        try (OutputStream toLogFile = logSink.openStream()) {
            ByteStreams.copy(processHolder.process.getInputStream(), toLogFile);
            return processHolder.process.waitFor();
        } finally {
            Thread.currentThread().setName(priorThreadName);
            this.taskLogPusher.pushTaskLog(task.getId(), file);
            if (file2.exists()) {
                this.taskLogPusher.pushTaskReports(task.getId(), file2);
            }
        }
    }

    /**
     * Stops the runner: marks it as stopping, shuts the executor down, asks every known task
     * process to shut down, and then waits for the executor to terminate for at most the
     * graceful shutdown timeout from the task config.
     *
     * <p>If the executor does not terminate in time, an alert listing the task ids still
     * registered is emitted. If the waiting thread is interrupted, the interrupt flag is restored.
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    @LifecycleStop
    public void stop() {
        this.stopping = true;
        this.exec.shutdown();

        synchronized (this.tasks) {
            for (ForkingTaskRunnerWorkItem workItem : this.tasks.values()) {
                shutdownTaskProcess(workItem);
            }
        }

        final DateTime shutdownStart = DateTimes.nowUtc();
        final Interval gracePeriod = new Interval(shutdownStart, this.taskConfig.getGracefulShutdownTimeout());
        final long timeoutMillis = gracePeriod.toDurationMillis();
        LOGGER.info("Waiting up to %,dms for shutdown.", Long.valueOf(timeoutMillis));

        if (timeoutMillis <= 0) {
            LOGGER.warn("Ran out of time, not waiting for executor to finish!", new Object[0]);
            return;
        }

        try {
            final boolean terminated = this.exec.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            final long elapsedMillis = System.currentTimeMillis() - shutdownStart.getMillis();
            if (terminated) {
                LOGGER.info("Finished stopping in %,dms.", Long.valueOf(elapsedMillis));
                return;
            }

            // Snapshot the ids under the lock; report them after releasing it.
            final ImmutableSet<String> stillRunning;
            synchronized (this.tasks) {
                stillRunning = ImmutableSet.copyOf(this.tasks.keySet());
            }
            LOGGER.makeAlert("Failed to stop forked tasks", new Object[0])
                  .addData("stillRunning", stillRunning)
                  .addData("elapsed", Long.valueOf(elapsedMillis))
                  .emit();
            final String stillRunningText = Joiner.on(VectorFormat.DEFAULT_SEPARATOR).join(stillRunning);
            LOGGER.warn("Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]", Long.valueOf(elapsedMillis), stillRunningText);
        } catch (InterruptedException e) {
            LOGGER.warn(e, "Interrupted while waiting for executor to finish.", new Object[0]);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests shutdown of the task with the given id. Unknown ids are logged and ignored;
     * otherwise the work item is flagged as shut down and its process, if any, is asked to stop.
     *
     * @param str  the task id
     * @param str2 the reason, used only for logging
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public void shutdown(String str, String str2) {
        LOGGER.info("Shutdown [%s] because: [%s]", str, str2);
        synchronized (this.tasks) {
            final ForkingTaskRunnerWorkItem workItem = this.tasks.get(str);
            if (workItem != null) {
                workItem.shutdown = true;
                shutdownTaskProcess(workItem);
                return;
            }
            LOGGER.info("Ignoring request to cancel unknown task: %s", str);
        }
    }

    /**
     * Returns the work items that already have a process holder attached, i.e. whose child
     * process has been launched.
     */
    @Override // org.apache.druid.indexing.overlord.BaseRestorableTaskRunner, org.apache.druid.indexing.overlord.TaskRunner
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        synchronized (this.tasks) {
            final List<TaskRunnerWorkItem> launched = new ArrayList<>();
            for (ForkingTaskRunnerWorkItem workItem : this.tasks.values()) {
                if (workItem.processHolder == null) {
                    continue;
                }
                launched.add(workItem);
            }
            return launched;
        }
    }

    /**
     * Returns the work items that do not have a process holder yet, i.e. whose child process
     * has not been launched.
     */
    @Override // org.apache.druid.indexing.overlord.BaseRestorableTaskRunner, org.apache.druid.indexing.overlord.TaskRunner
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        synchronized (this.tasks) {
            final List<TaskRunnerWorkItem> notLaunched = new ArrayList<>();
            for (ForkingTaskRunnerWorkItem workItem : this.tasks.values()) {
                if (workItem.processHolder != null) {
                    continue;
                }
                notLaunched.add(workItem);
            }
            return notLaunched;
        }
    }

    /**
     * Reports the runner-side state of a task.
     *
     * @param str the task id
     * @return {@code null} if the id is unknown; {@code PENDING} if no process has been attached
     *         yet; {@code RUNNING} if the process is alive; {@code NONE} otherwise
     */
    @Override // org.apache.druid.indexing.overlord.BaseRestorableTaskRunner, org.apache.druid.indexing.overlord.TaskRunner
    @Nullable
    public RunnerTaskState getRunnerTaskState(String str) {
        final ForkingTaskRunnerWorkItem workItem = this.tasks.get(str);
        if (workItem == null) {
            return null;
        }
        final ProcessHolder holder = workItem.processHolder;
        if (holder == null) {
            return RunnerTaskState.PENDING;
        }
        if (holder.process.isAlive()) {
            return RunnerTaskState.RUNNING;
        }
        return RunnerTaskState.NONE;
    }

    /** Always returns an absent value. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public Optional<ScalingStats> getScalingStats() {
        final Optional<ScalingStats> noStats = Optional.absent();
        return noStats;
    }

    /** Intentionally does nothing. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public void start() {
        // No start-up work is performed here.
    }

    /**
     * Provides access to the log file of a task whose process has been launched.
     *
     * @param str the task id
     * @param j   the offset handed to {@code LogUtils.streamFile} when the stream is opened
     * @return a byte source over the task's log file, or absent if the task is unknown or has
     *         no process holder yet
     */
    @Override // org.apache.druid.tasklogs.TaskLogStreamer
    public Optional<ByteSource> streamTaskLog(String str, final long j) {
        synchronized (this.tasks) {
            final ForkingTaskRunnerWorkItem workItem = this.tasks.get(str);
            final ProcessHolder holder = workItem == null ? null : workItem.processHolder;
            if (holder == null) {
                return Optional.absent();
            }
            // The file is only opened when a caller asks the byte source for a stream.
            return Optional.of(new ByteSource() { // from class: org.apache.druid.indexing.overlord.ForkingTaskRunner.2
                @Override // com.google.common.io.ByteSource
                public InputStream openStream() throws IOException {
                    return LogUtils.streamFile(holder.logFile, j);
                }
            });
        }
    }

    /**
     * Asks the child process of the given work item to stop by closing the stream returned by
     * {@code Process.getOutputStream()}; if closing fails, the process is destroyed instead.
     * Does nothing when the work item has no process holder.
     */
    private void shutdownTaskProcess(ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem) {
        final ProcessHolder holder = forkingTaskRunnerWorkItem.processHolder;
        if (holder == null) {
            return;
        }
        LOGGER.info("Closing output stream to task[%s].", forkingTaskRunnerWorkItem.getTask().getId());
        try {
            holder.process.getOutputStream().close();
        } catch (Exception e) {
            LOGGER.warn(e, "Failed to close stdout to task[%s]. Destroying task.", forkingTaskRunnerWorkItem.getTask().getId());
            holder.process.destroy();
        }
    }

    /**
     * Renders a command line as a single space-separated string, hiding sensitive values.
     *
     * <p>Each element of {@code list2} is split at its first {@code '='}. If it has a value part
     * and its key part contains any of the strings in {@code list}, the value is replaced by
     * {@code <masked>}; otherwise the element is rendered unchanged.
     *
     * @param list  key fragments whose values must be masked
     * @param list2 the command line elements
     * @return the joined, masked command line
     */
    String getMaskedCommand(List<String> list, List<String> list2) {
        final HashSet<String> maskFragments = Sets.newHashSet(list);
        final List<String> rendered = new ArrayList<>(list2.size());
        for (String argument : list2) {
            final String[] keyAndValue = argument.split("=", 2);
            String shown = argument;
            if (keyAndValue.length == 2) {
                for (String fragment : maskFragments) {
                    if (keyAndValue[0].contains(fragment)) {
                        shown = StringUtils.format("%s=%s", keyAndValue[0], "<masked>");
                        break;
                    }
                }
            }
            rendered.add(shown);
        }
        return Joiner.on(" ").join(rendered);
    }

    /**
     * Returns the number of task slots: the size of the configured port list when it is
     * non-empty, otherwise the size of the inclusive start-port..end-port range.
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getTotalTaskSlotCount() {
        final boolean hasExplicitPorts = this.config.getPorts() != null && !this.config.getPorts().isEmpty();
        if (hasExplicitPorts) {
            return this.config.getPorts().size();
        }
        return (this.config.getEndPort() - this.config.getStartPort()) + 1;
    }

    /** Returns total slots minus used slots, never less than zero. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getIdleTaskSlotCount() {
        final long total = getTotalTaskSlotCount();
        final long used = getUsedTaskSlotCount();
        final long idle = total - used;
        return Math.max(idle, 0L);
    }

    /** Returns the used-port count reported by the port finder. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getUsedTaskSlotCount() {
        final long usedPorts = this.portFinder.findUsedPortCount();
        return usedPorts;
    }

    /** Always returns zero. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getLazyTaskSlotCount() {
        final long none = 0L;
        return none;
    }

    /** Always returns zero. */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getBlacklistedTaskSlotCount() {
        final long none = 0L;
        return none;
    }
}
