package org.apache.nemo.runtime.master.resource;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.exception.ContainerException;
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.message.FailedMessageSender;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/master/resource/ContainerManager.class */
public final class ContainerManager {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class.getName());
    private final EvaluatorRequestor evaluatorRequestor;
    private final MessageEnvironment messageEnvironment;
    private final ExecutorService serializationExecutorService;
    private boolean isTerminated = false;
    private final Map<String, ResourceSpecification> pendingContextIdToResourceSpec = new HashMap();
    private final Map<String, List<ResourceSpecification>> pendingContainerRequestsByContainerType = new HashMap();
    private final Map<String, ResourceSpecification> evaluatorIdToResourceSpec = new HashMap();
    private final Map<String, CountDownLatch> requestLatchByResourceSpecId = new HashMap();

    @Inject
    private ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) int i, EvaluatorRequestor evaluatorRequestor, MessageEnvironment messageEnvironment) {
        this.evaluatorRequestor = evaluatorRequestor;
        this.messageEnvironment = messageEnvironment;
        this.serializationExecutorService = Executors.newFixedThreadPool(i);
    }

    public void requestContainer(int i, ResourceSpecification resourceSpecification) {
        if (this.isTerminated) {
            LOG.info("ContainerManager is terminated, ignoring {}", resourceSpecification.toString());
            return;
        }
        if (i <= 0) {
            LOG.info("Request {} containers", Integer.valueOf(i));
            return;
        }
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(resourceSpecification);
        }
        this.pendingContainerRequestsByContainerType.putIfAbsent(resourceSpecification.getContainerType(), new ArrayList());
        this.pendingContainerRequestsByContainerType.get(resourceSpecification.getContainerType()).addAll(arrayList);
        this.requestLatchByResourceSpecId.put(resourceSpecification.getResourceSpecId(), new CountDownLatch(i));
        this.evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setNumber(i).setMemory(resourceSpecification.getMemory()).setNumberOfCores(resourceSpecification.getCapacity()).build());
    }

    public void onContainerAllocated(String str, AllocatedEvaluator allocatedEvaluator, Configuration configuration) {
        if (this.isTerminated) {
            LOG.info("ContainerManager is terminated, closing {}", allocatedEvaluator.getId());
            allocatedEvaluator.close();
            return;
        }
        ResourceSpecification selectResourceSpecForContainer = selectResourceSpecForContainer();
        ArrayList arrayList = new ArrayList();
        this.evaluatorIdToResourceSpec.put(allocatedEvaluator.getId(), selectResourceSpecForContainer);
        LOG.info("Container type (" + selectResourceSpecForContainer.getContainerType() + ") allocated, will be used for [" + str + "]");
        this.pendingContextIdToResourceSpec.put(str, selectResourceSpecForContainer);
        arrayList.add(configuration);
        arrayList.add(Tang.Factory.getTang().newConfigurationBuilder().bindNamedParameter(JobConf.ExecutorMemoryMb.class, String.valueOf(selectResourceSpecForContainer.getMemory())).build());
        selectResourceSpecForContainer.getMaxOffheapRatio().ifPresent(d -> {
            arrayList.add(Tang.Factory.getTang().newConfigurationBuilder().bindNamedParameter(JobConf.MaxOffheapRatio.class, String.valueOf(d)).build());
        });
        selectResourceSpecForContainer.getPoisonSec().ifPresent(i -> {
            arrayList.add(Tang.Factory.getTang().newConfigurationBuilder().bindNamedParameter(JobConf.ExecutorPoisonSec.class, String.valueOf(i)).build());
        });
        allocatedEvaluator.submitContext(Configurations.merge(arrayList));
    }

    public Optional<ExecutorRepresenter> onContainerLaunched(ActiveContext activeContext) {
        MessageSender failedMessageSender;
        if (this.isTerminated) {
            LOG.info("ContainerManager is terminated, closing {}", activeContext.getId());
            activeContext.close();
            return Optional.empty();
        }
        String id = activeContext.getId();
        ResourceSpecification remove = this.pendingContextIdToResourceSpec.remove(id);
        try {
            failedMessageSender = (MessageSender) this.messageEnvironment.asyncConnect(id, "EXECUTOR_MESSAGE_LISTENER_ID").get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failedMessageSender = new FailedMessageSender();
        } catch (ExecutionException e2) {
            failedMessageSender = new FailedMessageSender();
        }
        DefaultExecutorRepresenter defaultExecutorRepresenter = new DefaultExecutorRepresenter(id, remove, failedMessageSender, activeContext, this.serializationExecutorService, activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
        this.requestLatchByResourceSpecId.get(remove.getResourceSpecId()).countDown();
        return Optional.of(defaultExecutorRepresenter);
    }

    public ResourceSpecification onContainerFailed(String str) {
        ResourceSpecification remove = this.evaluatorIdToResourceSpec.remove(str);
        if (remove == null) {
            throw new IllegalStateException(str + " not in " + this.evaluatorIdToResourceSpec);
        }
        requestContainer(1, remove);
        return remove;
    }

    public void terminate() {
        if (this.isTerminated) {
            throw new IllegalStateException("Cannot terminate twice");
        }
        this.isTerminated = true;
    }

    private ResourceSpecification selectResourceSpecForContainer() {
        ResourceSpecification resourceSpecification = null;
        Iterator<Map.Entry<String, List<ResourceSpecification>>> it = this.pendingContainerRequestsByContainerType.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<String, List<ResourceSpecification>> next = it.next();
            if (next.getValue().size() > 0) {
                resourceSpecification = next.getValue().remove(0);
                break;
            }
        }
        if (resourceSpecification != null) {
            return resourceSpecification;
        }
        throw new ContainerException(new Throwable("We never requested for an extra container"));
    }
}
