/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.standalone;

import java.io.File;
import java.io.IOException;
import java.util.Optional;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Endpoint;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesStandaloneClusterDescriptor
extends KubernetesClusterDescriptor {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesStandaloneClusterDescriptor.class);
    private static final String CLUSTER_DESCRIPTION = "Standalone Kubernetes cluster";
    private final Configuration flinkConfig;
    private final FlinkStandaloneKubeClient client;
    private final String clusterId;

    public KubernetesStandaloneClusterDescriptor(Configuration flinkConfig, FlinkStandaloneKubeClient client) {
        super(flinkConfig, (FlinkKubeClient)client);
        this.flinkConfig = (Configuration)Preconditions.checkNotNull((Object)flinkConfig);
        this.client = (FlinkStandaloneKubeClient)Preconditions.checkNotNull((Object)client);
        this.clusterId = (String)Preconditions.checkNotNull((Object)flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), (String)"ClusterId must be specified!");
    }

    public String getClusterDescription() {
        return CLUSTER_DESCRIPTION;
    }

    public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        this.flinkConfig.set(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, (Object)StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
        ClusterClientProvider<String> clusterClientProvider = this.deployClusterInternal(clusterSpecification);
        try (ClusterClient clusterClient = clusterClientProvider.getClusterClient();){
            LOG.info("Created flink session cluster {} successfully, JobManager Web Interface: {}", (Object)this.clusterId, (Object)clusterClient.getWebInterfaceURL());
        }
        return clusterClientProvider;
    }

    public ClusterClientProvider<String> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
        this.flinkConfig.set(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, (Object)StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
        applicationConfiguration.applyToConfiguration(this.flinkConfig);
        ClusterClientProvider<String> clusterClientProvider = this.deployClusterInternal(clusterSpecification);
        try (ClusterClient clusterClient = clusterClientProvider.getClusterClient();){
            LOG.info("Created flink application cluster {} successfully, JobManager Web Interface: {}", (Object)this.clusterId, (Object)clusterClient.getWebInterfaceURL());
        }
        return clusterClientProvider;
    }

    private ClusterClientProvider<String> deployClusterInternal(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        KubernetesUtils.checkAndUpdatePortConfigOption((Configuration)this.flinkConfig, (ConfigOption)BlobServerOptions.PORT, (int)6124);
        KubernetesUtils.checkAndUpdatePortConfigOption((Configuration)this.flinkConfig, (ConfigOption)TaskManagerOptions.RPC_PORT, (int)6122);
        KubernetesUtils.checkAndUpdatePortConfigOption((Configuration)this.flinkConfig, (ConfigOption)RestOptions.BIND_PORT, (int)8081);
        if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)this.flinkConfig)) {
            this.flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, this.clusterId);
            KubernetesUtils.checkAndUpdatePortConfigOption((Configuration)this.flinkConfig, (ConfigOption)HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, (int)((Integer)this.flinkConfig.get(JobManagerOptions.PORT)));
        }
        try {
            KubernetesJobManagerSpecification jmSpec = this.getJobManagerSpec(clusterSpecification);
            Deployment tmDeployment = this.getTaskManagerDeployment(clusterSpecification);
            this.client.createJobManagerComponent(jmSpec);
            this.client.createTaskManagerDeployment(tmDeployment);
            return this.createClusterClientProvider(this.clusterId);
        }
        catch (Exception e) {
            try {
                LOG.warn("Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", (Object)this.clusterId);
                this.client.stopAndCleanupCluster(this.clusterId);
            }
            catch (Exception e1) {
                LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", (Object)this.clusterId, (Object)e1);
            }
            throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + this.clusterId + "\".", (Throwable)e);
        }
    }

    private KubernetesJobManagerSpecification getJobManagerSpec(ClusterSpecification clusterSpecification) throws IOException {
        StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters = new StandaloneKubernetesJobManagerParameters(this.flinkConfig, clusterSpecification);
        FlinkPod podTemplate = kubernetesJobManagerParameters.getPodTemplateFilePath().map(file -> KubernetesUtils.loadPodFromTemplateFile((FlinkKubeClient)this.client, (File)file, (String)"flink-main-container")).orElse(new FlinkPod.Builder().build());
        return StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(podTemplate, kubernetesJobManagerParameters);
    }

    private Deployment getTaskManagerDeployment(ClusterSpecification clusterSpecification) {
        StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters = new StandaloneKubernetesTaskManagerParameters(this.flinkConfig, clusterSpecification);
        FlinkPod podTemplate = kubernetesTaskManagerParameters.getPodTemplateFilePath().map(file -> KubernetesUtils.loadPodFromTemplateFile((FlinkKubeClient)this.client, (File)file, (String)"flink-main-container")).orElse(new FlinkPod.Builder().build());
        return StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(podTemplate, kubernetesTaskManagerParameters);
    }

    private ClusterClientProvider<String> createClusterClientProvider(String clusterId) {
        return () -> {
            Configuration configuration = new Configuration(this.flinkConfig);
            Optional restEndpoint = this.client.getRestEndpoint(clusterId);
            if (!restEndpoint.isPresent()) {
                throw new RuntimeException((Throwable)new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId));
            }
            configuration.setString(RestOptions.ADDRESS, ((Endpoint)restEndpoint.get()).getAddress());
            configuration.setInteger(RestOptions.PORT, ((Endpoint)restEndpoint.get()).getPort());
            try {
                return new RestClusterClient(configuration, (Object)clusterId, (effectiveConfiguration, fatalErrorHandler) -> new StandaloneClientHAServices(this.getWebMonitorAddress(effectiveConfiguration)));
            }
            catch (Exception e) {
                throw new RuntimeException((Throwable)new ClusterRetrieveException("Could not create the RestClusterClient.", (Throwable)e));
            }
        };
    }

    private String getWebMonitorAddress(Configuration configuration) throws Exception {
        AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
        KubernetesConfigOptions.ServiceExposedType serviceType = (KubernetesConfigOptions.ServiceExposedType)configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
        if (serviceType.isClusterIP()) {
            resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
            LOG.warn("Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since '{}' has been set to {}.", (Object)KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), (Object)serviceType);
        }
        return HighAvailabilityServicesUtils.getWebMonitorAddress((Configuration)configuration, (AddressResolution)resolution);
    }
}

