package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.GracePeriodConfigurable;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/NativeFlinkService.class */
public class NativeFlinkService extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
    private final EventRecorder eventRecorder;

    /**
     * Creates the service.
     *
     * <p>The first four arguments are passed unchanged to the {@link AbstractFlinkService}
     * constructor; the event recorder is kept locally and used by {@code scale} to emit an event
     * when in-place scaling is triggered.
     *
     * @param kubernetesClient client forwarded to the superclass
     * @param artifactManager artifact manager forwarded to the superclass
     * @param executorService executor forwarded to the superclass
     * @param flinkOperatorConfiguration operator configuration forwarded to the superclass
     * @param eventRecorder recorder stored on this instance
     */
    public NativeFlinkService(
            KubernetesClient kubernetesClient,
            ArtifactManager artifactManager,
            ExecutorService executorService,
            FlinkOperatorConfiguration flinkOperatorConfiguration,
            EventRecorder eventRecorder) {
        super(kubernetesClient, artifactManager, executorService, flinkOperatorConfiguration);
        this.eventRecorder = eventRecorder;
    }

    /**
     * Deploys an application cluster for the given job through an
     * {@link ApplicationClusterDeployer}.
     *
     * <p>When the job spec carries no argument array ({@code null}), an empty array is used
     * instead.
     *
     * @param jobSpec supplies the program arguments and the entry class
     * @param configuration configuration handed to the deployer
     * @throws Exception whatever the deployer raises is propagated unchanged
     */
    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) throws Exception {
        LOG.info("Deploying application cluster");

        ApplicationClusterDeployer deployer =
                new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader());

        String[] programArguments = jobSpec.getArgs();
        if (programArguments == null) {
            programArguments = new String[0];
        }
        ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(programArguments, jobSpec.getEntryClass());

        deployer.run(configuration, applicationConfiguration);
        LOG.info("Application cluster successfully deployed");
    }

    /**
     * Submits a session cluster.
     *
     * <p>The configuration is first passed through {@code removeOperatorConfigs} and the result
     * is handed to {@link #submitClusterInternal(Configuration)}.
     *
     * @param configuration configuration of the session cluster to deploy
     * @throws Exception propagated from the deployment
     */
    @Override
    public void submitSessionCluster(Configuration configuration) throws Exception {
        Configuration withoutOperatorConfigs = removeOperatorConfigs(configuration);
        submitClusterInternal(withoutOperatorConfigs);
    }

    /**
     * Cancels the job of the given deployment by delegating to the four-argument
     * {@code cancelJob} overload.
     *
     * <p>The boolean passed to the overload is {@code true} exactly when the deployment's Flink
     * version is <em>not</em> newer than 1.14.
     * NOTE(review): the meaning of that flag is defined by the overload, which is not visible
     * here — confirm against {@code AbstractFlinkService}.
     *
     * @param flinkDeployment deployment whose job is cancelled
     * @param upgradeMode upgrade mode forwarded to the overload
     * @param configuration configuration forwarded to the overload
     * @throws Exception propagated from the overload
     */
    @Override
    public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        FlinkDeploymentSpec deploymentSpec = flinkDeployment.getSpec();
        FlinkVersion flinkVersion = deploymentSpec.getFlinkVersion();
        boolean notNewerThan114 = !flinkVersion.isNewerVersionThan(FlinkVersion.v1_14);
        cancelJob(flinkDeployment, upgradeMode, configuration, notNewerThan114);
    }

    /**
     * Lists the pods that carry the JobManager selector labels.
     *
     * @param str namespace to search in
     * @param str2 identifier passed to {@link KubernetesUtils#getJobManagerSelectors} to build the
     *     label selector (presumably the cluster id — verify against the caller)
     * @return the matching pods
     */
    @Override
    protected PodList getJmPodList(String str, String str2) {
        return this.kubernetesClient
                .pods()
                .inNamespace(str)
                .withLabels(KubernetesUtils.getJobManagerSelectors(str2))
                .list();
    }

    /**
     * Returns a fresh, unpopulated {@link PodList}; neither argument is consulted.
     *
     * @param str unused
     * @param str2 unused
     * @return a newly created pod list
     */
    @Override
    protected PodList getTmPodList(String str, String str2) {
        PodList noTaskManagerPods = new PodList();
        return noTaskManagerPods;
    }

    /**
     * Deploys a session cluster described by the given configuration.
     *
     * <p>The cluster client factory is looked up from the configuration, a cluster descriptor is
     * created from it, and the session cluster is deployed with the specification the factory
     * derives from the same configuration. The descriptor is closed when deployment finishes,
     * whether it succeeded or failed; a failure while closing after a deployment error is attached
     * to the original error as a suppressed exception.
     *
     * @param configuration configuration used to select the factory and describe the cluster
     * @throws Exception if the deployment (or closing the descriptor) fails
     */
    protected void submitClusterInternal(Configuration configuration) throws Exception {
        LOG.info("Deploying session cluster");
        ClusterClientFactory<String> clusterClientFactory =
                new DefaultClusterClientServiceLoader().getClusterClientFactory(configuration);
        // try-with-resources closes the descriptor exactly once on every path.
        try (ClusterDescriptor<String> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            clusterDescriptor.deploySessionCluster(
                    clusterClientFactory.getClusterSpecification(configuration));
        }
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the JobManager deployment of a cluster and, optionally, its HA data.
     *
     * @param objectMeta metadata providing the namespace and name of the cluster
     * @param configuration configuration passed on to {@code deleteHAData}
     * @param z when {@code true}, {@code deleteHAData} is invoked after the deployment delete call
     * @param deletionPropagation propagation policy applied to the deployment deletion
     */
    @Override
    protected void deleteClusterInternal(ObjectMeta objectMeta, Configuration configuration, boolean z, DeletionPropagation deletionPropagation) {
        final String namespace = objectMeta.getNamespace();
        final String clusterId = objectMeta.getName();

        final String haHandling;
        if (z) {
            haHandling = "and HA metadata";
        } else {
            haHandling = "while preserving HA metadata";
        }
        LOG.info("Deleting JobManager deployment {}.", haHandling);

        this.kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(KubernetesUtils.getDeploymentName(clusterId))
                .withPropagationPolicy(deletionPropagation)
                .delete();

        if (!z) {
            return;
        }
        deleteHAData(namespace, clusterId, configuration);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.kubernetes.operator.api.AbstractFlinkResource] */
    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public FlinkService.ScalingResult scale(FlinkResourceContext<?> flinkResourceContext, Configuration configuration) throws Exception {
        ?? resource = flinkResourceContext.getResource();
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        if (!supportsInPlaceScaling(resource, observeConfig)) {
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }
        Map map = (Map) configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
        Map map2 = (Map) observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        if (map.isEmpty() && map2.isEmpty()) {
            LOG.info("No overrides defined before or after. Cannot scale in-place.");
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }
        try {
            RestClusterClient<String> clusterClient = getClusterClient(observeConfig);
            try {
                HashMap hashMap = new HashMap(getVertexResources(clusterClient, resource));
                FlinkService.ScalingResult scalingResult = FlinkService.ScalingResult.ALREADY_SCALED;
                for (Map.Entry entry : hashMap.entrySet()) {
                    String jobVertexID = ((JobVertexID) entry.getKey()).toString();
                    JobVertexResourceRequirements.Parallelism parallelism = ((JobVertexResourceRequirements) entry.getValue()).getParallelism();
                    String str = (String) map.get(jobVertexID);
                    if (str != null) {
                        int parseInt = Integer.parseInt(str);
                        JobVertexResourceRequirements.Parallelism parallelism2 = new JobVertexResourceRequirements.Parallelism(parseInt, parseInt);
                        if (!parallelism.equals(parallelism2)) {
                            entry.setValue(new JobVertexResourceRequirements(parallelism2));
                            scalingResult = FlinkService.ScalingResult.SCALING_TRIGGERED;
                        }
                    } else if (map2.containsKey(jobVertexID)) {
                        LOG.info("Parallelism override for {} has been removed, falling back to regular upgrade.", jobVertexID);
                        FlinkService.ScalingResult scalingResult2 = FlinkService.ScalingResult.CANNOT_SCALE;
                        if (clusterClient != null) {
                            clusterClient.close();
                        }
                        return scalingResult2;
                    }
                }
                if (scalingResult == FlinkService.ScalingResult.ALREADY_SCALED) {
                    LOG.info("Vertex resources requirements already match target, nothing to do...");
                } else {
                    updateVertexResources(clusterClient, resource, hashMap);
                    this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) resource, EventRecorder.Type.Normal, EventRecorder.Reason.Scaling, EventRecorder.Component.Job, "In-place scaling triggered");
                }
                FlinkService.ScalingResult scalingResult3 = scalingResult;
                if (clusterClient != null) {
                    clusterClient.close();
                }
                return scalingResult3;
            } finally {
            }
        } catch (Throwable th) {
            LOG.error("Error while rescaling, falling back to regular upgrade", th);
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }
    }

    /**
     * Decides whether the resource can be rescaled without a regular upgrade.
     *
     * <p>All of the following must hold: the spec has a job, the in-place scaling option is
     * enabled, the configured Flink version is newer than 1.17, the configured scheduler is the
     * adaptive one, and the job is neither in a terminal state nor reconciling.
     *
     * @param abstractFlinkResource resource whose spec and status are inspected
     * @param configuration configuration the option values are read from
     * @return {@code true} when every precondition above is met
     */
    private static boolean supportsInPlaceScaling(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration) {
        if (abstractFlinkResource.getSpec().getJob() == null) {
            return false;
        }
        boolean inPlaceScalingEnabled = configuration.get(KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED);
        if (!inPlaceScalingEnabled) {
            return false;
        }

        FlinkVersion flinkVersion = configuration.get(FlinkConfigBuilder.FLINK_VERSION);
        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_17)) {
            LOG.debug("In-place rescaling is only available starting from Flink 1.18");
            return false;
        }

        JobManagerOptions.SchedulerType schedulerType = configuration.get(JobManagerOptions.SCHEDULER);
        if (!schedulerType.equals(JobManagerOptions.SchedulerType.Adaptive)) {
            LOG.debug("In-place rescaling is only available with the adaptive scheduler");
            return false;
        }

        CommonStatus<?> status = abstractFlinkResource.getStatus();
        // The job state is only read when the job is not already known to be terminal.
        if (ReconciliationUtils.isJobInTerminalState(status)
                || JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
            LOG.info("Job in terminal or reconciling state cannot be scaled in-place");
            return false;
        }
        return true;
    }

    /**
     * Sends new vertex resource requirements for the resource's job to the cluster and waits for
     * the request to complete.
     *
     * <p>The job id is taken from the resource status; the wait is bounded by the operator's Flink
     * client timeout, expressed in whole seconds.
     *
     * @param restClusterClient client used to send the request
     * @param abstractFlinkResource resource whose status holds the job id
     * @param map requirements per job vertex to apply
     * @throws Exception if the request fails or does not complete within the timeout
     */
    @VisibleForTesting
    protected void updateVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource, Map<JobVertexID, JobVertexResourceRequirements> map) throws Exception {
        JobMessageParameters pathParameters = new JobMessageParameters();
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        JobID jobId = JobID.fromHexString(status.getJobStatus().getJobId());
        pathParameters.jobPathParameter.resolve(jobId);

        JobResourcesRequirementsUpdateHeaders updateHeaders = new JobResourcesRequirementsUpdateHeaders();
        JobResourceRequirementsBody requestBody = new JobResourceRequirementsBody(new JobResourceRequirements(map));
        restClusterClient
                .sendRequest(updateHeaders, pathParameters, requestBody)
                .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
    }

    /**
     * Fetches the current resource requirements of every vertex of the resource's job.
     *
     * <p>The job id is taken from the resource status; the wait is bounded by the operator's Flink
     * client timeout, expressed in whole seconds.
     *
     * @param restClusterClient client used to send the request
     * @param abstractFlinkResource resource whose status holds the job id
     * @return the requirements reported by the cluster, keyed by job vertex id
     * @throws Exception if the request fails, times out, or the response holds no requirements
     */
    @VisibleForTesting
    protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource) throws Exception {
        JobMessageParameters pathParameters = new JobMessageParameters();
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        JobID jobId = JobID.fromHexString(status.getJobStatus().getJobId());
        pathParameters.jobPathParameter.resolve(jobId);

        JobResourceRequirementsHeaders requirementHeaders = new JobResourceRequirementsHeaders();
        JobResourceRequirementsBody responseBody =
                restClusterClient
                        .sendRequest(requirementHeaders, pathParameters, EmptyRequestBody.getInstance())
                        .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        return responseBody.asJobResourceRequirements().get().getJobVertexParallelisms();
    }

    /**
     * Checks whether every vertex with a parallelism override is running at that parallelism.
     *
     * <p>The job details are fetched from the cluster and each vertex's current parallelism is
     * compared with the override (keyed by the vertex id's hex string) found in the observe
     * configuration. Vertices without an override are ignored. When all overridden vertices match,
     * the reconciliation state in the resource status is set to {@code DEPLOYED}.
     *
     * @param flinkResourceContext context providing the resource, observe configuration and the
     *     service used to obtain the cluster client
     * @return {@code true} when all overridden vertices have reached their target parallelism;
     *     {@code false} when the job reports no vertices or at least one vertex still differs
     * @throws RuntimeException wrapping any checked or unchecked exception raised while querying
     *     the cluster or parsing an override
     */
    @Override
    public boolean scalingCompleted(FlinkResourceContext<?> flinkResourceContext) {
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        CommonStatus<?> status = flinkResourceContext.getResource().getStatus();
        // try-with-resources closes the REST client on every path, including failures.
        try (RestClusterClient<String> clusterClient =
                flinkResourceContext.getFlinkService().getClusterClient(observeConfig)) {
            JobID jobId = JobID.fromHexString(status.getJobStatus().getJobId());
            JobDetailsInfo jobDetailsInfo = clusterClient.getJobDetails(jobId).get();

            // No vertex information yet: nothing can be confirmed.
            if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
                return false;
            }

            Map<JobVertexID, Integer> currentParallelisms =
                    jobDetailsInfo.getJobVertexInfos().stream()
                            .collect(
                                    Collectors.toMap(
                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                            JobDetailsInfo.JobVertexDetailsInfo::getParallelism));
            Map<String, String> overrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);

            for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
                String overrideStr = overrides.get(entry.getKey().toHexString());
                if (overrideStr == null) {
                    continue;
                }
                Integer expectedParallelism = Integer.valueOf(overrideStr);
                if (!expectedParallelism.equals(entry.getValue())) {
                    LOG.info("Scaling still in progress for vertex {}, {} -> {}", entry.getKey(), entry.getValue(), expectedParallelism);
                    return false;
                }
            }

            LOG.info("All vertexes have successfully scaled");
            status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
