package org.apache.druid.k8s.overlord;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.netty.util.SuppressForbidden;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/k8s/overlord/KubernetesTaskRunner.class */
/**
 * {@link TaskRunner} that executes each Druid task as a Kubernetes job.
 *
 * <p>Submitted tasks are tracked in {@link #tasks} while a worker thread launches (or re-attaches to) the
 * corresponding job, waits for it to finish, publishes the job logs through the {@link TaskLogPusher} and finally
 * removes the job from the cluster. A single-threaded scheduled executor periodically asks the client to remove
 * completed jobs older than the configured cleanup delay.
 */
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner {
    private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
    private final TaskConfig taskConfig;
    private final StartupLoggingConfig startupLoggingConfig;
    private final TaskAdapter<Pod, Job> adapter;
    private final KubernetesTaskRunnerConfig k8sConfig;
    private final TaskQueueConfig taskQueueConfig;
    private final TaskLogPusher taskLogPusher;
    private final ListeningExecutorService exec;
    private final KubernetesPeonClient client;
    private final DruidNode node;
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
    protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleanupExecutor = Executors.newScheduledThreadPool(1);

    /**
     * Work item for a task backed by a Kubernetes job. The task location is resolved on demand by querying the
     * client for the job's main pod.
     */
    public static class K8sWorkItem extends TaskRunnerWorkItem {
        private final Task task;
        private final KubernetesPeonClient client;

        public K8sWorkItem(KubernetesPeonClient kubernetesPeonClient, Task task, ListenableFuture<TaskStatus> listenableFuture) {
            super(task.getId(), listenableFuture);
            this.task = task;
            this.client = kubernetesPeonClient;
        }

        /**
         * Variant that uses {@code dateTime} as both timestamps passed to the parent work item constructor.
         */
        public K8sWorkItem(KubernetesPeonClient kubernetesPeonClient, Task task, ListenableFuture<TaskStatus> listenableFuture, DateTime dateTime) {
            super(task.getId(), listenableFuture, dateTime, dateTime);
            this.task = task;
            this.client = kubernetesPeonClient;
        }

        /**
         * Looks up the main pod of the task's job and derives the task location from its pod IP.
         *
         * @return the location built from the pod IP and the fixed peon ports, or {@link TaskLocation#unknown()}
         *         when the pod is missing, has no IP yet, or the lookup fails
         */
        public TaskLocation getLocation() {
            K8sTaskId k8sTaskId = new K8sTaskId(this.task.getId());
            try {
                Pod mainJobPod = this.client.getMainJobPod(k8sTaskId);
                // The client may return no pod at all; treat that like a pod without an IP.
                if (mainJobPod == null || mainJobPod.getStatus() == null || mainJobPod.getStatus().getPodIP() == null) {
                    return TaskLocation.unknown();
                }
                boolean tlsEnabled = Boolean.parseBoolean(
                    mainJobPod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
                );
                return TaskLocation.create(
                    mainJobPod.getStatus().getPodIP(),
                    DruidK8sConstants.PORT,
                    DruidK8sConstants.TLS_PORT,
                    tlsEnabled
                );
            } catch (Exception e) {
                KubernetesTaskRunner.log.error(e, "Error getting task location for task %s", k8sTaskId);
                return TaskLocation.unknown();
            }
        }

        public String getTaskType() {
            return this.task.getType();
        }

        public String getDataSource() {
            return this.task.getDataSource();
        }
    }

    /**
     * Creates the runner and its worker pool, sized by {@code taskQueueConfig.getMaxSize()}.
     *
     * @throws IllegalArgumentException if the task queue max size is {@link Integer#MAX_VALUE}
     */
    public KubernetesTaskRunner(TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, TaskAdapter<Pod, Job> taskAdapter, KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, TaskQueueConfig taskQueueConfig, TaskLogPusher taskLogPusher, KubernetesPeonClient kubernetesPeonClient, DruidNode druidNode) {
        // Validate before allocating the thread pool so a rejected configuration does not leave a pool behind.
        Preconditions.checkArgument(taskQueueConfig.getMaxSize() < Integer.MAX_VALUE, "The task queue bounds how many concurrent k8s tasks you can have");
        this.taskConfig = taskConfig;
        this.startupLoggingConfig = startupLoggingConfig;
        this.adapter = taskAdapter;
        this.k8sConfig = kubernetesTaskRunnerConfig;
        this.taskQueueConfig = taskQueueConfig;
        this.taskLogPusher = taskLogPusher;
        this.client = kubernetesPeonClient;
        this.node = druidNode;
        this.exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d"));
    }

    /**
     * Returns the peon logs for the task as reported by the client. The offset argument is not used.
     */
    public Optional<InputStream> streamTaskLog(String taskId, long offset) {
        return this.client.getPeonLogs(new K8sTaskId(taskId));
    }

    /**
     * Submits the task for execution, or returns the future of the already tracked work item with the same id.
     *
     * @param task the task to run
     * @return future completing with the final task status
     */
    public ListenableFuture<TaskStatus> run(Task task) {
        // The worker removes its entry under the same lock, so it cannot vanish while we are registering it.
        synchronized (this.tasks) {
            K8sWorkItem workItem = this.tasks.computeIfAbsent(
                task.getId(),
                id -> new K8sWorkItem(this.client, task, this.exec.submit(() -> runTask(task)))
            );
            return workItem.getResult();
        }
    }

    /**
     * Worker body for a single task: obtains the job outcome, converts it to a {@link TaskStatus}, notifies
     * listeners, and always publishes logs and cleans up afterwards.
     */
    private TaskStatus runTask(Task task) throws Exception {
        K8sTaskId k8sTaskId = new K8sTaskId(task);
        try {
            JobResponse jobResponse = launchOrAttach(task, k8sTaskId);
            TaskStatus status = PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())
                ? TaskStatus.success(task.getId())
                : TaskStatus.failure(task.getId(), "Task failed: " + k8sTaskId);
            if (jobResponse.getJobDuration().isPresent()) {
                status = status.withDuration(jobResponse.getJobDuration().get());
            }
            updateStatus(task, status);
            return status;
        } catch (Exception e) {
            log.error(e, "Error with task: %s", k8sTaskId);
            throw e;
        } finally {
            publishLogsAndCleanUp(task);
        }
    }

    /**
     * Returns the outcome of the task's job: launches and monitors a new job when none exists, monitors an
     * existing job that is still active, or reports the recorded result of a job that is no longer active.
     */
    private JobResponse launchOrAttach(Task task, K8sTaskId k8sTaskId) throws Exception {
        Optional<Job> existingJob = this.client.jobExists(k8sTaskId);
        if (!existingJob.isPresent()) {
            PeonCommandContext context = new PeonCommandContext(
                generateCommand(task),
                javaOpts(task),
                this.taskConfig.getTaskDir(task.getId()),
                this.node.isEnableTlsPort()
            );
            Job job = this.adapter.fromTask(task, context);
            log.info("Job created %s and ready to launch", k8sTaskId);
            Pod peonPod = this.client.launchJobAndWaitForStart(
                job,
                KubernetesTaskRunnerConfig.toMilliseconds(this.k8sConfig.k8sjobLaunchTimeout),
                TimeUnit.MILLISECONDS
            );
            log.info("Job %s launched in k8s", k8sTaskId);
            return monitorJob(peonPod, k8sTaskId);
        }

        Job job = existingJob.get();
        if (job.getStatus().getActive() != null) {
            // The job is still active, so wait for it to finish.
            return monitorJob(k8sTaskId);
        }
        PeonPhase phase = job.getStatus().getSucceeded() != null ? PeonPhase.SUCCEEDED : PeonPhase.FAILED;
        return new JobResponse(job, phase);
    }

    /**
     * Publishes the job logs, removes the job from the cluster and drops the task from {@link #tasks}.
     *
     * <p>Log publishing is best-effort: a failure is logged and does not prevent job cleanup. The task entry is
     * removed even if cleanup fails, so a later {@link #run(Task)} for the same id is not handed a stale future.
     */
    private void publishLogsAndCleanUp(Task task) {
        try {
            K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
            try {
                pushJobLogs(task.getId(), k8sTaskId);
            } catch (Exception e) {
                log.error(e, "Failed to publish logs for task: %s", k8sTaskId);
            }
            this.client.cleanUpJob(k8sTaskId);
        } finally {
            synchronized (this.tasks) {
                this.tasks.remove(task.getId());
            }
        }
    }

    /**
     * Writes the job logs to a temporary file, hands that file to the log pusher, and always deletes it.
     */
    private void pushJobLogs(String taskId, K8sTaskId k8sTaskId) throws IOException {
        Path logFile = Files.createTempFile(taskId, "log");
        try {
            FileUtils.write(logFile.toFile(), this.client.getJobLogs(k8sTaskId), StandardCharsets.UTF_8);
            this.taskLogPusher.pushTaskLog(taskId, logFile.toFile());
        } finally {
            Files.deleteIfExists(logFile);
        }
    }

    /**
     * Looks up the main pod of the task's job and waits for the job to complete.
     */
    JobResponse monitorJob(K8sTaskId k8sTaskId) {
        return monitorJob(this.client.getMainJobPod(k8sTaskId), k8sTaskId);
    }

    /**
     * Waits for the job to complete, bounded by the configured maximum task duration.
     *
     * @throws ISE if {@code pod} is null
     */
    JobResponse monitorJob(Pod pod, K8sTaskId k8sTaskId) {
        if (pod == null) {
            throw new ISE("Error in k8s launching peon pod for task %s", k8sTaskId);
        }
        return this.client.waitForJobCompletion(
            k8sTaskId,
            KubernetesTaskRunnerConfig.toMilliseconds(this.k8sConfig.maxTaskDuration),
            TimeUnit.MILLISECONDS
        );
    }

    /** Notifies all registered listeners of a status change for the task. */
    public void updateStatus(Task task, TaskStatus taskStatus) {
        TaskRunnerUtils.notifyStatusChanged(this.listeners, task.getId(), taskStatus);
    }

    /** Notifies all registered listeners of a location change for the task. */
    public void updateLocation(Task task, TaskLocation taskLocation) {
        TaskRunnerUtils.notifyLocationChanged(this.listeners, task.getId(), taskLocation);
    }

    /**
     * Shuts the task down by cleaning up its job. The reason argument is not used.
     */
    public void shutdown(String taskId, String reason) {
        this.client.cleanUpJob(new K8sTaskId(taskId));
    }

    /** Task reports are not provided by this runner; always returns an absent value. */
    public Optional<InputStream> streamTaskReports(String taskId) {
        return Optional.absent();
    }

    /** Always returns an empty list. */
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        return ImmutableList.of();
    }

    /**
     * Schedules the periodic removal of completed jobs older than the configured cleanup delay.
     */
    public void start() {
        this.cleanupExecutor.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task suppresses all later executions, so catch and log it.
            try {
                this.client.cleanCompletedJobsOlderThan(
                    KubernetesTaskRunnerConfig.toMilliseconds(this.k8sConfig.taskCleanupDelay),
                    TimeUnit.MILLISECONDS
                );
            } catch (Exception e) {
                log.error(e, "Error cleaning up completed k8s jobs");
            }
        }, 1L, KubernetesTaskRunnerConfig.toMilliseconds(this.k8sConfig.taskCleanupInterval), TimeUnit.MILLISECONDS);
        log.debug("Started cleanup executor for jobs older than %s....", this.k8sConfig.taskCleanupDelay);
    }

    /**
     * Stops the periodic cleanup executor. The task worker pool is left untouched.
     */
    public void stop() {
        log.info("Stopping KubernetesTaskRunner");
        this.cleanupExecutor.shutdownNow();
        log.info("Stopped KubernetesTaskRunner");
    }

    /** Reports the task queue max size as the total slot count, under the key {@code "taskQueue"}. */
    public Map<String, Long> getTotalTaskSlotCount() {
        return ImmutableMap.of("taskQueue", Long.valueOf(this.taskQueueConfig.getMaxSize()));
    }

    /**
     * Builds the JVM options for the peon: configured options, then task-context options, then port settings.
     */
    private List<String> javaOpts(Task task) {
        List<String> opts = new ArrayList<>();
        Iterables.addAll(opts, this.k8sConfig.javaOptsArray);
        Object taskJavaOpts = task.getContextValue("druid.indexer.runner.javaOpts");
        if (taskJavaOpts != null) {
            Iterables.addAll(opts, new QuotableWhiteSpaceSplitter((String) taskJavaOpts));
        }
        // -1 is passed as the TLS port when TLS is not enabled on this node.
        int tlsPort = this.node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1;
        opts.add(StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
        opts.add(StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
        opts.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsPort));
        opts.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d", tlsPort));
        opts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", this.node.isEnableTlsPort()));
        return opts;
    }

    /**
     * Builds the peon start command for the task and logs it with the configured properties masked.
     */
    private List<String> generateCommand(Task task) {
        List<String> command = new ArrayList<>();
        command.add("/peon.sh");
        command.add(this.taskConfig.getTaskDir(task.getId()).toString());
        command.add("1");
        String nodeType = task.getNodeType();
        if (nodeType != null) {
            command.add("--nodeType");
            command.add(nodeType);
        }
        if (task.supportsQueries()) {
            command.add("--loadBroadcastSegments");
            command.add("true");
        }
        log.info("Peon Command for K8s job: %s", ForkingTaskRunner.getMaskedCommand(this.startupLoggingConfig.getMaskProperties(), command));
        return command;
    }

    /**
     * Lists a work item for every peon pod reported by the client. Each task found is also passed to
     * {@link #run(Task)}, so it becomes tracked by this runner if it was not already.
     */
    public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
        List<TaskRunnerWorkItem> workItems = new ArrayList<>();
        for (Pod pod : this.client.listPeonPods()) {
            try {
                Task task = this.adapter.toTask(pod);
                workItems.add(new K8sWorkItem(this.client, task, run(task), DateTimes.of(pod.getMetadata().getCreationTimestamp())));
            } catch (IOException e) {
                log.error(e, "Error deserializing task from pod: %s", pod.getMetadata().getName());
            }
        }
        return workItems;
    }

    /** Scaling statistics are not provided by this runner; always returns an absent value. */
    public Optional<ScalingStats> getScalingStats() {
        return Optional.absent();
    }

    public Map<String, Long> getIdleTaskSlotCount() {
        return Collections.emptyMap();
    }

    public Map<String, Long> getUsedTaskSlotCount() {
        return Collections.emptyMap();
    }

    public Map<String, Long> getLazyTaskSlotCount() {
        return Collections.emptyMap();
    }

    public Map<String, Long> getBlacklistedTaskSlotCount() {
        return Collections.emptyMap();
    }

    public boolean isK8sTaskRunner() {
        return true;
    }

    /**
     * Removes the listener with the given id, if one is registered.
     */
    public void unregisterListener(String listenerId) {
        Pair<TaskRunnerListener, Executor> registered = findListener(listenerId);
        if (registered != null) {
            this.listeners.remove(registered);
            log.debug("Unregistered listener [%s]", listenerId);
        }
    }

    /**
     * Registers a listener together with the executor paired with it.
     *
     * @throws ISE if a listener with the same id is already registered
     */
    public void registerListener(TaskRunnerListener taskRunnerListener, Executor executor) {
        if (findListener(taskRunnerListener.getListenerId()) != null) {
            throw new ISE("Listener [%s] already registered", taskRunnerListener.getListenerId());
        }
        Pair<TaskRunnerListener, Executor> listener = Pair.of(taskRunnerListener, executor);
        log.debug("Registered listener [%s]", taskRunnerListener.getListenerId());
        this.listeners.add(listener);
    }

    /** Returns the first registered listener pair whose listener id equals {@code listenerId}, or null. */
    @Nullable
    private Pair<TaskRunnerListener, Executor> findListener(String listenerId) {
        for (Pair<TaskRunnerListener, Executor> pair : this.listeners) {
            if (pair.lhs != null && pair.lhs.getListenerId().equals(listenerId)) {
                return pair;
            }
        }
        return null;
    }

    /**
     * Lists a work item for every peon pod in the RUNNING phase. Each task found is also passed to
     * {@link #run(Task)}, so it becomes tracked by this runner if it was not already.
     */
    @SuppressForbidden(reason = "Sets#newHashSet")
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        List<TaskRunnerWorkItem> workItems = new ArrayList<>();
        for (Pod pod : this.client.listPeonPods(Sets.newHashSet(PeonPhase.RUNNING))) {
            try {
                Task task = this.adapter.toTask(pod);
                workItems.add(new K8sWorkItem(this.client, task, run(task), DateTimes.of(pod.getMetadata().getCreationTimestamp())));
            } catch (IOException e) {
                log.error(e, "Error deserializing task from pod: %s", pod.getMetadata().getName());
            }
        }
        return workItems;
    }

    /** Always returns an empty, newly allocated collection. */
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        return new ArrayList<>();
    }

    /**
     * Maps the phase of the task's main pod to a runner task state.
     *
     * @return null when no pod exists for the task; PENDING or RUNNING for the matching pod phase; WAITING for
     *         any other phase
     */
    @Nullable
    public RunnerTaskState getRunnerTaskState(String taskId) {
        Pod mainJobPod = this.client.getMainJobPod(new K8sTaskId(taskId));
        if (mainJobPod == null) {
            return null;
        }
        switch (PeonPhase.getPhaseFor(mainJobPod)) {
            case PENDING:
                return RunnerTaskState.PENDING;
            case RUNNING:
                return RunnerTaskState.RUNNING;
            default:
                return RunnerTaskState.WAITING;
        }
    }
}
