package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

/* loaded from: input_file:org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.class */
public class KubernetesPeonLifecycle {
    private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final K8sTaskId taskId;
    private final TaskLogs taskLogs;
    private final KubernetesPeonClient kubernetesClient;
    private final ObjectMapper mapper;
    private LogWatch logWatch;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/druid/k8s/overlord/KubernetesPeonLifecycle$State.class */
    public enum State {
        NOT_STARTED,
        PENDING,
        RUNNING,
        STOPPED
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KubernetesPeonLifecycle(Task task, KubernetesPeonClient kubernetesPeonClient, TaskLogs taskLogs, ObjectMapper objectMapper) {
        this.taskId = new K8sTaskId(task);
        this.kubernetesClient = kubernetesPeonClient;
        this.taskLogs = taskLogs;
        this.mapper = objectMapper;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized TaskStatus run(Job job, long j, long j2) throws IllegalStateException {
        try {
            try {
                Preconditions.checkState(this.state.compareAndSet(State.NOT_STARTED, State.PENDING), "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]", new Object[]{this.taskId.getOriginalTaskId(), this.state.get(), State.PENDING});
                this.kubernetesClient.launchPeonJobAndWaitForStart(job, j, TimeUnit.MILLISECONDS);
                TaskStatus join = join(j2);
                this.state.set(State.STOPPED);
                return join;
            } catch (Exception e) {
                log.info("Failed to run task: %s", new Object[]{this.taskId.getOriginalTaskId()});
                shutdown();
                throw e;
            }
        } catch (Throwable th) {
            this.state.set(State.STOPPED);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized TaskStatus join(long j) throws IllegalStateException {
        try {
            Preconditions.checkState(this.state.compareAndSet(State.NOT_STARTED, State.RUNNING) || this.state.compareAndSet(State.PENDING, State.RUNNING), "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]", new Object[]{this.taskId.getOriginalTaskId(), this.state.get(), State.RUNNING});
            TaskStatus taskStatus = getTaskStatus(this.kubernetesClient.waitForPeonJobCompletion(this.taskId, j, TimeUnit.MILLISECONDS).getJobDuration());
            try {
                saveLogs();
                shutdown();
            } catch (Exception e) {
                log.warn(e, "Task [%s] cleanup failed", new Object[]{this.taskId});
            }
            this.state.set(State.STOPPED);
            return taskStatus;
        } catch (Throwable th) {
            try {
                saveLogs();
                shutdown();
            } catch (Exception e2) {
                log.warn(e2, "Task [%s] cleanup failed", new Object[]{this.taskId});
            }
            this.state.set(State.STOPPED);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutdown() {
        if (State.PENDING.equals(this.state.get()) || State.RUNNING.equals(this.state.get())) {
            this.kubernetesClient.deletePeonJob(this.taskId);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Optional<InputStream> streamLogs() {
        return !State.RUNNING.equals(this.state.get()) ? Optional.absent() : this.kubernetesClient.getPeonLogs(this.taskId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public State getState() {
        return this.state.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TaskLocation getTaskLocation() {
        if (!State.RUNNING.equals(this.state.get())) {
            log.debug("Can't get task location for non-running job. [%s]", new Object[]{this.taskId.getOriginalTaskId()});
            return TaskLocation.unknown();
        }
        Optional<Pod> peonPod = this.kubernetesClient.getPeonPod(this.taskId);
        if (!peonPod.isPresent()) {
            return TaskLocation.unknown();
        }
        Pod pod = (Pod) peonPod.get();
        PodStatus status = pod.getStatus();
        return (status == null || status.getPodIP() == null) ? TaskLocation.unknown() : TaskLocation.create(status.getPodIP(), DruidK8sConstants.PORT, DruidK8sConstants.TLS_PORT, Boolean.parseBoolean((String) pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")));
    }

    private TaskStatus getTaskStatus(long j) {
        TaskStatus failure;
        try {
            Optional streamTaskStatus = this.taskLogs.streamTaskStatus(this.taskId.getOriginalTaskId());
            failure = streamTaskStatus.isPresent() ? (TaskStatus) this.mapper.readValue(IOUtils.toString((InputStream) streamTaskStatus.get(), StandardCharsets.UTF_8), TaskStatus.class) : TaskStatus.failure(this.taskId.getOriginalTaskId(), "task status not found");
        } catch (IOException e) {
            log.error(e, "Failed to load task status for task [%s]", new Object[]{this.taskId.getOriginalTaskId()});
            failure = TaskStatus.failure(this.taskId.getOriginalTaskId(), StringUtils.format("error loading status: %s", new Object[]{e.getMessage()}));
        }
        return failure.withDuration(j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startWatchingLogs() {
        if (this.logWatch != null) {
            log.debug("There is already a log watcher for %s", new Object[]{this.taskId.getOriginalTaskId()});
            return;
        }
        try {
            Optional<LogWatch> peonLogWatcher = this.kubernetesClient.getPeonLogWatcher(this.taskId);
            if (peonLogWatcher.isPresent()) {
                this.logWatch = (LogWatch) peonLogWatcher.get();
            }
        } catch (Exception e) {
            log.error(e, "Error watching logs from task: %s", new Object[]{this.taskId});
        }
    }

    /* JADX WARN: Finally extract failed */
    protected void saveLogs() {
        try {
            Path createTempFile = Files.createTempFile(this.taskId.getOriginalTaskId(), "log", new FileAttribute[0]);
            try {
                try {
                    startWatchingLogs();
                    if (this.logWatch != null) {
                        FileUtils.copyInputStreamToFile(this.logWatch.getOutput(), createTempFile.toFile());
                    } else {
                        log.debug("Log stream not found for %s", new Object[]{this.taskId.getOriginalTaskId()});
                    }
                    this.taskLogs.pushTaskLog(this.taskId.getOriginalTaskId(), createTempFile.toFile());
                    if (this.logWatch != null) {
                        this.logWatch.close();
                    }
                    Files.deleteIfExists(createTempFile);
                } catch (IOException e) {
                    log.error(e, "Failed to stream logs for task [%s]", new Object[]{this.taskId.getOriginalTaskId()});
                    if (this.logWatch != null) {
                        this.logWatch.close();
                    }
                    Files.deleteIfExists(createTempFile);
                }
            } catch (Throwable th) {
                if (this.logWatch != null) {
                    this.logWatch.close();
                }
                Files.deleteIfExists(createTempFile);
                throw th;
            }
        } catch (IOException e2) {
            log.warn(e2, "Failed to manage temporary log file for task [%s]", new Object[]{this.taskId.getOriginalTaskId()});
        }
    }
}
