/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.launcher;

import cn.tenmg.flink.jobs.FlinkJobsLauncher;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.launcher.AbstractFlinkJobsLauncher;
import cn.tenmg.flink.jobs.launcher.context.FlinkJobsLauncherContext;
import java.io.File;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestClusterClientFlinkJobsLauncher
extends AbstractFlinkJobsLauncher {
    private static final Logger log = LoggerFactory.getLogger(RestClusterClientFlinkJobsLauncher.class);
    private static final Queue<Configuration> configurations = new LinkedList<Configuration>();
    private static final int COUNT;

    @Override
    public AbstractFlinkJobsLauncher.FlinkJobsInfo launch(FlinkJobs flinkJobs) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = null;){
            HashMap options = flinkJobs.getOptions();
            String classpaths = FlinkJobsLauncherContext.getProperty("classpaths");
            String parallelism = FlinkJobsLauncherContext.getProperty("parallelism.default", "1");
            SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
            if (options != null && !options.isEmpty()) {
                if (options.containsKey("classpaths")) {
                    classpaths = (String)options.get("classpaths");
                }
                if (options.containsKey("parallelism")) {
                    parallelism = (String)options.get("parallelism");
                }
                if (options.containsKey("fromSavepoint")) {
                    String savepointPath = (String)options.get("fromSavepoint");
                    savepointRestoreSettings = options.containsKey("allowNonRestoredState") ? SavepointRestoreSettings.forPath((String)savepointPath, (boolean)"true".equals(options.get("allowNonRestoredState"))) : SavepointRestoreSettings.forPath((String)savepointPath);
                }
            }
            Configuration configuration = RestClusterClientFlinkJobsLauncher.getConfiguration();
            PackagedProgram.Builder builder = PackagedProgram.newBuilder().setConfiguration(configuration).setEntryPointClassName(RestClusterClientFlinkJobsLauncher.getEntryPointClassName(flinkJobs)).setJarFile(new File(RestClusterClientFlinkJobsLauncher.getJar(flinkJobs))).setUserClassPaths(RestClusterClientFlinkJobsLauncher.toURLs(classpaths)).setSavepointRestoreSettings(savepointRestoreSettings);
            String arguments = RestClusterClientFlinkJobsLauncher.getArguments(flinkJobs);
            if (!RestClusterClientFlinkJobsLauncher.isEmptyArguments(arguments).booleanValue()) {
                builder.setArguments(new String[]{arguments});
            }
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph((PackagedProgram)builder.build(), (Configuration)configuration, (int)Integer.parseInt(parallelism), (boolean)Boolean.valueOf(FlinkJobsLauncherContext.getProperty("suppress.output", "false")));
            String customConf = flinkJobs.getConfiguration();
            client = this.getRestClusterClient(configuration, customConf);
            for (int i = 0; i < COUNT; ++i) {
                try {
                    AbstractFlinkJobsLauncher.FlinkJobsInfo flinkJobsInfo = RestClusterClientFlinkJobsLauncher.submitJob(client, jobGraph);
                    return flinkJobsInfo;
                }
                catch (Exception e) {
                    if (client != null) {
                        client.close();
                    }
                    if (i >= COUNT) {
                        throw e;
                    }
                    log.error("Try to submit job fail", (Throwable)e);
                    client = this.getRestClusterClient(RestClusterClientFlinkJobsLauncher.getConfiguration(), customConf);
                    continue;
                }
            }
            AbstractFlinkJobsLauncher.FlinkJobsInfo flinkJobsInfo = null;
            return flinkJobsInfo;
        }
    }

    @Override
    public String stop(String jobId) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = null;){
            client = this.getRestClusterClient(RestClusterClientFlinkJobsLauncher.getConfiguration(), null);
            String string = (String)client.stopWithSavepoint(JobID.fromHexString((String)jobId), false, FlinkJobsLauncherContext.getProperty("state.savepoints.dir")).get();
            return string;
        }
    }

    private static AbstractFlinkJobsLauncher.FlinkJobsInfo submitJob(RestClusterClient<StandaloneClusterId> client, JobGraph jobGraph) throws Exception {
        CompletableFuture submited = client.submitJob(jobGraph);
        JobID jobId = (JobID)submited.get();
        AbstractFlinkJobsLauncher.FlinkJobsInfo appInfo = new AbstractFlinkJobsLauncher.FlinkJobsInfo();
        appInfo.setJobId(jobId.toString());
        JobStatus jobStatus = (JobStatus)client.getJobStatus(jobId).get();
        if (JobStatus.INITIALIZING.equals((Object)jobStatus)) {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.ACCEPTED);
        } else if (JobStatus.RUNNING.equals((Object)jobStatus)) {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.RUNNING);
        } else if (JobStatus.FINISHED.equals((Object)jobStatus)) {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FINISHED);
        } else if (JobStatus.FAILED.equals((Object)jobStatus)) {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FAILED);
        } else if (JobStatus.CANCELED.equals((Object)jobStatus)) {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.KILLED);
        } else {
            appInfo.setState(FlinkJobsLauncher.FlinkJobsInfo.State.SUBMITTED);
        }
        return appInfo;
    }

    private static synchronized Configuration getConfiguration() {
        Configuration configuration = configurations.poll();
        configurations.add(configuration);
        return configuration;
    }

    private RestClusterClient<StandaloneClusterId> getRestClusterClient(Configuration configuration, String customConf) throws Exception {
        if (customConf != null) {
            configuration = configuration.clone();
            Properties properties = new Properties();
            properties.load(new StringReader(customConf));
            configuration.addAll(ConfigurationUtils.createConfiguration((Properties)properties));
        }
        return this.newRestClusterClient(configuration);
    }

    private RestClusterClient<StandaloneClusterId> newRestClusterClient(Configuration configuration) throws Exception {
        return new RestClusterClient(configuration, (Object)StandaloneClusterId.getInstance());
    }

    private static List<URL> toURLs(String classpaths) throws MalformedURLException {
        if (classpaths == null || "".equals(classpaths.trim())) {
            return Collections.emptyList();
        }
        String[] paths = classpaths.contains(";") ? classpaths.split(";") : classpaths.split(",");
        ArrayList<URL> urls = new ArrayList<URL>();
        for (int i = 0; i < paths.length; ++i) {
            urls.add(new URL(paths[i].trim()));
        }
        return urls;
    }

    static {
        Configuration configuration = ConfigurationUtils.createConfiguration((Properties)FlinkJobsLauncherContext.getConfigProperties());
        String rpcServers = FlinkJobsLauncherContext.getProperty("jobmanager.rpc.servers");
        if (StringUtils.isBlank((CharSequence)rpcServers)) {
            configurations.add(configuration);
        } else {
            String[] servers = rpcServers.split(",");
            for (int i = 0; i < servers.length; ++i) {
                Configuration config = configuration.clone();
                String[] server = servers[i].split(":", 2);
                config.set(JobManagerOptions.ADDRESS, (Object)server[0].trim());
                if (server.length > 1) {
                    config.set(JobManagerOptions.PORT, (Object)Integer.parseInt(server[1].trim()));
                } else if (!config.contains(JobManagerOptions.PORT)) {
                    config.set(JobManagerOptions.PORT, (Object)6123);
                }
                configurations.add(config);
            }
        }
        COUNT = configurations.size();
    }
}

