package org.apache.beam.sdk.extensions.python;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.ProcessBuilder;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.UnmodifiableIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/extensions/python/PythonService.class */
/**
 * Runs a Python module as a child process of the JVM.
 *
 * <p>{@link #start()} first executes the bundled {@code bootstrap_beam_venv.py} script with a
 * system Python interpreter, then uses the last non-empty line that script prints as the
 * interpreter with which to launch {@code -m <module> <args...>}.
 */
public class PythonService {
    private static final Logger LOG = LoggerFactory.getLogger(PythonService.class);

    /** Classpath resource (resolved relative to this class) holding the bootstrap script. */
    private static final String BOOTSTRAP_SCRIPT_RESOURCE = "bootstrap_beam_venv.py";

    private final String module;
    // Mutable on purpose: withCustomBeamRequirement() updates it in place.
    private String beamRequirement;
    private final List<String> args;
    private final List<String> extraPackages;

    /**
     * Creates a service description.
     *
     * @param module Python module passed to the interpreter's {@code -m} flag
     * @param args arguments appended after the module name
     * @param extraPackages packages forwarded to the bootstrap script via {@code --extra_packages}
     */
    public PythonService(String module, List<String> args, List<String> extraPackages) {
        this.module = module;
        this.args = args;
        this.extraPackages = extraPackages;
        this.beamRequirement =
                getMatchingStablePythonSDKVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
    }

    /** Creates a service description with no extra packages. */
    public PythonService(String module, List<String> args) {
        this(module, args, ImmutableList.of());
    }

    /** Creates a service description with no extra packages from a varargs argument list. */
    public PythonService(String module, String... args) {
        this(module, Arrays.asList(args));
    }

    /**
     * Returns a new service with {@code extraPackages} appended to the ones already configured.
     *
     * <p>The Beam requirement of this instance (including one set through {@link
     * #withCustomBeamRequirement(String)}) is carried over to the returned instance.
     *
     * @param extraPackages additional packages to request from the bootstrap script
     * @return a new {@code PythonService}; this instance is left unchanged
     */
    public PythonService withExtraPackages(List<String> extraPackages) {
        List<String> combined =
                ImmutableList.<String>builder()
                        .addAll(this.extraPackages)
                        .addAll(extraPackages)
                        .build();
        PythonService copy = new PythonService(this.module, this.args, combined);
        // The public constructor resets the requirement to the default; restore ours so a
        // previously applied custom requirement is not silently dropped.
        copy.beamRequirement = this.beamRequirement;
        return copy;
    }

    /**
     * Overrides the value passed to the bootstrap script as {@code --beam_version}.
     *
     * <p>Unlike {@link #withExtraPackages(List)}, this mutates and returns this instance.
     *
     * @param beamRequirement the requirement string to pass through verbatim
     * @return this instance
     */
    public PythonService withCustomBeamRequirement(String beamRequirement) {
        this.beamRequirement = beamRequirement;
        return this;
    }

    /**
     * Bootstraps the Python environment and starts the configured module as a subprocess.
     *
     * <p>The service's stdout and stderr are inherited from this JVM.
     *
     * @return a handle whose {@code close()} destroys the service process
     * @throws IOException if the bootstrap script cannot be written or a process cannot be started
     * @throws InterruptedException if interrupted while waiting for a child process
     * @throws RuntimeException if no Python executable is found, the bootstrap exits with a
     *     non-zero status, or the bootstrap prints no interpreter path
     */
    public AutoCloseable start() throws IOException, InterruptedException {
        File bootstrapScript = writeBootstrapScript();
        String interpreter = bootstrapVirtualEnv(bootstrapScript);

        List<String> serviceCommand = new ArrayList<>();
        serviceCommand.add(interpreter);
        serviceCommand.add("-m");
        serviceCommand.add(this.module);
        serviceCommand.addAll(this.args);
        LOG.info("Starting python service with arguments {}", serviceCommand);
        Process service =
                new ProcessBuilder(serviceCommand)
                        .redirectError(ProcessBuilder.Redirect.INHERIT)
                        .redirectOutput(ProcessBuilder.Redirect.INHERIT)
                        .start();
        return service::destroy;
    }

    /** Copies the bundled bootstrap script into a temp file that is deleted on JVM exit. */
    private File writeBootstrapScript() throws IOException {
        File script = File.createTempFile("bootstrap_beam_venv", ".py");
        script.deleteOnExit();
        try (InputStream resource = getClass().getResourceAsStream(BOOTSTRAP_SCRIPT_RESOURCE)) {
            if (resource == null) {
                throw new IOException(
                        "Resource " + BOOTSTRAP_SCRIPT_RESOURCE + " not found next to " + getClass());
            }
            try (FileOutputStream out = new FileOutputStream(script.getAbsolutePath())) {
                ByteStreams.copy(resource, out);
            }
        }
        return script;
    }

