package org.apache.druid.k8s.overlord.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.Loggable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.ScalableResource;
import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

/* loaded from: input_file:org/apache/druid/k8s/overlord/common/KubernetesPeonClient.class */
/**
 * Client for the Kubernetes jobs and pods that back Druid peon tasks.
 *
 * <p>All lookups and mutations are scoped to the namespace given at construction time. Most operations are
 * routed through {@code KubernetesClientApi.executeRequest}; the two log accessors obtain the client via
 * {@code KubernetesClientApi.getClient()} instead.
 */
public class KubernetesPeonClient {
    private static final EmittingLogger log = new EmittingLogger(KubernetesPeonClient.class);
    private final KubernetesClientApi clientApi;
    private final String namespace;
    private final boolean debugJobs;
    private final ServiceEmitter emitter;

    /**
     * @param kubernetesClientApi access point used for every Kubernetes call
     * @param str                 namespace in which jobs and pods are created, listed and deleted
     * @param z                   debug flag; when {@code true}, {@link #deletePeonJob} leaves jobs in place
     * @param serviceEmitter      emitter that receives the pod startup metric
     */
    public KubernetesPeonClient(KubernetesClientApi kubernetesClientApi, String str, boolean z, ServiceEmitter serviceEmitter) {
        this.clientApi = kubernetesClientApi;
        this.namespace = str;
        this.debugJobs = z;
        this.emitter = serviceEmitter;
    }

    /**
     * Creates the given job, then waits until its pod either reports a pod IP or disappears.
     *
     * <p>On success the elapsed time since this method was entered is emitted as
     * {@code k8s/peon/startup/time}.
     *
     * @param job      job to create
     * @param task     task the job runs; used for metric dimensions and error reporting
     * @param j        maximum time to wait for the pod condition, in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the pod once it has a pod IP
     * @throws IllegalStateException if the pod disappeared while waiting
     */
    public Pod launchPeonJobAndWaitForStart(Job job, Task task, long j, TimeUnit timeUnit) throws IllegalStateException {
        long startMillis = System.currentTimeMillis();
        return this.clientApi.executeRequest(client -> {
            client.batch().v1().jobs().inNamespace(this.namespace).resource(job).create();
            String jobName = job.getMetadata().getName();
            log.info("Successfully submitted job: %s ... waiting for job to launch", jobName);
            Pod peonPod = getPeonPodWithRetries(jobName);
            // A null pod means it was deleted while we were waiting; stop waiting and report it below.
            Pod startedPod = client.pods()
                .inNamespace(this.namespace)
                .withName(peonPod.getMetadata().getName())
                .waitUntilCondition(
                    pod -> pod == null || (pod.getStatus() != null && pod.getStatus().getPodIP() != null),
                    j,
                    timeUnit
                );
            if (startedPod == null) {
                throw new IllegalStateException(
                    "K8s pod for the task [" + task.getId()
                    + "] appeared and disappeared. It can happen if the task was canceled"
                );
            }
            emitK8sPodMetrics(task, "k8s/peon/startup/time", System.currentTimeMillis() - startMillis);
            return startedPod;
        });
    }

    /**
     * Waits until the job for the given task is no longer active and has reported a success or failure count,
     * or until the job disappears.
     *
     * @param k8sTaskId task whose job is awaited
     * @param j         maximum time to wait, in {@code timeUnit}
     * @param timeUnit  unit of {@code j}
     * @return a response with {@code PeonPhase.SUCCEEDED} when the job reports a succeeded count; otherwise
     *         {@code PeonPhase.FAILED} (with a {@code null} job if the job was not found)
     */
    public JobResponse waitForPeonJobCompletion(K8sTaskId k8sTaskId, long j, TimeUnit timeUnit) {
        return this.clientApi.executeRequest(client -> {
            Job finishedJob = client.batch().v1().jobs()
                .inNamespace(this.namespace)
                .withName(k8sTaskId.getK8sJobName())
                .waitUntilCondition(
                    candidate -> candidate == null
                        || (candidate.getStatus() != null
                            && candidate.getStatus().getActive() == null
                            && (candidate.getStatus().getFailed() != null
                                || candidate.getStatus().getSucceeded() != null)),
                    j,
                    timeUnit
                );
            if (finishedJob == null) {
                log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", k8sTaskId);
                return new JobResponse(null, PeonPhase.FAILED);
            }
            if (finishedJob.getStatus().getSucceeded() != null) {
                return new JobResponse(finishedJob, PeonPhase.SUCCEEDED);
            }
            log.warn("Task %s failed with status %s", k8sTaskId, finishedJob.getStatus());
            return new JobResponse(finishedJob, PeonPhase.FAILED);
        });
    }

