package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.PodTemplateSpecFluent;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobFluent;
import io.fabric8.kubernetes.api.model.batch.v1.JobSpecFluent;
import io.fabric8.kubernetes.client.utils.Serialization;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogs;

/* loaded from: input_file:org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.class */
public class PodTemplateTaskAdapter implements TaskAdapter {
    public static final String TYPE = "customTemplateAdapter";
    private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
    private static final String TASK_PROPERTY = "druid.indexer.runner.k8s.podTemplate.%s";
    private final KubernetesTaskRunnerConfig taskRunnerConfig;
    private final TaskConfig taskConfig;
    private final DruidNode node;
    private final ObjectMapper mapper;
    private final HashMap<String, PodTemplate> templates;
    private final TaskLogs taskLogs;

    public PodTemplateTaskAdapter(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, TaskConfig taskConfig, DruidNode druidNode, ObjectMapper objectMapper, Properties properties, TaskLogs taskLogs) {
        this.taskRunnerConfig = kubernetesTaskRunnerConfig;
        this.taskConfig = taskConfig;
        this.node = druidNode;
        this.mapper = objectMapper;
        this.templates = initializePodTemplates(properties);
        this.taskLogs = taskLogs;
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public Job fromTask(Task task) throws IOException {
        PodTemplate orDefault = this.templates.getOrDefault(task.getType(), this.templates.get("base"));
        if (orDefault == null) {
            throw new ISE("Pod template spec not found for task type [%s]", new Object[]{task.getType()});
        }
        return ((JobBuilder) ((JobFluent.SpecNested) ((JobSpecFluent.TemplateNested) ((PodTemplateSpecFluent.SpecNested) ((JobSpecFluent.TemplateNested) ((JobBuilder) new JobBuilder().withNewMetadata().withName(new K8sTaskId(task).getK8sJobName()).addToLabels(getJobLabels(this.taskRunnerConfig, task)).addToAnnotations(getJobAnnotations(this.taskRunnerConfig, task)).endMetadata()).withNewSpec().withTemplate(orDefault.getTemplate()).editTemplate().editOrNewMetadata().addToAnnotations(getPodTemplateAnnotations(task)).addToLabels(getPodLabels(this.taskRunnerConfig, task)).endMetadata()).editSpec().editFirstContainer().addAllToEnv(getEnv(task)).endContainer()).endSpec()).endTemplate()).withActiveDeadlineSeconds(Long.valueOf(this.taskRunnerConfig.getTaskTimeout().toStandardDuration().getStandardSeconds())).withBackoffLimit(0).withTtlSecondsAfterFinished(Integer.valueOf((int) this.taskRunnerConfig.getTaskCleanupDelay().toStandardDuration().getStandardSeconds())).endSpec()).build();
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public Task toTask(Job job) throws IOException {
        Map annotations = job.getSpec().getTemplate().getMetadata().getAnnotations();
        if (annotations == null) {
            log.info("No annotations found on pod spec for job [%s]. Trying to load task payload from deep storage.", new Object[]{job.getMetadata().getName()});
            return toTaskUsingDeepStorage(job);
        }
        String str = (String) annotations.get(DruidK8sConstants.TASK);
        if (str != null) {
            return (Task) this.mapper.readValue(Base64Compression.decompressBase64(str), Task.class);
        }
        log.info("No task annotation found on pod spec for job [%s]. Trying to load task payload from deep storage.", new Object[]{job.getMetadata().getName()});
        return toTaskUsingDeepStorage(job);
    }

    private Task toTaskUsingDeepStorage(Job job) throws IOException {
        Optional streamTaskPayload = this.taskLogs.streamTaskPayload(getTaskId(job).getOriginalTaskId());
        if (!streamTaskPayload.isPresent()) {
            throw InternalServerError.exception("Could not load task payload from deep storage for job [%s]. Check the overlord logs for errors uploading task payloads to deep storage.", job.getMetadata().getName(), new Object[0]);
        }
        return (Task) this.mapper.readValue(IOUtils.toString((InputStream) streamTaskPayload.get(), Charset.defaultCharset()), Task.class);
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public K8sTaskId getTaskId(Job job) {
        Map annotations = job.getSpec().getTemplate().getMetadata().getAnnotations();
        if (annotations == null) {
            throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", new Object[]{job.getMetadata().getName()});
        }
        String str = (String) annotations.get(DruidK8sConstants.TASK_ID);
        if (str == null) {
            throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", new Object[]{job.getMetadata().getName()});
        }
        return new K8sTaskId(str);
    }

    private HashMap<String, PodTemplate> initializePodTemplates(Properties properties) {
        HashMap<String, PodTemplate> hashMap = new HashMap<>();
        java.util.Optional<PodTemplate> loadPodTemplate = loadPodTemplate("base", properties);
        if (!loadPodTemplate.isPresent()) {
            throw new IAE("Pod template task adapter requires a base pod template to be specified", new Object[0]);
        }
        hashMap.put("base", loadPodTemplate.get());
        DeserializationConfig deserializationConfig = this.mapper.getDeserializationConfig();
        Iterator it = this.mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(deserializationConfig, AnnotatedClassResolver.resolveWithoutSuperTypes(deserializationConfig, Task.class)).iterator();
        while (it.hasNext()) {
            String name = ((NamedType) it.next()).getName();
            loadPodTemplate(name, properties).ifPresent(podTemplate -> {
                hashMap.put(name, podTemplate);
            });
        }
        return hashMap;
    }

    private java.util.Optional<PodTemplate> loadPodTemplate(String str, Properties properties) {
        String format = StringUtils.format(TASK_PROPERTY, new Object[]{str});
        String property = properties.getProperty(format);
        if (property == null) {
            log.debug("Pod template file not specified for [%s]", new Object[]{str});
            return java.util.Optional.empty();
        }
        try {
            return java.util.Optional.of((PodTemplate) Serialization.unmarshal(Files.newInputStream(new File(property).toPath(), new OpenOption[0]), PodTemplate.class));
        } catch (Exception e) {
            throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", new Object[]{format, property});
        }
    }

    private Collection<EnvVar> getEnv(Task task) throws IOException {
        ArrayList newArrayList = Lists.newArrayList(new EnvVar[]{new EnvVarBuilder().withName(DruidK8sConstants.TASK_DIR_ENV).withValue(this.taskConfig.getBaseDir()).build(), new EnvVarBuilder().withName(DruidK8sConstants.TASK_ID_ENV).withValue(task.getId()).build(), new EnvVarBuilder().withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV).withValue(Boolean.toString(task.supportsQueries())).build()});
        if (!shouldUseDeepStorageForTaskPayload(task)) {
            newArrayList.add(new EnvVarBuilder().withName(DruidK8sConstants.TASK_JSON_ENV).withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector((String) null, StringUtils.format("metadata.annotations['%s']", new Object[]{DruidK8sConstants.TASK}))).build()).build());
        }
        return newArrayList;
    }