    /**
     * Runs the bootstrap script and returns the last non-empty line it printed on stdout, which
     * {@link #start()} uses as the interpreter for the service.
     */
    private String bootstrapVirtualEnv(File bootstrapScript)
            throws IOException, InterruptedException {
        List<String> bootstrapCommand = new ArrayList<>();
        bootstrapCommand.add(whichPython());
        bootstrapCommand.add(bootstrapScript.getAbsolutePath());
        bootstrapCommand.add("--beam_version=" + this.beamRequirement);
        if (!this.extraPackages.isEmpty()) {
            bootstrapCommand.add("--extra_packages=" + String.join(";", this.extraPackages));
        }
        LOG.info("Running bootstrap command {}", bootstrapCommand);
        Process bootstrap =
                new ProcessBuilder(bootstrapCommand)
                        .redirectError(ProcessBuilder.Redirect.INHERIT)
                        .start();
        // The script takes no input; close its stdin so it can never block reading from us.
        bootstrap.getOutputStream().close();

        String lastNonEmptyLine = null;
        try (BufferedReader reader =
                new BufferedReader(
                        new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8))) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                LOG.info(line);
                if (!line.isEmpty()) {
                    lastNonEmptyLine = line;
                }
            }
        }

        int exitCode = bootstrap.waitFor();
        if (exitCode != 0) {
            throw new RuntimeException(
                    "Python bootstrap failed with error " + exitCode + ", " + lastNonEmptyLine);
        }
        if (lastNonEmptyLine == null) {
            // Fail here with a clear message instead of handing a null/empty executable to
            // ProcessBuilder.
            throw new RuntimeException(
                    "Python bootstrap succeeded but printed no interpreter path.");
        }
        return lastNonEmptyLine;
    }

    /**
     * Returns the first of {@code python3}, {@code python} that can be launched with
     * {@code --version}. Only launchability is checked, not the exit status.
     *
     * @throws InterruptedException if interrupted while waiting for a probe process
     * @throws RuntimeException if neither candidate can be started
     */
    private String whichPython() throws InterruptedException {
        for (String candidate : ImmutableList.of("python3", "python")) {
            try {
                new ProcessBuilder(candidate, "--version").start().waitFor();
                return candidate;
            } catch (IOException ignored) {
                // This candidate could not be started; fall through to the next one.
            }
        }
        throw new RuntimeException("Unable to find a suitable Python executable.");
    }

    /**
     * Maps a Java SDK version to the Python requirement passed to the bootstrap script.
     *
     * @param javaSdkVersion the Java SDK version, possibly {@code null}
     * @return {@code "latest"} if the version is {@code null} or ends with {@code ".dev"},
     *     otherwise the version unchanged
     */
    @VisibleForTesting
    static String getMatchingStablePythonSDKVersion(String javaSdkVersion) {
        if (javaSdkVersion == null || javaSdkVersion.endsWith(".dev")) {
            return "latest";
        }
        return javaSdkVersion;
    }

    /**
     * Asks the OS for an ephemeral port by binding a server socket to port 0, then releases it.
     *
     * <p>The port is only known to have been free at the time of the call; nothing reserves it
     * afterwards.
     *
     * @return the port number the temporary socket was bound to
     * @throws IOException if the socket cannot be opened or closed
     */
    public static int findAvailablePort() throws IOException {
        int port;
        try (ServerSocket probe = new ServerSocket(0)) {
            port = probe.getLocalPort();
        }
        try {
            // Short pause after closing (presumably to let the OS finish releasing the port).
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            // This method cannot throw InterruptedException; preserve the interrupt for callers.
            Thread.currentThread().interrupt();
        }
        return port;
    }

    /**
     * Blocks until a TCP connection to {@code host:port} succeeds.
     *
     * <p>Retries with a sleep that starts at 10 ms and grows by 20% after each failed attempt.
     *
     * @param host host to connect to
     * @param port port to connect to
     * @param timeoutMillis time budget in milliseconds, checked before each connection attempt
     * @throws TimeoutException if the budget is exhausted before a connection succeeds
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    public static void waitForPort(String host, int port, int timeoutMillis)
            throws TimeoutException, InterruptedException {
        long startMillis = System.currentTimeMillis();
        long backoffMillis = 10;
        while (System.currentTimeMillis() - startMillis < timeoutMillis) {
            try (Socket ignored = new Socket(host, port)) {
                return;
            } catch (IOException e) {
                Thread.sleep(backoffMillis);
                backoffMillis = (long) (backoffMillis * 1.2d);
            }
        }
        throw new TimeoutException(
                "Timeout waiting for Python service startup after "
                        + (System.currentTimeMillis() - startMillis)
                        + " milliseconds.");
    }
}