    /**
     * Deletes the job belonging to the given task, unless job debugging is enabled.
     *
     * @param k8sTaskId task whose job should be removed
     * @return {@code true} if the delete call reported at least one deleted resource, or if deletion was
     *         skipped because {@code debugJobs} is set; {@code false} if nothing was deleted
     */
    public boolean deletePeonJob(K8sTaskId k8sTaskId) {
        if (this.debugJobs) {
            log.info("Not cleaning up job %s due to flag: debugJobs=true", k8sTaskId);
            return true;
        }
        boolean deleted = this.clientApi.executeRequest(
            client -> !client.batch().v1().jobs()
                .inNamespace(this.namespace)
                .withName(k8sTaskId.getK8sJobName())
                .delete()
                .isEmpty()
        );
        if (deleted) {
            log.info("Cleaned up k8s job: %s", k8sTaskId);
        } else {
            log.info("K8s job does not exist: %s", k8sTaskId);
        }
        return deleted;
    }

    /**
     * Opens a log watch on the {@code main} container of the task's job.
     *
     * @param k8sTaskId task whose logs should be watched
     * @return the watch, or absent if none was returned or any exception occurred (the exception is logged)
     */
    public Optional<LogWatch> getPeonLogWatcher(K8sTaskId k8sTaskId) {
        try {
            LogWatch logWatch = this.clientApi.getClient()
                .batch().v1().jobs()
                .inNamespace(this.namespace)
                .withName(k8sTaskId.getK8sJobName())
                .inContainer("main")
                .watchLog();
            return Optional.fromNullable(logWatch);
        } catch (Exception e) {
            log.error(e, "Error watching logs from task: %s", k8sTaskId);
            return Optional.absent();
        }
    }

    /**
     * Opens a stream over the logs of the {@code main} container of the task's job.
     *
     * @param k8sTaskId task whose logs should be read
     * @return the log stream, or absent if none was returned or any exception occurred (the exception is logged)
     */
    public Optional<InputStream> getPeonLogs(K8sTaskId k8sTaskId) {
        try {
            InputStream logStream = this.clientApi.getClient()
                .batch().v1().jobs()
                .inNamespace(this.namespace)
                .withName(k8sTaskId.getK8sJobName())
                .inContainer("main")
                .getLogInputStream();
            return Optional.fromNullable(logStream);
        } catch (Exception e) {
            log.error(e, "Error streaming logs from task: %s", k8sTaskId);
            return Optional.absent();
        }
    }

    /**
     * Lists every job in the namespace that carries the {@code DruidK8sConstants.LABEL_KEY} label.
     *
     * @return the labelled jobs
     */
    public List<Job> getPeonJobs() {
        return this.clientApi.executeRequest(
            client -> client.batch().v1().jobs()
                .inNamespace(this.namespace)
                .withLabel(DruidK8sConstants.LABEL_KEY)
                .list()
                .getItems()
        );
    }

    /**
     * Deletes labelled jobs that are no longer active and completed more than the given duration ago.
     *
     * <p>A job whose delete call reports nothing deleted is logged as an error and not counted.
     *
     * @param j        minimum age since completion, in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return number of jobs deleted
     */
    public int deleteCompletedPeonJobsOlderThan(long j, TimeUnit timeUnit) {
        return this.clientApi.executeRequest(client -> {
            int deletedCount = 0;
            for (Job expired : getJobsToCleanup(getPeonJobs(), j, timeUnit)) {
                String jobName = expired.getMetadata().getName();
                boolean deleted = !client.batch().v1().jobs()
                    .inNamespace(this.namespace)
                    .withName(jobName)
                    .delete()
                    .isEmpty();
                if (deleted) {
                    deletedCount++;
                } else {
                    log.error("Failed to delete job %s", jobName);
                }
            }
            return deletedCount;
        });
    }

