package org.apache.flink.statefun.flink.launcher;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
import org.apache.flink.statefun.flink.core.spi.ModuleSpecs;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/statefun/flink/launcher/StatefulFunctionsJobGraphRetriever.class */
final class StatefulFunctionsJobGraphRetriever implements JobGraphRetriever {
    private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsJobGraphRetriever.class);
    private final JobID jobId;
    private final SavepointRestoreSettings savepointRestoreSettings;
    private final int parallelism;
    private final String[] programArguments;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatefulFunctionsJobGraphRetriever(JobID jobID, SavepointRestoreSettings savepointRestoreSettings, int i, String[] strArr) {
        this.jobId = (JobID) Objects.requireNonNull(jobID, "jobId");
        this.savepointRestoreSettings = (SavepointRestoreSettings) Objects.requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
        this.parallelism = i;
        this.programArguments = (String[]) Objects.requireNonNull(strArr, "programArguments");
    }

    private static List<URL> obtainModuleAdditionalClassPath() {
        try {
            ModuleSpecs fromPath = ModuleSpecs.fromPath("/opt/statefun/modules");
            ArrayList arrayList = new ArrayList();
            Iterator it = fromPath.iterator();
            while (it.hasNext()) {
                Iterator it2 = ((ModuleSpecs.ModuleSpec) it.next()).artifactUris().iterator();
                while (it2.hasNext()) {
                    arrayList.add(((URI) it2.next()).toURL());
                }
            }
            return arrayList;
        } catch (IOException e) {
            throw new RuntimeException("Unable to load modules from path /opt/statefun/modules", e);
        }
    }

    public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
        PackagedProgram createPackagedProgram = createPackagedProgram();
        int resolveParallelism = resolveParallelism(this.parallelism, configuration);
        LOG.info("Creating JobGraph for job {}, with parallelism {} and savepoint restore settings {}.", new Object[]{this.jobId, Integer.valueOf(resolveParallelism), this.savepointRestoreSettings});
        try {
            JobGraph createJobGraph = PackagedProgramUtils.createJobGraph(createPackagedProgram, configuration, resolveParallelism, this.jobId, false);
            createJobGraph.setSavepointRestoreSettings(this.savepointRestoreSettings);
            return createJobGraph;
        } catch (Exception e) {
            throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
        }
    }

    private PackagedProgram createPackagedProgram() {
        File file = new File("/opt/flink/lib/statefun-flink-core.jar");
        if (!file.exists()) {
            throw new IllegalStateException("Unable to locate the launcher jar");
        }
        try {
            return PackagedProgram.newBuilder().setJarFile(file).setUserClassPaths(obtainModuleAdditionalClassPath()).setEntryPointClassName(StatefulFunctionsJob.class.getName()).setArguments(this.programArguments).build();
        } catch (ProgramInvocationException e) {
            throw new RuntimeException("Unable to construct a packaged program", e);
        }
    }

    private static int resolveParallelism(int i, Configuration configuration) {
        return i == -1 ? configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM) : i;
    }
}
