package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;

/* loaded from: input_file:org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.class */
public abstract class K8sTaskAdapter implements TaskAdapter {
    private static final EmittingLogger log = new EmittingLogger(K8sTaskAdapter.class);
    protected final KubernetesClientApi client;
    protected final KubernetesTaskRunnerConfig taskRunnerConfig;
    protected final TaskConfig taskConfig;
    protected final StartupLoggingConfig startupLoggingConfig;
    protected final DruidNode node;
    protected final ObjectMapper mapper;

    public K8sTaskAdapter(KubernetesClientApi kubernetesClientApi, KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, DruidNode druidNode, ObjectMapper objectMapper) {
        this.client = kubernetesClientApi;
        this.taskRunnerConfig = kubernetesTaskRunnerConfig;
        this.taskConfig = taskConfig;
        this.startupLoggingConfig = startupLoggingConfig;
        this.node = druidNode;
        this.mapper = objectMapper;
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public Job fromTask(Task task) throws IOException {
        String str = System.getenv(DruidK8sConstants.DRUID_HOSTNAME_ENV);
        Pod pod = (Pod) this.client.executeRequest(kubernetesClient -> {
            return (Pod) ((PodResource) ((NonNamespaceOperation) kubernetesClient.pods().inNamespace(this.taskRunnerConfig.getNamespace())).withName(str)).get();
        });
        PeonCommandContext peonCommandContext = new PeonCommandContext(generateCommand(task), javaOpts(task), this.taskConfig.getBaseTaskDir(), this.node.isEnableTlsPort());
        PodSpec spec = pod.getSpec();
        massageSpec(spec, this.taskRunnerConfig.getPrimaryContainerName());
        return createJobFromPodSpec(spec, task, peonCommandContext);
    }

    @Override // org.apache.druid.k8s.overlord.taskadapter.TaskAdapter
    public Task toTask(Job job) throws IOException {
        PodSpec spec = job.getSpec().getTemplate().getSpec();
        massageSpec(spec, "main");
        Optional findFirst = ((Container) spec.getContainers().get(0)).getEnv().stream().filter(envVar -> {
            return DruidK8sConstants.TASK_JSON_ENV.equals(envVar.getName());
        }).findFirst();
        String str = (String) findFirst.map(envVar2 -> {
            return ((EnvVar) findFirst.get()).getValue();
        }).orElse(null);
        if (str == null) {
            throw new IOException("No TASK_JSON environment variable found in pod: " + job.getMetadata().getName());
        }
        return (Task) this.mapper.readValue(Base64Compression.decompressBase64(str), Task.class);
    }

    @VisibleForTesting
    abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext peonCommandContext) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public Job buildJob(K8sTaskId k8sTaskId, Map<String, String> map, Map<String, String> map2, PodTemplateSpec podTemplateSpec) {
        return ((JobBuilder) ((JobBuilder) new JobBuilder().withNewMetadata().withName(k8sTaskId.getK8sTaskId()).addToLabels(map).addToAnnotations(map2).endMetadata()).withNewSpec().withTemplate(podTemplateSpec).withActiveDeadlineSeconds(Long.valueOf(this.taskRunnerConfig.getTaskTimeout().toStandardDuration().getStandardSeconds())).withBackoffLimit(0).withTtlSecondsAfterFinished(Integer.valueOf((int) this.taskRunnerConfig.getTaskCleanupDelay().toStandardDuration().getStandardSeconds())).endSpec()).build();
    }