    /**
     * Selects the jobs that are inactive and whose completion time lies before {@code now - j}.
     *
     * <p>Jobs with no status or no completion time are skipped: their age cannot be determined, and one such
     * job must not abort the cleanup of the others.
     */
    private List<Job> getJobsToCleanup(List<Job> list, long j, TimeUnit timeUnit) {
        Timestamp cutoff = new Timestamp(System.currentTimeMillis() - timeUnit.toMillis(j));
        List<Job> expired = new ArrayList<>();
        for (Job candidate : list) {
            if (candidate.getStatus() == null || candidate.getStatus().getActive() != null) {
                continue;
            }
            String completionTime = candidate.getStatus().getCompletionTime();
            if (completionTime == null) {
                continue;
            }
            // NOTE(review): Timestamp.valueOf only accepts "yyyy-[m]m-[d]d hh:mm:ss[.f...]"; confirm the
            // completion time really arrives in that form rather than as an RFC 3339 string.
            if (Timestamp.valueOf(completionTime).before(cutoff)) {
                expired.add(candidate);
            }
        }
        return expired;
    }

    /**
     * Looks up the pod labelled {@code job-name=<str>}.
     *
     * @param str job name to match against the {@code job-name} label
     * @return the first matching pod, or absent if there is none
     */
    public Optional<Pod> getPeonPod(String str) {
        return this.clientApi.executeRequest(client -> getPeonPod(client, str));
    }

    /** Same lookup as {@link #getPeonPod(String)}, using an already-obtained client. */
    private Optional<Pod> getPeonPod(KubernetesClient kubernetesClient, String str) {
        List<Pod> pods = kubernetesClient.pods()
            .inNamespace(this.namespace)
            .withLabel("job-name", str)
            .list()
            .getItems();
        return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
    }

    /**
     * Looks up the pod labelled {@code job-name=<str>}, retrying with retry arguments {@code 5} and {@code 10}.
     *
     * @param str job name to match against the {@code job-name} label
     * @return the pod
     * @throws KubernetesResourceNotFoundException if no pod was found
     */
    public Pod getPeonPodWithRetries(String str) {
        return this.clientApi.executeRequest(client -> getPeonPodWithRetries(client, str, 5, 10));
    }

    /**
     * Looks up the pod labelled {@code job-name=<str>}, retrying through {@code RetryUtils.retry} while
     * {@code DruidK8sConstants.IS_TRANSIENT} accepts the failure.
     *
     * @param kubernetesClient client used for the lookup
     * @param str              job name to match against the {@code job-name} label
     * @param i                third argument to {@code RetryUtils.retry} (presumably quiet tries; confirm)
     * @param i2               fourth argument to {@code RetryUtils.retry} (presumably max tries; confirm)
     * @return the pod
     * @throws KubernetesResourceNotFoundException if the lookup ultimately fails for any reason; the
     *         underlying failure is attached as a suppressed exception
     */
    @VisibleForTesting
    Pod getPeonPodWithRetries(KubernetesClient kubernetesClient, String str, int i, int i2) {
        String notFoundMessage = "K8s pod with label: job-name=" + str + " not found";
        try {
            return RetryUtils.retry(() -> {
                Optional<Pod> maybePod = getPeonPod(kubernetesClient, str);
                if (maybePod.isPresent()) {
                    return maybePod.get();
                }
                throw new KubernetesResourceNotFoundException(notFoundMessage);
            }, DruidK8sConstants.IS_TRANSIENT, i, i2);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            KubernetesResourceNotFoundException notFound = new KubernetesResourceNotFoundException(notFoundMessage);
            // Keep the real failure attached so it is not lost when the retry ends for a non-"not found" reason.
            notFound.addSuppressed(e);
            throw notFound;
        }
    }

    /** Emits {@code str} with value {@code j}, tagged with the task dimensions of {@code task}. */
    private void emitK8sPodMetrics(Task task, String str, long j) {
        ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
        IndexTaskUtils.setTaskDimensions(builder, task);
        this.emitter.emit(builder.setMetric(str, j));
    }
}
