package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.EditReplacePatchDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.class */
public class StandaloneFlinkService extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneFlinkService.class);

    public StandaloneFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager flinkConfigManager) {
        super(kubernetesClient, flinkConfigManager);
    }

    @Override // org.apache.flink.kubernetes.operator.service.AbstractFlinkService
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) throws Exception {
        LOG.info("Deploying application cluster");
        submitClusterInternal(removeOperatorConfigs(configuration), Mode.APPLICATION);
        LOG.info("Application cluster successfully deployed");
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void submitSessionCluster(Configuration configuration) throws Exception {
        LOG.info("Deploying session cluster");
        submitClusterInternal(removeOperatorConfigs(configuration), Mode.SESSION);
        LOG.info("Session cluster successfully deployed");
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        cancelJob(flinkDeployment, upgradeMode, configuration, true);
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void deleteClusterDeployment(ObjectMeta objectMeta, FlinkDeploymentStatus flinkDeploymentStatus, boolean z) {
        deleteClusterInternal(objectMeta, z);
    }

    @Override // org.apache.flink.kubernetes.operator.service.AbstractFlinkService
    protected PodList getJmPodList(String str, String str2) {
        return (PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.kubernetesClient.pods().inNamespace(str)).withLabels(StandaloneKubernetesUtils.getJobManagerSelectors(str2))).list();
    }

    @VisibleForTesting
    protected FlinkStandaloneKubeClient createNamespacedKubeClient(Configuration configuration, String str) {
        return new Fabric8FlinkStandaloneKubeClient(configuration, Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient(str), Executors.newFixedThreadPool(((Integer) configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE)).intValue(), new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service")));
    }

    protected void submitClusterInternal(Configuration configuration, Mode mode) throws ClusterDeploymentException {
        KubernetesStandaloneClusterDescriptor kubernetesStandaloneClusterDescriptor = new KubernetesStandaloneClusterDescriptor(configuration, createNamespacedKubeClient(configuration, (String) configuration.get(KubernetesConfigOptions.NAMESPACE)));
        try {
            switch (mode) {
                case APPLICATION:
                    kubernetesStandaloneClusterDescriptor.deployApplicationCluster(getClusterSpecification(configuration), ApplicationConfiguration.fromConfiguration(configuration));
                    break;
                case SESSION:
                    kubernetesStandaloneClusterDescriptor.deploySessionCluster(getClusterSpecification(configuration));
                    break;
                default:
                    throw new UnsupportedOperationException(String.format("Unsupported running mode: %s", mode));
            }
            kubernetesStandaloneClusterDescriptor.close();
        } catch (Throwable th) {
            try {
                kubernetesStandaloneClusterDescriptor.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private ClusterSpecification getClusterSpecification(Configuration configuration) {
        return new KubernetesClusterClientFactory().getClusterSpecification(configuration);
    }

    private void deleteClusterInternal(ObjectMeta objectMeta, boolean z) {
        String name = objectMeta.getName();
        String namespace = objectMeta.getNamespace();
        LOG.info("Deleting Flink Standalone cluster TM resources");
        ((EditReplacePatchDeletable) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).cascading(true)).delete();
        LOG.info("Deleting Flink Standalone cluster JM resources");
        ((EditReplacePatchDeletable) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(name))).cascading(true)).delete();
        if (z) {
            waitForClusterShutdown(namespace, name, this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
            ((FilterWatchListDeletable) ((NonNamespaceOperation) this.kubernetesClient.configMaps().inNamespace(namespace)).withLabels(KubernetesUtils.getConfigMapLabels(name, "high-availability"))).delete();
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public boolean scale(ObjectMeta objectMeta, JobSpec jobSpec, Configuration configuration) {
        if (configuration.get(JobManagerOptions.SCHEDULER_MODE) == null) {
            LOG.info("Reactive scaling is not enabled");
            return false;
        }
        String name = objectMeta.getName();
        String namespace = objectMeta.getNamespace();
        String taskManagerDeploymentName = StandaloneKubernetesUtils.getTaskManagerDeploymentName(name);
        RollableScalableResource rollableScalableResource = (RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(taskManagerDeploymentName);
        if (rollableScalableResource == null || rollableScalableResource.get() == null) {
            LOG.warn("TM Deployment ({}) not found", taskManagerDeploymentName);
            return false;
        }
        Integer replicas = ((Deployment) rollableScalableResource.get()).getSpec().getReplicas();
        Integer num = (Integer) configuration.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
        if (replicas == num) {
            LOG.info("Not scaling TM replicas: actual({}) == desired({})", replicas, num);
            return true;
        }
        LOG.info("Scaling TM replicas: actual({}) -> desired({})", replicas, num);
        rollableScalableResource.scale(num.intValue());
        return true;
    }
}
