package org.apache.flink.client.python;

import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.nio.file.CopyOption;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils.class */
public final class PythonEnvUtils {
    /** Logger shared by all static helpers of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(PythonEnvUtils.class);
    /**
     * Value of the {@code FLINK_OPT_DIR} environment variable, read once at class initialization.
     * This is {@code null} when the variable is not set.
     */
    private static final String FLINK_OPT_DIR = System.getenv("FLINK_OPT_DIR");
    /**
     * The {@code python} sub-directory of {@link #FLINK_OPT_DIR}; scanned by
     * {@code preparePythonEnvironment} via {@code getLibFiles}.
     *
     * <p>Note: when {@link #FLINK_OPT_DIR} is {@code null}, the string concatenation below yields a
     * path that starts with the literal text {@code "null"}.
     */
    private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python";

    /* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils$PythonEnvironment.class */
    /**
     * Plain holder for the settings used to launch a Python process: the directory it runs in, the
     * {@code PYTHONPATH} it receives, the interpreter executable and extra environment variables.
     */
    public static class PythonEnvironment {
        /** Directory the Python process is started in; unset ({@code null}) until assigned. */
        public String workingDirectory;
        /** Value exported as {@code PYTHONPATH}; unset ({@code null}) until assigned. */
        public String pythonPath;
        /** Executable placed at the front of the command line; defaults to {@code "python"}. */
        public String pythonExec = "python";
        /** Extra environment variables copied into the process environment; empty by default. */
        Map<String, String> systemEnv = new HashMap<>();
    }

    /* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils$ShutDownPythonHook.class */
    /**
     * Shutdown hook thread that forcibly terminates a process and then removes a directory, if one
     * was supplied.
     */
    private static class ShutDownPythonHook extends Thread {
        /** Process to terminate when the hook runs. */
        private final Process pythonProcess;
        /** Directory to delete after the process is terminated; {@code null} means nothing to delete. */
        private final String directoryToDelete;

        /**
         * @param pythonProcess the process to destroy when the hook runs
         * @param directoryToDelete path of the directory to remove afterwards, or {@code null}
         */
        public ShutDownPythonHook(Process pythonProcess, String directoryToDelete) {
            this.pythonProcess = pythonProcess;
            this.directoryToDelete = directoryToDelete;
        }

        /** Destroys the process first, then quietly deletes the directory when one was given. */
        @Override
        public void run() {
            pythonProcess.destroyForcibly();
            if (directoryToDelete == null) {
                return;
            }
            FileUtils.deleteDirectoryQuietly(new File(directoryToDelete));
        }
    }

    /**
     * Creates a fresh working directory for a Python process and computes its {@code PYTHONPATH}.
     *
     * <p>The method
     * <ol>
     *   <li>creates {@code <java.io.tmpdir>/pyflink/<random UUID>} (deleting it first if it exists),</li>
     *   <li>links (or copies) every file returned by {@code getLibFiles(FLINK_OPT_DIR_PYTHON)} into
     *       that directory and adds each link to the path,</li>
     *   <li>copies every entry of {@code list} into the directory and adds each copied regular file
     *       that does not end in {@code .py} to the path,</li>
     *   <li>adds the parent directory of every {@code .py} entry found under the working
     *       directory.</li>
     * </ol>
     * Path entries are joined with {@link File#pathSeparator}; the working directory itself is
     * always the first entry.
     *
     * @param list files or directories to copy into the working directory
     * @return an environment whose {@code workingDirectory} and {@code pythonPath} are populated
     * @throws IOException if creating the directory, linking/copying, or walking the tree fails
     */
    public static PythonEnvironment preparePythonEnvironment(List<Path> list) throws IOException {
        PythonEnvironment env = new PythonEnvironment();

        // Each call gets its own directory named by a random UUID under <tmpdir>/pyflink.
        Path workDir = new Path(System.getProperty("java.io.tmpdir") + File.separator + "pyflink" + File.separator + UUID.randomUUID());
        FileSystem fileSystem = workDir.getFileSystem();
        if (fileSystem.exists(workDir)) {
            fileSystem.delete(workDir, true);
        }
        fileSystem.mkdirs(workDir);
        env.workingDirectory = workDir.toString();

        StringBuilder pythonPath = new StringBuilder(env.workingDirectory);

        // Expose the bundled libs inside the working directory and put each one on the path.
        for (java.nio.file.Path lib : getLibFiles(FLINK_OPT_DIR_PYTHON)) {
            java.nio.file.Path link = FileSystems.getDefault().getPath(env.workingDirectory, lib.getFileName().toString());
            createSymbolicLinkForPyflinkLib(lib, link);
            pythonPath.append(File.pathSeparator).append(link.toString());
        }

        // Copy the user files; every copied non-.py regular file becomes its own path entry.
        for (Path source : list) {
            Path target = new Path(workDir, source.getName());
            FileUtils.copy(source, target, true);
            String nonPythonFiles = joinNonPythonFiles(Paths.get(target.toString()));
            // Skip the append when nothing matched so no empty path entry is produced.
            if (!nonPythonFiles.isEmpty()) {
                pythonPath.append(File.pathSeparator).append(nonPythonFiles);
            }
        }

        // Directories that contain .py entries are added so those modules can be imported.
        String pythonSourceDirs = joinPythonSourceDirs(Paths.get(workDir.toString()));
        if (!StringUtils.isNullOrWhitespaceOnly(pythonSourceDirs)) {
            pythonPath.append(File.pathSeparator).append(pythonSourceDirs);
        }

        env.pythonPath = pythonPath.toString();
        return env;
    }

