package org.apache.inlong.manager.plugin.flink;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.ConnectorJarType;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/plugin/flink/FlinkOperation.class */
/**
 * Flink job operations: start, restart, stop, delete and status polling.
 *
 * <p>Job commands are wrapped in an {@link IntegrationTaskRunner}, submitted through
 * {@link TaskRunService} and awaited synchronously. Job details are read from the
 * {@link FlinkService} supplied at construction time.
 */
public class FlinkOperation {

    private static final Logger log = LoggerFactory.getLogger(FlinkOperation.class);

    private static final String CONFIG_FILE = "application.properties";
    private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
    private static final String JOB_TERMINATED_MSG = "the job not found by id %s, or task already terminated or savepoint path is null";
    private static final String INLONG_MANAGER = "inlong-manager";
    private static final String INLONG_SORT = "inlong-sort";
    private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
    private static final String CONNECTOR_JAR_PATTERN = "^sort-connector-(?i)(%s).*jar$";
    private static final String ALL_CONNECTOR_JAR_PATTERN = "^sort-connector-.*jar$";

    /** Suffixes of node types for which connector jars are looked up. */
    private static final String LOAD_SUFFIX = "Load";
    private static final String EXTRACT_SUFFIX = "Extract";

    /** Pause between two job status lookups in {@link #pollJobStatus(FlinkInfo)}. */
    private static final long POLL_INTERVAL_SECONDS = 5L;

    /** Lazily loaded content of {@link #CONFIG_FILE}; stays null until a load succeeds. */
    private static Properties properties;

    private final FlinkService flinkService;

    public FlinkOperation(FlinkService flinkService) {
        this.flinkService = flinkService;
    }

    /**
     * Returns the connector directory configured under {@code sort.connector.dir} in
     * {@code application.properties}, read from the context class loader's root resource path.
     *
     * @param parent base directory used to build the default {@code <parent>/inlong-sort/connectors}
     * @return the configured directory, or the default when the key is absent
     * @throws IOException if the properties file cannot be read
     */
    private static String getConnectorDir(String parent) throws IOException {
        if (properties == null) {
            String classpathRoot = Thread.currentThread().getContextClassLoader().getResource("").getPath();
            // Load into a local first: a failed read must not leave an empty Properties cached,
            // otherwise every later call would silently fall back to the default directory.
            Properties loaded = new Properties();
            try (BufferedInputStream in = new BufferedInputStream(
                    Files.newInputStream(Paths.get(classpathRoot + CONFIG_FILE)))) {
                loaded.load(in);
            }
            properties = loaded;
        }
        return properties.getProperty(CONNECTOR_DIR_KEY, Paths.get(parent, INLONG_SORT, "connectors").toString());
    }

    /**
     * Builds the connector jar file-name pattern for a node type.
     *
     * @param nodeType node type taken from the dataflow
     * @return a pattern specific to the resolved connector type, or the match-all connector
     *         pattern when {@link ConnectorJarType#getInstance(String)} returns null
     */
    private String getConnectorJarPattern(String nodeType) {
        ConnectorJarType connectorJarType = ConnectorJarType.getInstance(nodeType);
        if (connectorJarType == null) {
            return ALL_CONNECTOR_JAR_PATTERN;
        }
        return String.format(CONNECTOR_JAR_PATTERN, connectorJarType.getConnectorType());
    }

    /**
     * Restarts the job identified by {@code flinkInfo.getJobId()} and waits for the task to finish.
     *
     * @param flinkInfo job description
     * @throws Exception if the job is missing or in a terminal state, or the submitted task fails
     */
    public void restart(FlinkInfo flinkInfo) throws Exception {
        String jobId = flinkInfo.getJobId();
        if (isNullOrTerminated(jobId)) {
            String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
            log.error(message);
            throw new Exception(message);
        }
        TaskRunService.submit(
                new IntegrationTaskRunner(this.flinkService, flinkInfo, TaskCommitType.RESTART.getCode())).get();
    }

