package org.apache.inlong.manager.plugin.flink;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/plugin/flink/FlinkService.class */
/**
 * Client-side facade over a standalone Flink cluster's REST endpoint.
 *
 * <p>It builds a Flink {@link Configuration} from the plugin's {@link FlinkConfig} (optionally
 * overridden by an {@code ip:port} endpoint) and uses a {@link RestClusterClient} to query,
 * submit, restore, stop and cancel jobs. A new client is created for every operation and is
 * closed as soon as that operation has completed.
 */
public class FlinkService {

    private static final Logger log = LoggerFactory.getLogger(FlinkService.class);

    /** Extracts an IPv4-looking address (group 1) and a port (group 2) from an endpoint string. */
    private static final Pattern IP_PORT_PATTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");

    private final FlinkConfig flinkConfig = new FlinkConfiguration().getFlinkConfig();
    private final Integer parallelism = this.flinkConfig.getParallelism();
    private final String savepointDirectory = this.flinkConfig.getSavepointDirectory();
    private final Configuration configuration = new Configuration();

    /**
     * Creates a service bound to one Flink cluster.
     *
     * @param str endpoint containing {@code ip:port}; when null or empty, the address and port
     *            from the loaded {@link FlinkConfig} are used instead
     * @throws Exception if a non-empty endpoint does not contain an {@code ip:port} pair
     */
    public FlinkService(String str) throws Exception {
        this.configuration.setInteger(JobManagerOptions.PORT, this.flinkConfig.getJobManagerPort().intValue());

        final String address;
        final Integer restPort;
        if (StringUtils.isEmpty(str)) {
            address = this.flinkConfig.getAddress();
            restPort = this.flinkConfig.getPort();
        } else {
            // translateFromEndpoint throws when nothing matches, so the result is always populated.
            Map<String, String> endpoint = translateFromEndpoint(str);
            address = endpoint.get("address");
            restPort = Integer.valueOf(endpoint.get("port"));
        }
        this.configuration.setString(JobManagerOptions.ADDRESS, address);
        this.configuration.setInteger(RestOptions.PORT, restPort.intValue());
    }

    /**
     * Splits an endpoint into its address and port parts.
     *
     * @param endpoint text in which the first {@code ip:port} occurrence is searched for
     * @return a map with the keys {@code "address"} and {@code "port"}
     * @throws Exception if the endpoint contains no {@code ip:port} occurrence
     */
    private Map<String, String> translateFromEndpoint(String endpoint) throws Exception {
        Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
        if (!matcher.find()) {
            throw new Exception("endpoint [" + endpoint + "] was not match address:port");
        }
        Map<String, String> parts = new HashMap<>(2);
        parts.put("address", matcher.group(1));
        parts.put("port", matcher.group(2));
        return parts;
    }

    /**
     * Returns the Flink plugin configuration this service was initialised with.
     *
     * @return the loaded {@link FlinkConfig}
     */
    public FlinkConfig getFlinkConfig() {
        return this.flinkConfig;
    }