    /** Joins all regular files under {@code root} that do not end in ".py" with the path separator. */
    private static String joinNonPythonFiles(java.nio.file.Path root) throws IOException {
        // Files.walk holds open directory handles until the stream is closed.
        try (Stream<java.nio.file.Path> entries = Files.walk(root)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(entry -> !entry.toString().endsWith(".py"))
                    .map(java.nio.file.Path::toString)
                    .collect(Collectors.joining(File.pathSeparator));
        }
    }

    /** Joins the distinct parent directories of all ".py" entries under {@code root}. */
    private static String joinPythonSourceDirs(java.nio.file.Path root) throws IOException {
        // Files.walk holds open directory handles until the stream is closed.
        try (Stream<java.nio.file.Path> entries = Files.walk(root)) {
            return entries
                    .filter(entry -> entry.toString().endsWith(".py"))
                    .map(java.nio.file.Path::getParent)
                    .distinct()
                    .map(java.nio.file.Path::toString)
                    .collect(Collectors.joining(File.pathSeparator));
        }
    }

    /**
     * Collects every file below the given directory whose path does not end in {@code .txt}.
     *
     * <p>This is best effort: an {@link IOException} raised while walking the tree is logged and
     * not rethrown, and the files gathered up to that point (possibly none) are returned.
     *
     * @param str root directory to scan
     * @return the matching files in visit order; never {@code null}
     */
    public static List<java.nio.file.Path> getLibFiles(String str) {
        final List<java.nio.file.Path> libFiles = new ArrayList<>();
        SimpleFileVisitor<java.nio.file.Path> collector = new SimpleFileVisitor<java.nio.file.Path>() {
            @Override
            public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
                if (!file.toString().endsWith(".txt")) {
                    libFiles.add(file);
                }
                return FileVisitResult.CONTINUE;
            }
        };
        try {
            Files.walkFileTree(FileSystems.getDefault().getPath(str), collector);
        } catch (IOException e) {
            // Deliberately not rethrown: callers continue with whatever was found.
            LOG.error("Gets pyflink dependent libs failed.", e);
        }
        return libFiles;
    }

    /**
     * Makes {@code path} available at {@code path2}, preferring a symbolic link and falling back to
     * a plain copy when the link cannot be created.
     *
     * @param path the existing file to expose (link target / copy source)
     * @param path2 the location of the link or copy to create
     * @throws IOException if the fallback copy fails
     */
    public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path path, java.nio.file.Path path2) throws IOException {
        try {
            Files.createSymbolicLink(path2, path);
        } catch (IOException | UnsupportedOperationException e) {
            // createSymbolicLink signals "symlinks not supported by this file system" with
            // UnsupportedOperationException rather than IOException; both cases must reach the
            // copy fallback.
            LOG.error("Create symbol link for pyflink lib failed.", e);
            LOG.info("Try to copy pyflink lib to working directory");
            Files.copy(path, path2);
        }
    }

    /**
     * Starts a Python process described by the given environment.
     *
     * <p>The command line is {@code pythonEnvironment.pythonExec} followed by the entries of
     * {@code list}. The process runs in {@code pythonEnvironment.workingDirectory}, receives
     * {@code PYTHONPATH} plus every entry of {@code pythonEnvironment.systemEnv} (applied afterwards,
     * so an entry with the same key wins), merges stderr into stdout and inherits this JVM's
     * standard output. A JVM shutdown hook is registered that destroys the process and deletes the
     * working directory.
     *
     * @param pythonEnvironment settings for the process
     * @param list arguments passed to the Python executable; not modified by this method
     * @return the started process
     * @throws IOException if the process cannot be started
     * @throws RuntimeException if the process is no longer alive right after being started
     */
    public static Process startPythonProcess(PythonEnvironment pythonEnvironment, List<String> list) throws IOException {
        // Build the command in a fresh list so the caller's list is left untouched and
        // unmodifiable lists are accepted.
        List<String> command = new ArrayList<>(list.size() + 1);
        command.add(pythonEnvironment.pythonExec);
        command.addAll(list);

        ProcessBuilder processBuilder = new ProcessBuilder(command);
        Map<String, String> environment = processBuilder.environment();
        environment.put("PYTHONPATH", pythonEnvironment.pythonPath);
        pythonEnvironment.systemEnv.forEach(environment::put);

        processBuilder.directory(new File(pythonEnvironment.workingDirectory));
        processBuilder.redirectErrorStream(true);
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);

        Process process = processBuilder.start();
        if (!process.isAlive()) {
            throw new RuntimeException("Failed to start Python process. ");
        }
        Runtime.getRuntime().addShutdownHook(new ShutDownPythonHook(process, pythonEnvironment.workingDirectory));
        return process;
    }
}