    @VisibleForTesting
    static Optional<Long> getJavaOptValueBytes(String str, List<String> list) {
        Long l = null;
        Optional<String> reduce = list.stream().filter(str2 -> {
            return str2.startsWith(str);
        }).reduce((str3, str4) -> {
            return str4;
        });
        if (reduce.isPresent()) {
            l = Long.valueOf(HumanReadableBytes.parse(StringUtils.removeStart(reduce.get(), str)));
        }
        return Optional.ofNullable(l);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static long getContainerMemory(PeonCommandContext peonCommandContext) {
        List<String> javaOpts = peonCommandContext.getJavaOpts();
        Optional<Long> javaOptValueBytes = getJavaOptValueBytes("-Xmx", javaOpts);
        long parse = HumanReadableBytes.parse("1g");
        if (javaOptValueBytes.isPresent()) {
            parse = javaOptValueBytes.get().longValue();
        }
        Optional<Long> javaOptValueBytes2 = getJavaOptValueBytes("-XX:MaxDirectMemorySize=", javaOpts);
        long j = parse;
        if (javaOptValueBytes2.isPresent()) {
            j = javaOptValueBytes2.get().longValue();
        }
        return (long) ((j + parse) * 1.2d);
    }

    protected void setupPorts(Container container) {
        container.getPorts().clear();
        ContainerPort containerPort = new ContainerPort();
        containerPort.setContainerPort(Integer.valueOf(DruidK8sConstants.PORT));
        containerPort.setName("druid-port");
        containerPort.setProtocol("TCP");
        ContainerPort containerPort2 = new ContainerPort();
        containerPort2.setContainerPort(Integer.valueOf(DruidK8sConstants.TLS_PORT));
        containerPort2.setName("druid-tls-port");
        containerPort2.setProtocol("TCP");
        container.setPorts(Lists.newArrayList(new ContainerPort[]{containerPort2, containerPort}));
    }

    @VisibleForTesting
    void addEnvironmentVariables(Container container, PeonCommandContext peonCommandContext, String str) throws JsonProcessingException {
        if (!this.taskRunnerConfig.getPeonMonitors().isEmpty()) {
            container.getEnv().removeIf(envVar -> {
                return "druid_monitoring_monitors".equals(envVar.getName());
            });
            container.getEnv().add(new EnvVarBuilder().withName("druid_monitoring_monitors").withValue(this.mapper.writeValueAsString(this.taskRunnerConfig.getPeonMonitors())).build());
        }
        container.getEnv().addAll(Lists.newArrayList(new EnvVar[]{new EnvVarBuilder().withName(DruidK8sConstants.TASK_DIR_ENV).withValue(peonCommandContext.getTaskDir().getAbsolutePath()).build(), new EnvVarBuilder().withName(DruidK8sConstants.TASK_JSON_ENV).withValue(str).build(), new EnvVarBuilder().withName(DruidK8sConstants.JAVA_OPTS).withValue(Joiner.on(" ").join(peonCommandContext.getJavaOpts())).build(), new EnvVarBuilder().withName(DruidK8sConstants.DRUID_HOST_ENV).withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector((String) null, "status.podIP")).build()).build(), new EnvVarBuilder().withName(DruidK8sConstants.DRUID_HOSTNAME_ENV).withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector((String) null, "metadata.name")).build()).build()}));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Container setupMainContainer(PodSpec podSpec, PeonCommandContext peonCommandContext, long j, String str) throws JsonProcessingException {
        ArrayList newArrayList = Lists.newArrayList(new String[]{"sh", "-c"});
        Container container = (Container) Iterables.getFirst(podSpec.getContainers(), (Object) null);
        if (container == null) {
            throw new IllegalArgumentException("Must have at least one container");
        }
        container.setReadinessProbe((Probe) null);
        container.setLivenessProbe((Probe) null);
        setupPorts(container);
        addEnvironmentVariables(container, peonCommandContext, str);
        container.setCommand(newArrayList);
        container.setArgs(Collections.singletonList(Joiner.on(" ").join(peonCommandContext.getComamnd())));
        container.setName("main");
        container.setResources(getResourceRequirements(container.getResources(), j));
        return container;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> addJobSpecificAnnotations(PeonCommandContext peonCommandContext, K8sTaskId k8sTaskId) {
        return ImmutableMap.builder().putAll(this.taskRunnerConfig.getAnnotations()).put(DruidK8sConstants.TASK_ID, k8sTaskId.getOriginalTaskId()).put(DruidK8sConstants.TLS_ENABLED, String.valueOf(peonCommandContext.isEnableTls())).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> addJobSpecificLabels() {
        return ImmutableMap.builder().putAll(this.taskRunnerConfig.getLabels()).put(DruidK8sConstants.LABEL_KEY, "true").build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PodTemplateSpec createTemplateFromSpec(K8sTaskId k8sTaskId, PodSpec podSpec, Map<String, String> map, Map<String, String> map2) {
        podSpec.setNodeName((String) null);
        podSpec.setRestartPolicy("Never");
        podSpec.setHostname(k8sTaskId.getK8sTaskId());
        podSpec.setTerminationGracePeriodSeconds(this.taskRunnerConfig.getGraceTerminationPeriodSeconds());
        PodTemplateSpec podTemplateSpec = new PodTemplateSpec();
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setAnnotations(map);
        objectMeta.setLabels(map2);
        podTemplateSpec.setMetadata(objectMeta);
        podTemplateSpec.setSpec(podSpec);
        return podTemplateSpec;
    }

    @VisibleForTesting
    static void massageSpec(PodSpec podSpec, String str) {
        if (StringUtils.isNotBlank(str)) {
            int i = 0;
            while (i < podSpec.getContainers().size() && !str.equals(((Container) podSpec.getContainers().get(i)).getName())) {
                i++;
            }
            if (i >= podSpec.getContainers().size()) {
                throw new IllegalArgumentException("Could not find container named: " + str + " in PodSpec");
            }
            Container container = (Container) podSpec.getContainers().get(i);
            podSpec.getContainers().remove(i);
            podSpec.getContainers().add(0, container);
        }
    }

    private List<String> javaOpts(Task task) {
        ArrayList arrayList = new ArrayList();
        Iterables.addAll(arrayList, this.taskRunnerConfig.getJavaOptsArray());
        Object contextValue = task.getContextValue("druid.indexer.runner.javaOpts");
        if (contextValue != null) {
            Iterables.addAll(arrayList, new QuotableWhiteSpaceSplitter((String) contextValue));
        }
        arrayList.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d", new Object[]{Integer.valueOf(DruidK8sConstants.PORT)}));
        arrayList.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d", new Object[]{Integer.valueOf(DruidK8sConstants.PORT)}));
        Object[] objArr = new Object[1];
        objArr[0] = Integer.valueOf(this.node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1);
        arrayList.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d", objArr));
        Object[] objArr2 = new Object[1];
        objArr2[0] = Integer.valueOf(this.node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1);
        arrayList.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.tlsPort=%d", objArr2));
        arrayList.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", new Object[]{Boolean.valueOf(this.node.isEnableTlsPort())}));
        return arrayList;
    }

    private List<String> generateCommand(Task task) {
        ArrayList arrayList = new ArrayList();
        arrayList.add("/peon.sh");
        arrayList.add(this.taskConfig.getBaseTaskDir().getAbsolutePath());
        arrayList.add("1");
        String nodeType = task.getNodeType();
        if (nodeType != null) {
            arrayList.add("--nodeType");
            arrayList.add(nodeType);
        }
        if (task.supportsQueries()) {
            arrayList.add("--loadBroadcastSegments");
            arrayList.add("true");
        }
        log.info("Peon Command for K8s job: %s", new Object[]{ForkingTaskRunner.getMaskedCommand(this.startupLoggingConfig.getMaskProperties(), arrayList)});
        return arrayList;
    }

    @VisibleForTesting
    static ResourceRequirements getResourceRequirements(ResourceRequirements resourceRequirements, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("cpu", new Quantity("1000", "m"));
        hashMap.put("memory", new Quantity(String.valueOf(j)));
        ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder();
        if (resourceRequirements != null) {
            if (resourceRequirements.getRequests() == null || resourceRequirements.getRequests().isEmpty()) {
                resourceRequirements.setRequests(hashMap);
            } else {
                resourceRequirements.getRequests().putAll(hashMap);
            }
            if (resourceRequirements.getLimits() == null || resourceRequirements.getLimits().isEmpty()) {
                resourceRequirements.setLimits(hashMap);
            } else {
                resourceRequirements.getLimits().putAll(hashMap);
            }
        } else {
            resourceRequirements = resourceRequirementsBuilder.withRequests(hashMap).withLimits(hashMap).build();
        }
        return resourceRequirements;
    }
}
