package org.apache.druid.k8s.overlord;

import com.google.api.client.util.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;

/* loaded from: input_file:org/apache/druid/k8s/overlord/KubernetesTaskRunner.class */
/**
 * {@link TaskRunner} and {@link TaskLogStreamer} implementation that drives each task through a
 * {@link KubernetesPeonLifecycle} obtained from the injected {@link PeonLifecycleFactory}.
 *
 * <p>Every submitted or restored task is tracked as a {@link KubernetesWorkItem} in {@link #tasks},
 * keyed by task id, and is executed on a pool sized by {@code config.getCapacity()}. A separate
 * single-threaded scheduler periodically asks the {@link KubernetesPeonClient} to delete completed
 * peon jobs.
 */
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner {
    private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
    protected final TaskAdapter adapter;
    private final KubernetesPeonClient client;
    private final KubernetesTaskRunnerConfig config;
    private final ListeningExecutorService exec;
    private final HttpClient httpClient;
    private final PeonLifecycleFactory peonLifecycleFactory;
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
    // Work items for tasks that have been submitted (or restored) and have not finished yet, keyed by task id.
    protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleanupExecutor = Executors.newScheduledThreadPool(1);

    /**
     * Creates the runner and its task executor.
     *
     * @param taskAdapter                converts between tasks and Kubernetes jobs
     * @param kubernetesTaskRunnerConfig runner configuration; its capacity sizes the task thread pool
     * @param kubernetesPeonClient       client used to list and clean up peon jobs
     * @param httpClient                 client used to fetch live reports from a task's location
     * @param peonLifecycleFactory       factory producing one lifecycle object per task
     */
    public KubernetesTaskRunner(TaskAdapter taskAdapter, KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, KubernetesPeonClient kubernetesPeonClient, HttpClient httpClient, PeonLifecycleFactory peonLifecycleFactory) {
        this.adapter = taskAdapter;
        this.config = kubernetesTaskRunnerConfig;
        this.client = kubernetesPeonClient;
        this.httpClient = httpClient;
        this.peonLifecycleFactory = peonLifecycleFactory;
        this.exec = MoreExecutors.listeningDecorator(
            Execs.multiThreaded(kubernetesTaskRunnerConfig.getCapacity().intValue(), "k8s-task-runner-%d")
        );
    }

    /**
     * Streams the logs of a tracked task.
     *
     * @param str task id
     * @param j   not used by this implementation
     * @return the work item's log stream, or absent when the task id is not tracked
     */
    public Optional<InputStream> streamTaskLog(String str, long j) {
        final KubernetesWorkItem workItem = this.tasks.get(str);
        if (workItem == null) {
            return Optional.absent();
        }
        return workItem.streamTaskLogs();
    }

    /**
     * Submits the task for execution unless a work item with the same id is already tracked, in
     * which case the existing work item's future is returned.
     *
     * @param task task to run
     * @return future of the task's final status
     */
    public ListenableFuture<TaskStatus> run(Task task) {
        return this.tasks.computeIfAbsent(
            task.getId(),
            taskId -> new KubernetesWorkItem(task, this.exec.submit(() -> runTask(task)))
        ).getResult();
    }

    /**
     * Like {@link #run(Task)}, but joins the task's lifecycle instead of launching it.
     *
     * @param task task to join
     * @return future of the task's final status
     */
    protected ListenableFuture<TaskStatus> joinAsync(Task task) {
        return this.tasks.computeIfAbsent(
            task.getId(),
            taskId -> new KubernetesWorkItem(task, this.exec.submit(() -> joinTask(task)))
        ).getResult();
    }

    private TaskStatus runTask(Task task) {
        return doTask(task, true);
    }

    private TaskStatus joinTask(Task task) {
        return doTask(task, false);
    }

    /**
     * Runs or joins the task's peon lifecycle, publishes the resulting status to the registered
     * listeners, and removes the task from {@link #tasks} once the lifecycle call has finished,
     * whether it succeeded or failed.
     *
     * @param task task to execute
     * @param z    {@code true} to launch the task via the lifecycle's {@code run},
     *             {@code false} to attach via {@code join}
     * @return the status reported by the lifecycle
     * @throws ISE              if the task has no work item or its shutdown was already requested
     * @throws RuntimeException wrapping any exception raised while running or joining the task
     */
    @VisibleForTesting
    protected TaskStatus doTask(Task task, boolean z) {
        final KubernetesPeonLifecycle peonLifecycle = this.peonLifecycleFactory.build(task);
        final KubernetesWorkItem workItem = this.tasks.get(task.getId());
        if (workItem == null) {
            throw new ISE("Task [%s] disappeared", task.getId());
        }
        // NOTE(review): this check runs before the try/finally below, so on this path the work item
        // stays in `tasks` — confirm whether that is intended.
        if (workItem.isShutdownRequested()) {
            throw new ISE("Task [%s] has been shut down", task.getId());
        }
        workItem.setKubernetesPeonLifecycle(peonLifecycle);

        try {
            final TaskStatus status;
            if (z) {
                status = peonLifecycle.run(
                    this.adapter.fromTask(task),
                    this.config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
                    this.config.getTaskTimeout().toStandardDuration().getMillis()
                );
            } else {
                status = peonLifecycle.join(this.config.getTaskTimeout().toStandardDuration().getMillis());
            }
            updateStatus(task, status);
            return status;
        } catch (Exception e) {
            log.error(e, "Task [%s] execution caught an exception", task.getId());
            throw new RuntimeException(e);
        } finally {
            // Always drop the work item once the lifecycle call is over.
            this.tasks.remove(task.getId());
        }
    }

    /**
     * Notifies all registered listeners of a status change for the task.
     *
     * @param task       task whose status changed
     * @param taskStatus new status
     */
    public void updateStatus(Task task, TaskStatus taskStatus) {
        TaskRunnerUtils.notifyStatusChanged(this.listeners, task.getId(), taskStatus);
    }

    /**
     * Notifies all registered listeners of a location change for the task.
     *
     * @param task         task whose location changed
     * @param taskLocation new location
     */
    public void updateLocation(Task task, TaskLocation taskLocation) {
        TaskRunnerUtils.notifyLocationChanged(this.listeners, task.getId(), taskLocation);
    }

    /**
     * Requests shutdown of a tracked task; unknown task ids are logged and ignored. The work item
     * is not removed from {@link #tasks} by this method.
     *
     * @param str  task id
     * @param str2 reason for the shutdown, used for logging only
     */
    public void shutdown(String str, String str2) {
        log.info("Shutdown [%s] because [%s]", str, str2);
        final KubernetesWorkItem workItem = this.tasks.get(str);
        if (workItem == null) {
            log.info("Ignoring request to cancel unknown task [%s]", str);
            return;
        }
        workItem.shutdown();
    }

    /**
     * Fetches the live reports of a tracked task from the task's own location over HTTP.
     *
     * @param str task id
     * @return the response stream, or absent when the task is not tracked or its location is unknown
     * @throws IOException      if the HTTP call failed with an {@link IOException}
     * @throws RuntimeException if the call failed for another reason or the calling thread was
     *                          interrupted while waiting (the interrupt flag is restored first)
     */
    public Optional<InputStream> streamTaskReports(String str) throws IOException {
        final KubernetesWorkItem workItem = this.tasks.get(str);
        if (workItem == null) {
            return Optional.absent();
        }
        final TaskLocation location = workItem.getLocation();
        if (TaskLocation.unknown().equals(location)) {
            return Optional.absent();
        }
        try {
            return Optional.of(
                this.httpClient.go(
                    new Request(
                        HttpMethod.GET,
                        TaskRunnerUtils.makeTaskLocationURL(location, "/druid/worker/v1/chat/%s/liveReports", str)
                    ),
                    new InputStreamResponseHandler()
                ).get()
            );
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Surface the underlying IOException directly when there is one.
            Throwables.propagateIfPossible(e.getCause(), IOException.class);
            throw new RuntimeException(e);
        }
    }

    /**
     * Re-attaches to the peon jobs reported by the client. Jobs that cannot be converted back into
     * a task are logged and skipped.
     *
     * @return one (task, status future) pair per job that was converted successfully
     */
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        final List<Pair<Task, ListenableFuture<TaskStatus>>> restored = new ArrayList<>();
        for (Job job : this.client.getPeonJobs()) {
            try {
                final Task task = this.adapter.toTask(job);
                restored.add(Pair.of(task, joinAsync(task)));
            } catch (IOException e) {
                log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
            }
        }
        return restored;
    }

    /**
     * Schedules the periodic deletion of completed peon jobs older than the configured cleanup
     * delay, repeating at the configured cleanup interval.
     */
    public void start() {
        this.cleanupExecutor.scheduleAtFixedRate(
            () -> {
                try {
                    this.client.deleteCompletedPeonJobsOlderThan(
                        this.config.getTaskCleanupDelay().toStandardDuration().getMillis(),
                        TimeUnit.MILLISECONDS
                    );
                } catch (Exception e) {
                    // An exception escaping a scheduleAtFixedRate task suppresses all subsequent
                    // executions, so log it and let the next scheduled run try again.
                    log.error(e, "Failed to delete completed peon jobs");
                }
            },
            1L,
            this.config.getTaskCleanupInterval().toStandardDuration().getMillis(),
            TimeUnit.MILLISECONDS
        );
        log.debug("Started cleanup executor for jobs older than %s...", this.config.getTaskCleanupDelay());
    }

    /**
     * Stops the cleanup scheduler. The task executor is not shut down by this method.
     */
    public void stop() {
        log.debug("Stopping KubernetesTaskRunner");
        this.cleanupExecutor.shutdownNow();
        log.debug("Stopped KubernetesTaskRunner");
    }

    /**
     * @return a single-entry map from {@code "taskQueue"} to the configured capacity
     */
    public Map<String, Long> getTotalTaskSlotCount() {
        final long capacity = this.config.getCapacity().intValue();
        return ImmutableMap.of("taskQueue", capacity);
    }

    /**
     * @return a new list holding the work items tracked at the time of the call
     */
    public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
        return new ArrayList<>(this.tasks.values());
    }

    /**
     * @return always absent
     */
    public Optional<ScalingStats> getScalingStats() {
        return Optional.absent();
    }

    /**
     * @return always an empty map
     */
    public Map<String, Long> getIdleTaskSlotCount() {
        return Collections.emptyMap();
    }

    /**
     * @return always an empty map
     */
    public Map<String, Long> getUsedTaskSlotCount() {
        return Collections.emptyMap();
    }

    /**
     * @return always an empty map
     */
    public Map<String, Long> getLazyTaskSlotCount() {
        return Collections.emptyMap();
    }

    /**
     * @return always an empty map
     */
    public Map<String, Long> getBlacklistedTaskSlotCount() {
        return Collections.emptyMap();
    }

    /**
     * @return always {@code true}
     */
    public boolean isK8sTaskRunner() {
        return true;
    }

    /**
     * Removes the first registered listener whose id equals the given id; does nothing when no
     * listener matches.
     *
     * @param str listener id
     */
    public void unregisterListener(String str) {
        // Iterating a CopyOnWriteArrayList walks a snapshot, so removing inside the loop is safe.
        for (Pair<TaskRunnerListener, Executor> registration : this.listeners) {
            if (registration.lhs != null && registration.lhs.getListenerId().equals(str)) {
                this.listeners.remove(registration);
                log.debug("Unregistered listener [%s]", str);
                return;
            }
        }
    }

    /**
     * Registers a listener together with the executor it is paired with.
     *
     * @param taskRunnerListener listener to add
     * @param executor           executor stored alongside the listener
     * @throws ISE if a listener with the same id is already registered
     */
    public void registerListener(TaskRunnerListener taskRunnerListener, Executor executor) {
        for (Pair<TaskRunnerListener, Executor> registration : this.listeners) {
            if (registration.lhs != null
                && registration.lhs.getListenerId().equals(taskRunnerListener.getListenerId())) {
                throw new ISE("Listener [%s] already registered", taskRunnerListener.getListenerId());
            }
        }
        final Pair<TaskRunnerListener, Executor> registration = Pair.of(taskRunnerListener, executor);
        log.debug("Registered listener [%s]", taskRunnerListener.getListenerId());
        this.listeners.add(registration);
    }

    /**
     * @return the tracked work items whose {@code isRunning()} is true
     */
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        return this.tasks.values()
            .stream()
            .filter(KubernetesWorkItem::isRunning)
            .collect(Collectors.toList());
    }

    /**
     * @return the tracked work items whose {@code isPending()} is true
     */
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        return this.tasks.values()
            .stream()
            .filter(KubernetesWorkItem::isPending)
            .collect(Collectors.toList());
    }

    /**
     * @param str task id
     * @return the runner state reported by the task's work item, or {@code null} when the task is
     *         not tracked
     */
    @Nullable
    public RunnerTaskState getRunnerTaskState(String str) {
        final KubernetesWorkItem workItem = this.tasks.get(str);
        return workItem == null ? null : workItem.getRunnerTaskState();
    }
}