    /**
     * Starts a job. With an empty job id a new job is started; otherwise the existing job is
     * resumed, which requires a non-terminated job and a non-empty savepoint path.
     *
     * @param flinkInfo job description
     * @throws Exception wrapping any failure, with the message prefix "submit flink job failed: "
     */
    public void start(FlinkInfo flinkInfo) throws Exception {
        String jobId = flinkInfo.getJobId();
        try {
            if (StringUtils.isEmpty(jobId)) {
                TaskRunService.submit(new IntegrationTaskRunner(
                        this.flinkService, flinkInfo, TaskCommitType.START_NOW.getCode())).get();
                return;
            }
            if (isNullOrTerminated(jobId) || StringUtils.isEmpty(flinkInfo.getSavepointPath())) {
                String message = String.format("restore job failed, as " + JOB_TERMINATED_MSG, jobId);
                log.error(message);
                throw new Exception(message);
            }
            TaskRunService.submit(new IntegrationTaskRunner(
                    this.flinkService, flinkInfo, TaskCommitType.RESUME.getCode())).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers even though the exception is wrapped.
                Thread.currentThread().interrupt();
            }
            log.warn("submit flink job failed for {}", flinkInfo, e);
            // Attach the cause so the original stack trace is not lost.
            throw new Exception("submit flink job failed: " + e.getMessage(), e);
        }
    }

    /**
     * Parses the dataflow JSON and returns {@code streams[0].<field>}.
     *
     * @param dataflow dataflow JSON text
     * @param field    field name under the first stream
     * @return the requested node, never null
     * @throws Exception if any element on the path is absent
     */
    private static JsonNode firstStreamField(String dataflow, String field) throws Exception {
        JsonNode root = JsonUtils.parseTree(dataflow);
        JsonNode streams = root == null ? null : root.get("streams");
        JsonNode firstStream = streams == null ? null : streams.get(0);
        JsonNode value = firstStream == null ? null : firstStream.get(field);
        if (value == null) {
            throw new Exception(String.format("[%s] not found in the first stream of the dataflow", field));
        }
        return value;
    }

    /**
     * Reads {@code relation.<field>} as a list of node ids.
     *
     * @return a mutable list; empty when the field is absent or JSON null
     */
    private static List<String> toNodeIds(JsonNode relation, String field) {
        List<String> nodeIds = new ArrayList<>();
        JsonNode idsNode = relation == null ? null : relation.get(field);
        if (idsNode != null) {
            List<String> converted = JsonUtils.OBJECT_MAPPER.convertValue(idsNode, new TypeReference<List<String>>() {
            });
            if (converted != null) {
                nodeIds.addAll(converted);
            }
        }
        return nodeIds;
    }

    /**
     * Validates the node ids of every relation in {@code streams[0].relations}.
     *
     * <p>Each relation must have non-empty inputs and outputs that share no id. When there is
     * more than one relation, the inputs of each later relation must not overlap the ids
     * collected so far (first relation's inputs and outputs, plus earlier relations' inputs).
     *
     * @param dataflow dataflow JSON text
     * @throws Exception on the first violated rule, or if the relations cannot be located
     */
    private void checkNodeIds(String dataflow) throws Exception {
        JsonNode relations = firstStreamField(dataflow, "relations");
        List<Pair<List<String>, List<String>>> nodeIdPairs = new ArrayList<>();
        for (int i = 0; i < relations.size(); i++) {
            JsonNode relation = relations.get(i);
            List<String> inputs = toNodeIds(relation, "inputs");
            if (CollectionUtils.isEmpty(inputs)) {
                String message = String.format("input nodeId %s cannot be empty", inputs);
                log.error(message);
                throw new Exception(message);
            }
            List<String> outputs = toNodeIds(relation, "outputs");
            if (CollectionUtils.isEmpty(outputs)) {
                String message = String.format("output nodeId %s cannot be empty", outputs);
                log.error(message);
                throw new Exception(message);
            }
            if (!Collections.disjoint(inputs, outputs)) {
                String message = String.format("input nodeId %s cannot be equal to output nodeId %s", inputs, outputs);
                log.error(message);
                throw new Exception(message);
            }
            nodeIdPairs.add(Pair.of(inputs, outputs));
        }

        if (nodeIdPairs.size() > 1) {
            List<String> seenIds = new ArrayList<>(nodeIdPairs.get(0).getLeft());
            seenIds.addAll(nodeIdPairs.get(0).getRight());
            for (int i = 1; i < nodeIdPairs.size(); i++) {
                List<String> inputs = nodeIdPairs.get(i).getLeft();
                if (!Collections.disjoint(seenIds, inputs)) {
                    String message = String.format("input nodeId %s already exists ", inputs);
                    log.error(message);
                    throw new Exception(message);
                }
                seenIds.addAll(inputs);
            }
        }
    }

    /**
     * Resolves the local paths a job needs and stores them on {@code flinkInfo}: the sort jar,
     * the connector jars for the dataflow's Load/Extract nodes, and the dataflow config file.
     *
     * <p>Paths are derived from the location this class was loaded from, which must contain
     * {@code inlong-manager}; the sibling {@code inlong-sort} directory must exist.
     *
     * @param flinkInfo receives the local jar path, connector jar paths and local conf path
     * @param str       dataflow JSON text; when empty no node types are collected
     * @throws Exception        if a path cannot be resolved, the dataflow is invalid, or the
     *                          dataflow cannot be written
     * @throws RuntimeException if no connector jar matches
     */
    public void genPath(FlinkInfo flinkInfo, String str) throws Exception {
        String path = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        log.info("gen path from {}", path);
        int managerIndex = path.indexOf(INLONG_MANAGER);
        if (managerIndex == -1) {
            throw new Exception("inlong-manager path not found in " + path);
        }
        // URL paths always use '/', which differs from File.separator on Windows.
        int lastSeparator = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
        if (lastSeparator < 0) {
            throw new Exception("parent directory not found in " + path);
        }
        String basePath = path.substring(0, lastSeparator);
        // Cut from the full path: the marker may sit after the last separator (in the jar name).
        String startPath = path.substring(0, managerIndex);

        String sortDir = startPath + INLONG_SORT;
        if (!new File(sortDir).exists()) {
            String message = String.format("file path [%s] not found", sortDir);
            log.error(message);
            throw new Exception(message);
        }
        String sortJarPath = FlinkUtils.findFile(sortDir, SORT_JAR_PATTERN);
        flinkInfo.setLocalJarPath(sortJarPath);
        log.info("get sort jar path success, path: {}", sortJarPath);

        List<String> nodeTypes = new ArrayList<>();
        if (StringUtils.isNotEmpty(str)) {
            checkNodeIds(str);
            List<Map<String, Object>> nodes = JsonUtils.OBJECT_MAPPER.convertValue(
                    firstStreamField(str, "nodes"), new TypeReference<List<Map<String, Object>>>() {
                    });
            if (nodes != null) {
                for (Map<String, Object> node : nodes) {
                    Object type = node.get(Constants.TYPE);
                    if (type == null) {
                        throw new Exception("node type cannot be null in the dataflow");
                    }
                    nodeTypes.add(type.toString());
                }
            }
        }

        String connectorDir = getConnectorDir(startPath);
        Set<String> connectorPaths = nodeTypes.stream()
                .filter(type -> type.endsWith(LOAD_SUFFIX) || type.endsWith(EXTRACT_SUFFIX))
                .map(type -> FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(type), -1))
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
        if (CollectionUtils.isEmpty(connectorPaths)) {
            String message = String.format("no sort connectors found in %s", connectorDir);
            log.error(message);
            throw new RuntimeException(message);
        }
        flinkInfo.setConnectorJarPaths(new ArrayList<>(connectorPaths));
        log.info("get sort connector paths success, paths: {}", connectorPaths);

        if (!FlinkUtils.writeConfigToFile(basePath, flinkInfo.getJobName(), str)) {
            String message = String.format("write dataflow to %s failed", basePath);
            log.error("{}, dataflow: {}", message, str);
            throw new Exception(message);
        }
        flinkInfo.setLocalConfPath(basePath + File.separator + flinkInfo.getJobName());
    }

    /**
     * Stops the job identified by {@code flinkInfo.getJobId()} and waits for the task to finish.
     *
     * @param flinkInfo job description
     * @throws Exception if the job is missing or in a terminal state, or the submitted task fails
     */
    public void stop(FlinkInfo flinkInfo) throws Exception {
        String jobId = flinkInfo.getJobId();
        if (isNullOrTerminated(jobId)) {
            String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, jobId);
            log.error(message);
            throw new Exception(message);
        }
        TaskRunService.submit(
                new IntegrationTaskRunner(this.flinkService, flinkInfo, TaskCommitType.STOP.getCode())).get();
    }

    /**
     * Deletes the job identified by {@code flinkInfo.getJobId()} and waits for the task to finish.
     *
     * @param flinkInfo job description
     * @throws Exception if the job is not found, is already in a terminal state, or the task fails
     */
    public void delete(FlinkInfo flinkInfo) throws Exception {
        String jobId = flinkInfo.getJobId();
        JobDetailsInfo jobDetail = this.flinkService.getJobDetail(jobId);
        if (jobDetail == null) {
            throw new Exception(String.format("delete job failed as the job [%s] not found", jobId));
        }
        JobStatus jobStatus = jobDetail.getJobStatus();
        if (jobStatus != null && jobStatus.isTerminalState()) {
            String scope = jobStatus.isGloballyTerminalState() ? "globally" : "locally";
            throw new Exception(String.format("unsupported delete job [%s] as it was [%s] terminated", jobId, scope));
        }
        TaskRunService.submit(
                new IntegrationTaskRunner(this.flinkService, flinkInfo, TaskCommitType.DELETE.getCode())).get();
    }

    /**
     * Blocks until the job reports {@link JobStatus#RUNNING}, checking every five seconds.
     *
     * <p>A failed status lookup is logged and retried after the same interval. There is no
     * upper bound on the number of attempts.
     *
     * @param flinkInfo job description
     * @throws BusinessException if {@code flinkInfo} already carries a startup exception
     * @throws Exception         if the job id is blank, the job detail is not found, the job is
     *                           in a terminal state, or the waiting thread is interrupted
     */
    public void pollJobStatus(FlinkInfo flinkInfo) throws Exception {
        if (flinkInfo.isException()) {
            throw new BusinessException("startup failed: " + flinkInfo.getExceptionMsg());
        }
        String jobId = flinkInfo.getJobId();
        if (StringUtils.isBlank(jobId)) {
            log.error("job id cannot empty for {}", flinkInfo);
            throw new Exception("job id cannot empty");
        }

        while (true) {
            JobDetailsInfo jobDetail;
            try {
                jobDetail = this.flinkService.getJobDetail(jobId);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                log.error("poll job status error for {}, exception: ", flinkInfo, e);
                // Wait before retrying so a failing lookup does not turn into a busy loop.
                TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
                continue;
            }
            if (jobDetail == null) {
                log.error("job detail not found by {}", jobId);
                throw new Exception(String.format("job detail not found by %s", jobId));
            }
            // A null status is treated as "not running yet", consistent with the null checks
            // in delete() and isNullOrTerminated().
            JobStatus jobStatus = jobDetail.getJobStatus();
            if (jobStatus != null && jobStatus.isTerminalState()) {
                log.error("job was terminated for {}, exception: {}", jobId, flinkInfo.getExceptionMsg());
                throw new Exception("job was terminated for " + jobId);
            }
            if (jobStatus == JobStatus.RUNNING) {
                log.info("job status is Running for {}", jobId);
                return;
            }
            log.info("job was not Running for {}", jobId);
            TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
        }
    }

    /**
     * Tells whether the job cannot be operated on.
     *
     * @param jobId Flink job id
     * @return true when the job detail or its status is null, or the status is terminal
     * @throws Exception if the job detail lookup fails
     */
    private boolean isNullOrTerminated(String jobId) throws Exception {
        JobDetailsInfo jobDetail = this.flinkService.getJobDetail(jobId);
        if (jobDetail == null || jobDetail.getJobStatus() == null) {
            log.warn("job detail or job status was null for [{}]", jobId);
            return true;
        }
        boolean terminated = jobDetail.getJobStatus().isTerminalState();
        log.warn("job terminated state was [{}] for [{}]", terminated, jobDetail);
        return terminated;
    }
}
