/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.deployment;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PipelineDeploymentExecutor} that submits a Flink CDC pipeline through the Flink
 * Kubernetes client using the {@link KubernetesDeploymentTarget#APPLICATION} deployment target.
 *
 * <p>The executor fills in the application-related options on the supplied configuration and
 * then asks a {@link KubernetesClusterDescriptor} to deploy the application cluster.
 */
public class K8SApplicationDeploymentExecutor
implements PipelineDeploymentExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);

    /** Jar registered under {@link PipelineOptions#JARS} when the configuration has none. */
    private static final String DEFAULT_CDC_DIST_JAR = "local:///opt/flink-cdc/lib/flink-cdc-dist.jar";

    /** Container image applied when the configuration does not specify one. */
    private static final String DEFAULT_CONTAINER_IMAGE = "flink/flink-cdc:latest";

    /** Main class configured as the application entry point. */
    private static final String APPLICATION_MAIN_CLASS = "org.apache.flink.cdc.cli.CliFrontend";

    /**
     * Deploys an application cluster described by {@code flinkConfig}.
     *
     * <p>The passed configuration is modified in place: the deployment target, the application
     * arguments and the application main class are always set; the pipeline jars and the
     * container image are only set when the configuration does not already carry a value, so
     * values supplied by the caller are preserved.
     *
     * @param commandLine parsed command line; its argument list becomes the application arguments
     * @param flinkConfig Flink configuration used for the deployment (mutated by this method)
     * @param additionalJars not used by this implementation
     * @return execution info holding the id of the deployed cluster and a status message
     * @throws RuntimeException if the deployment fails; the original failure is the cause
     */
    @Override
    public PipelineExecution.ExecutionInfo deploy(CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) {
        LOG.info("Submitting application in 'Flink K8S Application Mode'.");
        flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());

        if (flinkConfig.get(PipelineOptions.JARS) == null) {
            List<String> jars = new ArrayList<>();
            jars.add(DEFAULT_CDC_DIST_JAR);
            flinkConfig.set(PipelineOptions.JARS, jars);
        }
        // contains() is used rather than a null check on get() because the image option has a
        // built-in default value; only an image explicitly present in the configuration is kept.
        if (!flinkConfig.contains(KubernetesConfigOptions.CONTAINER_IMAGE)) {
            flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, DEFAULT_CONTAINER_IMAGE);
        }
        flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
        flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, APPLICATION_MAIN_CLASS);

        KubernetesClusterClientFactory kubernetesClusterClientFactory = new KubernetesClusterClientFactory();
        KubernetesClusterDescriptor descriptor = kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
        ClusterSpecification specification = kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
        ApplicationConfiguration applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig);

        ClusterClient<String> client = null;
        try {
            ClusterClientProvider<String> clusterClientProvider = descriptor.deployApplicationCluster(specification, applicationConfiguration);
            client = clusterClientProvider.getClusterClient();
            String clusterId = client.getClusterId();
            LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
            return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful");
        }
        catch (Exception e) {
            if (client != null) {
                try {
                    client.shutDownCluster();
                }
                catch (RuntimeException shutdownFailure) {
                    // Keep the deployment failure as the primary error; record the cleanup
                    // failure alongside it instead of letting it replace the root cause.
                    e.addSuppressed(shutdownFailure);
                }
            }
            throw new RuntimeException("Failed to deploy Flink CDC job", e);
        }
        finally {
            // Nested finally so the client is still closed if closing the descriptor throws.
            try {
                descriptor.close();
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }
}

