package org.apache.flink.mesos.runtime.clusterframework;

import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.mesos.util.MesosResourceAllocation;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.class */
public class LaunchableMesosWorker implements LaunchableTask {
    protected static final Logger LOG;
    static final Set<String> TM_PORT_KEYS;
    private final MesosArtifactResolver resolver;
    private final ContainerSpecification containerSpec;
    private final MesosTaskManagerParameters params;
    private final Protos.TaskID taskID;
    private final Request taskRequest = new Request();
    private final MesosConfiguration mesosConfiguration;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker$Request.class */
    public class Request implements TaskRequest {
        private final AtomicReference<TaskRequest.AssignedResources> assignedResources = new AtomicReference<>();

        Request() {
        }

        @Override // com.netflix.fenzo.TaskRequest
        public String getId() {
            return LaunchableMesosWorker.this.taskID.getValue();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public String taskGroupName() {
            return JsonProperty.USE_DEFAULT_NAME;
        }

        @Override // com.netflix.fenzo.TaskRequest
        public double getCPUs() {
            return LaunchableMesosWorker.this.params.cpus();
        }

        double getGPUs() {
            return LaunchableMesosWorker.this.params.gpus();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public double getMemory() {
            return LaunchableMesosWorker.this.params.containeredParameters().getTaskExecutorProcessSpec().getTotalProcessMemorySize().getMebiBytes();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public double getNetworkMbps() {
            return LaunchableMesosWorker.this.params.network();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public double getDisk() {
            return LaunchableMesosWorker.this.params.disk();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public int getPorts() {
            return LaunchableMesosWorker.extractPortKeys(LaunchableMesosWorker.this.containerSpec.getFlinkConfiguration()).size();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public Map<String, Double> getScalarRequests() {
            return Collections.singletonMap("gpus", Double.valueOf(LaunchableMesosWorker.this.params.gpus()));
        }

        @Override // com.netflix.fenzo.TaskRequest
        public Map<String, TaskRequest.NamedResourceSetRequest> getCustomNamedResources() {
            return Collections.emptyMap();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public List<? extends ConstraintEvaluator> getHardConstraints() {
            return LaunchableMesosWorker.this.params.constraints();
        }

        @Override // com.netflix.fenzo.TaskRequest
        public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
            return null;
        }

        @Override // com.netflix.fenzo.TaskRequest
        public void setAssignedResources(TaskRequest.AssignedResources assignedResources) {
            this.assignedResources.set(assignedResources);
        }

        @Override // com.netflix.fenzo.TaskRequest
        public TaskRequest.AssignedResources getAssignedResources() {
            return this.assignedResources.get();
        }

        public String toString() {
            return "Request{cpus=" + getCPUs() + ", memory=" + getMemory() + ", gpus=" + getGPUs() + ", disk=" + getDisk() + ", network=" + getNetworkMbps() + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LaunchableMesosWorker(MesosArtifactResolver mesosArtifactResolver, MesosTaskManagerParameters mesosTaskManagerParameters, ContainerSpecification containerSpecification, Protos.TaskID taskID, MesosConfiguration mesosConfiguration) {
        this.resolver = (MesosArtifactResolver) Preconditions.checkNotNull(mesosArtifactResolver);
        this.containerSpec = (ContainerSpecification) Preconditions.checkNotNull(containerSpecification);
        this.params = (MesosTaskManagerParameters) Preconditions.checkNotNull(mesosTaskManagerParameters);
        this.taskID = (Protos.TaskID) Preconditions.checkNotNull(taskID);
        this.mesosConfiguration = (MesosConfiguration) Preconditions.checkNotNull(mesosConfiguration);
    }

    public Protos.TaskID taskID() {
        return this.taskID;
    }

    @Override // org.apache.flink.mesos.scheduler.LaunchableTask
    public TaskRequest taskRequest() {
        return this.taskRequest;
    }

    @Override // org.apache.flink.mesos.scheduler.LaunchableTask
    public Protos.TaskInfo launch(Protos.SlaveID slaveID, MesosResourceAllocation mesosResourceAllocation) {
        ContaineredTaskManagerParameters containeredParameters = this.params.containeredParameters();
        Configuration configuration = new Configuration();
        configuration.addAll(this.containerSpec.getFlinkConfiguration());
        Protos.TaskInfo.Builder name = Protos.TaskInfo.newBuilder().setSlaveId(slaveID).setTaskId(this.taskID).setName(this.taskID.getValue());
        Set<String> roles = this.mesosConfiguration.roles();
        name.addAllResources(mesosResourceAllocation.takeScalar("cpus", this.taskRequest.getCPUs(), roles));
        name.addAllResources(mesosResourceAllocation.takeScalar("gpus", this.taskRequest.getGPUs(), roles));
        name.addAllResources(mesosResourceAllocation.takeScalar("mem", this.taskRequest.getMemory(), roles));
        if (this.taskRequest.getDisk() > 0.0d) {
            name.addAllResources(mesosResourceAllocation.takeScalar("disk", this.taskRequest.getDisk(), roles));
        }
        if (this.taskRequest.getNetworkMbps() > 0.0d) {
            name.addAllResources(mesosResourceAllocation.takeScalar("network", this.taskRequest.getNetworkMbps(), roles));
        }
        Protos.CommandInfo.Builder commandBuilder = name.getCommandBuilder();
        Protos.Environment.Builder environmentBuilder = commandBuilder.getEnvironmentBuilder();
        StringBuilder sb = new StringBuilder();
        Option<String> taskManagerHostname = this.params.getTaskManagerHostname();
        if (taskManagerHostname.isDefined()) {
            configuration.setString(TaskManagerOptions.HOST, MesosTaskManagerParameters.TASK_ID_PATTERN.matcher((CharSequence) taskManagerHostname.get()).replaceAll(Matcher.quoteReplacement(this.taskID.getValue())));
        }
        Set<String> extractPortKeys = extractPortKeys(this.containerSpec.getFlinkConfiguration());
        List<Protos.Resource> takeRanges = mesosResourceAllocation.takeRanges("ports", extractPortKeys.size(), roles);
        name.addAllResources(takeRanges);
        Iterator<String> it = extractPortKeys.iterator();
        Utils.rangeValues(takeRanges).forEach(j -> {
            configuration.setLong((String) it.next(), j);
        });
        if (it.hasNext()) {
            throw new IllegalArgumentException("insufficient # of ports assigned");
        }
        Iterator it2 = this.containerSpec.getArtifacts().iterator();
        while (it2.hasNext()) {
            commandBuilder.addUris(Utils.uri(this.resolver, (ContainerSpecification.Artifact) it2.next()));
        }
        Iterator<String> it3 = this.params.uris().iterator();
        while (it3.hasNext()) {
            commandBuilder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(it3.next()));
        }
        for (Map.Entry entry : this.params.containeredParameters().taskManagerEnv().entrySet()) {
            environmentBuilder.addVariables(Utils.variable((String) entry.getKey(), (String) entry.getValue()));
        }
        for (Map.Entry entry2 : this.containerSpec.getEnvironmentVariables().entrySet()) {
            environmentBuilder.addVariables(Utils.variable((String) entry2.getKey(), (String) entry2.getValue()));
        }
        configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, name.getTaskId().getValue());
        sb.append(" ").append(ProcessMemoryUtils.generateJvmParametersStr(containeredParameters.getTaskExecutorProcessSpec()));
        sb.append(' ').append(ContainerSpecification.formatSystemProperties(this.containerSpec.getSystemProperties()));
        environmentBuilder.addVariables(Utils.variable(MesosConfigKeys.ENV_JVM_ARGS, sb.toString()));
        environmentBuilder.addVariables(Utils.variable(MesosConfigKeys.ENV_TASK_NAME, name.getTaskId().getValue()));
        environmentBuilder.addVariables(Utils.variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, this.mesosConfiguration.frameworkInfo().getName()));
        StringBuilder sb2 = new StringBuilder();
        if (this.params.bootstrapCommand().isDefined()) {
            sb2.append((String) this.params.bootstrapCommand().get()).append(" && ");
        }
        sb2.append(this.params.command()).append(" ").append(ContainerSpecification.formatSystemProperties(configuration)).append(" ").append(TaskExecutorProcessUtils.generateDynamicConfigsStr(containeredParameters.getTaskExecutorProcessSpec()));
        commandBuilder.setValue(sb2.toString());
        Protos.ContainerInfo.Builder newBuilder = Protos.ContainerInfo.newBuilder();
        newBuilder.setType(Protos.ContainerInfo.Type.MESOS);
        switch (this.params.containerType()) {
            case MESOS:
                if (this.params.containerImageName().isDefined()) {
                    newBuilder.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder().setImage(Protos.Image.newBuilder().setType(Protos.Image.Type.DOCKER).setDocker(Protos.Image.Docker.newBuilder().setName((String) this.params.containerImageName().get()))));
                    break;
                }
                break;
            case DOCKER:
                if (!$assertionsDisabled && !this.params.containerImageName().isDefined()) {
                    throw new AssertionError();
                }
                newBuilder.setType(Protos.ContainerInfo.Type.DOCKER).setDocker(Protos.ContainerInfo.DockerInfo.newBuilder().addAllParameters(this.params.dockerParameters()).setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST).setImage((String) this.params.containerImageName().get()).setForcePullImage(this.params.dockerForcePullImage()));
                break;
                break;
            default:
                throw new IllegalStateException("unsupported container type");
        }
        newBuilder.addAllVolumes(this.params.containerVolumes());
        name.setContainer(newBuilder);
        LOG.info("Starting TaskExecutor {} with command: {}", slaveID, name.getCommand().getValue());
        return name.build();
    }

    static Set<String> extractPortKeys(Configuration configuration) {
        LinkedHashSet linkedHashSet = new LinkedHashSet(TM_PORT_KEYS);
        String string = configuration.getString(MesosOptions.PORT_ASSIGNMENTS);
        if (string != null) {
            Stream peek = Arrays.stream(string.split(",")).map((v0) -> {
                return v0.trim();
            }).peek(str -> {
                LOG.debug("Adding port key {} to mesos request", str);
            });
            linkedHashSet.getClass();
            peek.forEach((v1) -> {
                r1.add(v1);
            });
        }
        return Collections.unmodifiableSet(linkedHashSet);
    }

    public String toString() {
        return "LaunchableMesosWorker{taskID=" + this.taskID + "taskRequest=" + this.taskRequest + '}';
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void configureArtifactServer(MesosArtifactServer mesosArtifactServer, ContainerSpecification containerSpecification) throws IOException {
        for (ContainerSpecification.Artifact artifact : containerSpecification.getArtifacts()) {
            mesosArtifactServer.addPath(artifact.source, artifact.dest);
        }
    }

    static {
        $assertionsDisabled = !LaunchableMesosWorker.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(LaunchableMesosWorker.class);
        TM_PORT_KEYS = Collections.unmodifiableSet(new HashSet(Arrays.asList("taskmanager.rpc.port", "taskmanager.data.port")));
    }
}