    private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Task task) {
        return getJobLabels(kubernetesTaskRunnerConfig, task);
    }

    private Map<String, String> getPodTemplateAnnotations(Task task) throws IOException {
        ImmutableMap.Builder put = ImmutableMap.builder().put(DruidK8sConstants.TLS_ENABLED, String.valueOf(this.node.isEnableTlsPort())).put(DruidK8sConstants.TASK_ID, task.getId()).put(DruidK8sConstants.TASK_TYPE, task.getType()).put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId()).put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource());
        if (!shouldUseDeepStorageForTaskPayload(task)) {
            put.put(DruidK8sConstants.TASK, Base64Compression.compressBase64(this.mapper.writeValueAsString(task)));
        }
        return put.build();
    }

    private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Task task) {
        return ImmutableMap.builder().putAll(kubernetesTaskRunnerConfig.getLabels()).put(DruidK8sConstants.LABEL_KEY, "true").put(getDruidLabel(DruidK8sConstants.TASK_ID), KubernetesOverlordUtils.convertTaskIdToK8sLabel(task.getId())).put(getDruidLabel(DruidK8sConstants.TASK_TYPE), KubernetesOverlordUtils.convertStringToK8sLabel(task.getType())).put(getDruidLabel(DruidK8sConstants.TASK_GROUP_ID), KubernetesOverlordUtils.convertTaskIdToK8sLabel(task.getGroupId())).put(getDruidLabel(DruidK8sConstants.TASK_DATASOURCE), KubernetesOverlordUtils.convertStringToK8sLabel(task.getDataSource())).build();
    }

    private Map<String, String> getJobAnnotations(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Task task) {
        return ImmutableMap.builder().putAll(kubernetesTaskRunnerConfig.getAnnotations()).put(DruidK8sConstants.TASK_ID, task.getId()).put(DruidK8sConstants.TASK_TYPE, task.getType()).put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId()).put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource()).build();
    }

    private String getDruidLabel(String str) {
        return DruidK8sConstants.DRUID_LABEL_PREFIX + str;
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException {
        return ((long) Base64Compression.compressBase64(this.mapper.writeValueAsString(task)).length()) > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
    }
}