    /**
     * Creates a new REST client for the configured cluster.
     *
     * <p>Each call returns a fresh client; the caller owns it and must close it.
     *
     * @return a new {@link RestClusterClient}
     * @throws Exception if the client cannot be created; the original failure is kept as the cause
     */
    public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
        try {
            return new RestClusterClient<>(this.configuration, StandaloneClusterId.getInstance());
        } catch (Exception e) {
            log.error("get flink client failed: ", e);
            throw new Exception("get flink client failed: " + e.getMessage(), e);
        }
    }

    /**
     * Fetches the current status of a job, blocking until the cluster answers.
     *
     * @param str hexadecimal Flink job id
     * @return the job status reported by the cluster
     * @throws Exception if the id is invalid or the request fails; the original failure is the cause
     */
    public JobStatus getJobStatus(String str) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = getFlinkClient()) {
            return client.getJobStatus(JobID.fromHexString(str)).get();
        } catch (Exception e) {
            log.error("get job status by jobId={} failed: ", str, e);
            throw new Exception("get job status by jobId=" + str + " failed: " + e.getMessage(), e);
        }
    }

    /**
     * Fetches the details of a job, blocking until the cluster answers.
     *
     * @param str hexadecimal Flink job id
     * @return the job details reported by the cluster
     * @throws Exception if the id is invalid or the request fails; the original failure is the cause
     */
    public JobDetailsInfo getJobDetail(String str) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = getFlinkClient()) {
            return client.getJobDetails(JobID.fromHexString(str)).get();
        } catch (Exception e) {
            log.error("get job detail by jobId={} failed: ", str, e);
            throw new Exception("get job detail by jobId=" + str + " failed: " + e.getMessage(), e);
        }
    }

    /**
     * Submits a new job without restoring from a savepoint.
     *
     * @param flinkInfo description of the job to submit
     * @return the id of the submitted job as a string
     * @throws Exception if building or submitting the job fails; the original failure is the cause
     */
    public String submit(FlinkInfo flinkInfo) throws Exception {
        try {
            return submitJobBySavepoint(flinkInfo, SavepointRestoreSettings.none());
        } catch (Exception e) {
            log.error("submit job from info {} failed: ", flinkInfo, e);
            throw new Exception("submit job failed: " + e.getMessage(), e);
        }
    }

    /**
     * Re-submits a job with savepoint restore settings.
     *
     * <p>Nothing is submitted when {@code flinkInfo.getSavepointPath()} is empty.
     *
     * @param flinkInfo description of the job to restore
     * @return the id of the submitted job as a string, or {@code null} when the restore was skipped
     * @throws Exception if building or submitting the job fails; the original failure is the cause
     */
    public String restore(FlinkInfo flinkInfo) throws Exception {
        try {
            if (StringUtils.isEmpty(flinkInfo.getSavepointPath())) {
                log.warn("skip to restore as the savepoint path was empty {}", flinkInfo);
                return null;
            }
            // NOTE(review): the guard checks flinkInfo.getSavepointPath(), but the restore settings
            // point at the configured savepoint directory rather than that path - confirm intended.
            SavepointRestoreSettings settings = SavepointRestoreSettings.forPath(this.savepointDirectory, false);
            return submitJobBySavepoint(flinkInfo, settings);
        } catch (Exception e) {
            log.error("restore job from info {} failed: ", flinkInfo, e);
            throw new Exception("restore job failed: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the job graph from the local jar and connector jars, then submits it and waits for
     * the submission to be acknowledged.
     *
     * @param flinkInfo description of the job (jar path, connector jars, job name, config path)
     * @param savepointRestoreSettings restore settings to attach to the packaged program
     * @return the id of the submitted job as a string
     * @throws Exception if packaging, graph creation or submission fails
     */
    private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings savepointRestoreSettings)
            throws Exception {
        File jarFile = new File(flinkInfo.getLocalJarPath());
        String[] programArgs = genProgramArgsV2(flinkInfo, this.flinkConfig);

        // Connector jars that cannot be turned into a URL are skipped (and logged), not fatal.
        List<URL> connectorJars = flinkInfo.getConnectorJarPaths().stream()
                .map(FlinkService::toUrl)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        PackagedProgram program = PackagedProgram.newBuilder()
                .setConfiguration(this.configuration)
                .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
                .setJarFile(jarFile)
                .setUserClassPaths(connectorJars)
                .setArguments(programArgs)
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, this.configuration,
                this.parallelism.intValue(), false);
        jobGraph.addJars(connectorJars);

        try (RestClusterClient<StandaloneClusterId> client = getFlinkClient()) {
            return client.submitJob(jobGraph).get().toString();
        }
    }

    /**
     * Converts a local file path to a URL.
     *
     * @param path local file path
     * @return the file URL, or {@code null} when the path cannot be expressed as a URL
     */
    private static URL toUrl(String path) {
        try {
            return new File(path).toURI().toURL();
        } catch (MalformedURLException e) {
            log.warn("skip connector jar {} as it cannot be converted to a URL", path, e);
            return null;
        }
    }

    /**
     * Stops a job with a savepoint, blocking until the cluster answers.
     *
     * @param str hexadecimal Flink job id
     * @param stopWithSavepointRequest supplies the drain flag and the savepoint target directory
     * @return the string result of the client's stop-with-savepoint call (presumably the savepoint
     *         path - verify against the Flink client documentation)
     * @throws Exception if the id is invalid or the request fails; the original failure is the cause
     */
    public String stopJob(String str, StopWithSavepointRequest stopWithSavepointRequest) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = getFlinkClient()) {
            return client.stopWithSavepoint(JobID.fromHexString(str), stopWithSavepointRequest.isDrain(),
                    stopWithSavepointRequest.getTargetDirectory()).get();
        } catch (Exception e) {
            log.error("stop job {} and request {} failed: ", str, stopWithSavepointRequest, e);
            throw new Exception("stop job " + str + " failed: " + e.getMessage(), e);
        }
    }

    /**
     * Cancels a job and waits for the cluster to acknowledge the cancellation.
     *
     * <p>Waiting on the returned future makes a failed cancellation visible to the caller and
     * guarantees the request has finished before the client is closed.
     *
     * @param str hexadecimal Flink job id
     * @throws Exception if the id is invalid or the cancellation fails; the original failure is the cause
     */
    public void cancelJob(String str) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = getFlinkClient()) {
            client.cancel(JobID.fromHexString(str)).get();
        } catch (Exception e) {
            log.error("cancel job {} failed: ", str, e);
            throw new Exception("cancel job " + str + " failed: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the legacy program argument list.
     *
     * @param flinkInfo supplies job name, config path, source/sink type and stream list
     * @param flinkConfig supplies the audit proxy hosts
     * @return the arguments as alternating option/value entries
     * @deprecated not called within this class; {@link #genProgramArgsV2(FlinkInfo, FlinkConfig)}
     *             is the variant used for submission
     */
    @Deprecated
    private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
        List<String> args = new ArrayList<>();
        args.add("-cluster-id");
        args.add(flinkInfo.getJobName());
        args.add("-dataflow.info.file");
        args.add(flinkInfo.getLocalConfPath());
        args.add("-source.type");
        args.add(flinkInfo.getSourceType());
        args.add("-sink.type");
        args.add(flinkInfo.getSinkType());
        args.add("-metrics.audit.proxy.hosts");
        args.add(flinkConfig.getAuditProxyHosts());
        List<InlongStreamInfo> streams = flinkInfo.getInlongStreamInfoList();
        if (streams != null && !streams.isEmpty()) {
            // Only the first stream's setting is passed on.
            args.add("-job.orderly.output");
            args.add(String.valueOf(streams.get(0).getSyncSend()));
        }
        return args.toArray(new String[0]);
    }

    /**
     * Builds the program argument list used when submitting a job.
     *
     * @param flinkInfo supplies the job name and the local config file path
     * @param flinkConfig currently unused
     * @return the arguments as alternating option/value entries
     */
    private String[] genProgramArgsV2(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
        List<String> args = new ArrayList<>();
        args.add("-cluster-id");
        args.add(flinkInfo.getJobName());
        args.add("-group.info.file");
        args.add(flinkInfo.getLocalConfPath());
        args.add("-checkpoint.interval");
        args.add("60000");
        return args.toArray(new String[0]);
    }
}
