package org.apache.druid.k8s.overlord.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.Loggable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.ScalableResource;
import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;

/* loaded from: input_file:org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.class */
public class DruidKubernetesPeonClient implements KubernetesPeonClient {
    private static final EmittingLogger log = new EmittingLogger(DruidKubernetesPeonClient.class);
    private final KubernetesClientApi clientApi;
    private final String namespace;
    private final boolean debugJobs;

    public DruidKubernetesPeonClient(KubernetesClientApi kubernetesClientApi, String str, boolean z) {
        this.clientApi = kubernetesClientApi;
        this.namespace = str;
        this.debugJobs = z;
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public Optional<Job> jobExists(K8sTaskId k8sTaskId) {
        return (Optional) this.clientApi.executeRequest(kubernetesClient -> {
            return Optional.fromNullable(((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(k8sTaskId.getK8sTaskId())).get());
        });
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public Pod launchJobAndWaitForStart(Job job, long j, TimeUnit timeUnit) {
        long currentTimeMillis = System.currentTimeMillis();
        return (Pod) this.clientApi.executeRequest(kubernetesClient -> {
            ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).create(job);
            K8sTaskId k8sTaskId = new K8sTaskId(job.getMetadata().getName());
            log.info("Successfully submitted job: %s ... waiting for job to launch", new Object[]{k8sTaskId});
            Pod pod = (Pod) ((PodResource) ((NonNamespaceOperation) kubernetesClient.pods().inNamespace(this.namespace)).withName(getMainJobPod(k8sTaskId).getMetadata().getName())).waitUntilCondition(pod2 -> {
                return (pod2 == null || pod2.getStatus() == null || pod2.getStatus().getPodIP() == null) ? false : true;
            }, j, timeUnit);
            log.info("Took task %s %d ms for pod to startup", new Object[]{k8sTaskId, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            return pod;
        });
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public JobResponse waitForJobCompletion(K8sTaskId k8sTaskId, long j, TimeUnit timeUnit) {
        return (JobResponse) this.clientApi.executeRequest(kubernetesClient -> {
            Job job = (Job) ((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(k8sTaskId.getK8sTaskId())).waitUntilCondition(job2 -> {
                return (job2 == null || job2.getStatus() == null || job2.getStatus().getActive() != null) ? false : true;
            }, j, timeUnit);
            return job.getStatus().getSucceeded() != null ? new JobResponse(job, PeonPhase.SUCCEEDED) : new JobResponse(job, PeonPhase.FAILED);
        });
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public boolean cleanUpJob(K8sTaskId k8sTaskId) {
        if (this.debugJobs) {
            log.info("Not cleaning up task %s due to flag: debugJobs=true", new Object[]{k8sTaskId});
            return true;
        }
        Boolean bool = (Boolean) this.clientApi.executeRequest(kubernetesClient -> {
            return ((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(k8sTaskId.getK8sTaskId())).delete();
        });
        if (bool.booleanValue()) {
            log.info("Cleaned up k8s task: %s", new Object[]{k8sTaskId});
        } else {
            log.info("Failed to cleanup task: %s", new Object[]{k8sTaskId});
        }
        return bool.booleanValue();
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public String getJobLogs(K8sTaskId k8sTaskId) {
        try {
            return (String) this.clientApi.executeRequest(kubernetesClient -> {
                return ((Loggable) ((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(k8sTaskId.getK8sTaskId())).inContainer("main")).getLog(true);
            });
        } catch (Exception e) {
            return "No logs found: " + e;
        }
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public Optional<InputStream> getPeonLogs(K8sTaskId k8sTaskId) {
        try {
            return (Optional) this.clientApi.executeRequest(kubernetesClient -> {
                Reader logReader = ((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(k8sTaskId.getK8sTaskId())).getLogReader();
                return logReader == null ? Optional.absent() : Optional.of(new ReaderInputStream(logReader, StandardCharsets.UTF_8));
            });
        } catch (Exception e) {
            log.error("Error streaming logs from task: %s", new Object[]{k8sTaskId});
            return Optional.absent();
        }
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public List<Job> listAllPeonJobs() {
        return (List) this.clientApi.executeRequest(kubernetesClient -> {
            return ((JobList) ((FilterWatchListDeletable) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withLabel("druid.k8s.peons")).list()).getItems();
        });
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public List<Pod> listPeonPods(Set<PeonPhase> set) {
        return (List) listPeonPods().stream().filter(pod -> {
            return set.contains(PeonPhase.getPhaseFor(pod));
        }).collect(Collectors.toList());
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public List<Pod> listPeonPods() {
        return ((PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.clientApi.executeRequest(kubernetesClient -> {
            return (NonNamespaceOperation) kubernetesClient.pods().inNamespace(this.namespace);
        })).withLabel("druid.k8s.peons")).list()).getItems();
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public int cleanCompletedJobsOlderThan(long j, TimeUnit timeUnit) {
        AtomicInteger atomicInteger = new AtomicInteger();
        return ((Integer) this.clientApi.executeRequest(kubernetesClient -> {
            getJobsToCleanup(listAllPeonJobs(), j, timeUnit).forEach(job -> {
                if (((ScalableResource) ((NonNamespaceOperation) kubernetesClient.batch().v1().jobs().inNamespace(this.namespace)).withName(job.getMetadata().getName())).delete().booleanValue()) {
                    atomicInteger.incrementAndGet();
                }
            });
            return Integer.valueOf(atomicInteger.get());
        })).intValue();
    }

    @Override // org.apache.druid.k8s.overlord.common.KubernetesPeonClient
    public Pod getMainJobPod(K8sTaskId k8sTaskId) {
        return (Pod) this.clientApi.executeRequest(kubernetesClient -> {
            return getMainJobPod(kubernetesClient, k8sTaskId);
        });
    }

    @VisibleForTesting
    List<Job> getJobsToCleanup(List<Job> list, long j, TimeUnit timeUnit) {
        ArrayList arrayList = new ArrayList();
        long currentTimeMillis = System.currentTimeMillis() - timeUnit.toMillis(j);
        list.forEach(job -> {
            if (job.getStatus().getActive() == null && Timestamp.valueOf(job.getStatus().getCompletionTime()).before(new Timestamp(currentTimeMillis))) {
                arrayList.add(job);
            }
        });
        return arrayList;
    }

    Pod getMainJobPod(KubernetesClient kubernetesClient, K8sTaskId k8sTaskId) {
        String k8sTaskId2 = k8sTaskId.getK8sTaskId();
        try {
            return (Pod) RetryUtils.retry(() -> {
                PodList podList = (PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) kubernetesClient.pods().inNamespace(this.namespace)).withLabel("job-name", k8sTaskId2)).list();
                if (podList.getItems().size() > 0) {
                    return (Pod) podList.getItems().get(0);
                }
                throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId2 + " not found");
            }, DruidK8sConstants.IS_TRANSIENT, 5, 10);
        } catch (Exception e) {
            throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId2 + " not found");
        }
    }
}
