package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.Props;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerProxy;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import org.apache.flink.mesos.scheduler.Tasks;
import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import scala.Option;

/* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.class */
public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
    private final MesosConfiguration mesosConfig;
    private final MesosTaskManagerParameters taskManagerParameters;
    private final ContainerSpecification taskManagerContainerSpec;
    private final MesosArtifactResolver artifactResolver;
    private final int maxFailedTasks;
    private SchedulerProxy schedulerCallbackHandler;
    private SchedulerDriver schedulerDriver;
    private ActorRef connectionMonitor;
    private ActorRef taskRouter;
    private ActorRef launchCoordinator;
    private ActorRef reconciliationCoordinator;
    private final MesosWorkerStore workerStore;
    final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    private int failedTasksSoFar;
    static final /* synthetic */ boolean $assertionsDisabled;

    public MesosFlinkResourceManager(Configuration configuration, MesosConfiguration mesosConfiguration, MesosWorkerStore mesosWorkerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters mesosTaskManagerParameters, ContainerSpecification containerSpecification, MesosArtifactResolver mesosArtifactResolver, int i, int i2) {
        super(i2, configuration, leaderRetrievalService);
        this.mesosConfig = (MesosConfiguration) Objects.requireNonNull(mesosConfiguration);
        this.workerStore = (MesosWorkerStore) Objects.requireNonNull(mesosWorkerStore);
        this.artifactResolver = (MesosArtifactResolver) Objects.requireNonNull(mesosArtifactResolver);
        this.taskManagerParameters = (MesosTaskManagerParameters) Objects.requireNonNull(mesosTaskManagerParameters);
        this.taskManagerContainerSpec = (ContainerSpecification) Objects.requireNonNull(containerSpecification);
        this.maxFailedTasks = i;
        this.workersInNew = new HashMap();
        this.workersInLaunch = new HashMap();
        this.workersBeingReturned = new HashMap();
    }

    protected void initialize() throws Exception {
        this.LOG.info("Initializing Mesos resource master");
        this.workerStore.start();
        this.schedulerCallbackHandler = new SchedulerProxy(self());
        Protos.FrameworkInfo.Builder checkpoint = this.mesosConfig.frameworkInfo().mo117clone().setCheckpoint(true);
        Option<Protos.FrameworkID> frameworkID = this.workerStore.getFrameworkID();
        if (frameworkID.isEmpty()) {
            this.LOG.info("Registering as new framework.");
        } else {
            this.LOG.info("Recovery scenario: re-registering using framework ID {}.", ((Protos.FrameworkID) frameworkID.get()).getValue());
            checkpoint.setId((Protos.FrameworkID) frameworkID.get());
        }
        MesosConfiguration withFrameworkInfo = this.mesosConfig.withFrameworkInfo(checkpoint);
        MesosConfiguration.logMesosConfig(this.LOG, withFrameworkInfo);
        this.schedulerDriver = withFrameworkInfo.createDriver(this.schedulerCallbackHandler, false);
        this.connectionMonitor = createConnectionMonitor();
        this.launchCoordinator = createLaunchCoordinator();
        this.reconciliationCoordinator = createReconciliationCoordinator();
        this.taskRouter = createTaskRouter();
        recoverWorkers();
        this.connectionMonitor.tell(new ConnectionMonitor.Start(), self());
        this.schedulerDriver.start();
    }

    protected ActorRef createConnectionMonitor() {
        return context().actorOf(ConnectionMonitor.createActorProps(ConnectionMonitor.class, this.config), "connectionMonitor");
    }

    protected ActorRef createTaskRouter() {
        return context().actorOf(Tasks.createActorProps(Tasks.class, self(), this.config, this.schedulerDriver, TaskMonitor.class), "tasks");
    }

    protected ActorRef createLaunchCoordinator() {
        return context().actorOf(LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), this.config, this.schedulerDriver, createOptimizer()), "launchCoordinator");
    }

    protected ActorRef createReconciliationCoordinator() {
        return context().actorOf(ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, this.config, this.schedulerDriver), "reconciliationCoordinator");
    }

    public void postStop() {
        this.LOG.info("Stopping Mesos resource master");
        super.postStop();
    }

    protected void handleMessage(Object obj) {
        if (obj instanceof Registered) {
            registered((Registered) obj);
            return;
        }
        if (obj instanceof ReRegistered) {
            reregistered((ReRegistered) obj);
            return;
        }
        if (obj instanceof Disconnected) {
            disconnected((Disconnected) obj);
            return;
        }
        if (obj instanceof Error) {
            error(((Error) obj).message());
            return;
        }
        if ((obj instanceof ResourceOffers) || (obj instanceof OfferRescinded)) {
            this.launchCoordinator.tell(obj, self());
            return;
        }
        if (obj instanceof AcceptOffers) {
            acceptOffers((AcceptOffers) obj);
            return;
        }
        if (obj instanceof StatusUpdate) {
            taskStatusUpdated((StatusUpdate) obj);
            return;
        }
        if (obj instanceof ReconciliationCoordinator.Reconcile) {
            this.reconciliationCoordinator.tell(obj, self());
        } else if (!(obj instanceof TaskMonitor.TaskTerminated)) {
            super.handleMessage(obj);
        } else {
            TaskMonitor.TaskTerminated taskTerminated = (TaskMonitor.TaskTerminated) obj;
            taskTerminated(taskTerminated.taskID(), taskTerminated.status());
        }
    }

    protected void shutdownApplication(ApplicationStatus applicationStatus, String str) {
        this.LOG.info("Shutting down and unregistering as a Mesos framework.");
        try {
            this.schedulerDriver.stop(false);
        } catch (Exception e) {
            this.LOG.warn("unable to unregister the framework", e);
        }
        try {
            this.workerStore.stop(true);
        } catch (Exception e2) {
            this.LOG.warn("unable to stop the worker state store", e2);
        }
        context().stop(self());
    }

    protected void fatalError(String str, Throwable th) {
        this.LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + str, th);
        this.LOG.error("Shutting down process");
        System.exit(-13);
    }

    private void recoverWorkers() throws Exception {
        List<MesosWorkerStore.Worker> recoverWorkers = this.workerStore.recoverWorkers();
        if (recoverWorkers.isEmpty()) {
            return;
        }
        this.LOG.info("Retrieved {} TaskManagers from previous attempt", Integer.valueOf(recoverWorkers.size()));
        ArrayList arrayList = new ArrayList(recoverWorkers.size());
        ArrayList arrayList2 = new ArrayList(recoverWorkers.size());
        for (MesosWorkerStore.Worker worker : recoverWorkers) {
            LaunchableMesosWorker createLaunchableMesosWorker = createLaunchableMesosWorker(worker.taskID());
            switch (worker.state()) {
                case New:
                    this.workersInNew.put(extractResourceID(worker.taskID()), worker);
                    arrayList2.add(createLaunchableMesosWorker);
                    break;
                case Launched:
                    this.workersInLaunch.put(extractResourceID(worker.taskID()), worker);
                    arrayList.add(new Tuple2(createLaunchableMesosWorker.taskRequest(), worker.hostname().get()));
                    break;
                case Released:
                    this.workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
                    break;
            }
            this.taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
        }
        if (arrayList.size() >= 1) {
            this.launchCoordinator.tell(new LaunchCoordinator.Assign(arrayList), self());
        }
        if (arrayList2.size() >= 1) {
            this.launchCoordinator.tell(new LaunchCoordinator.Launch(arrayList2), self());
        }
    }

    protected void requestNewWorkers(int i) {
        try {
            ArrayList arrayList = new ArrayList(i);
            ArrayList arrayList2 = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                MesosWorkerStore.Worker newWorker = MesosWorkerStore.Worker.newWorker(this.workerStore.newTaskID());
                this.workerStore.putWorker(newWorker);
                this.workersInNew.put(extractResourceID(newWorker.taskID()), newWorker);
                LaunchableMesosWorker createLaunchableMesosWorker = createLaunchableMesosWorker(newWorker.taskID());
                this.LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", new Object[]{createLaunchableMesosWorker.taskID().getValue(), Double.valueOf(createLaunchableMesosWorker.taskRequest().getMemory()), Double.valueOf(createLaunchableMesosWorker.taskRequest().getCPUs())});
                arrayList.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(newWorker)));
                arrayList2.add(createLaunchableMesosWorker);
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.taskRouter.tell((TaskMonitor.TaskGoalStateUpdated) it.next(), self());
            }
            if (arrayList2.size() >= 1) {
                this.launchCoordinator.tell(new LaunchCoordinator.Launch(arrayList2), self());
            }
        } catch (Exception e) {
            fatalError("unable to request new workers", e);
        }
    }

    private void acceptOffers(AcceptOffers acceptOffers) {
        try {
            ArrayList arrayList = new ArrayList(acceptOffers.operations().size());
            for (Protos.Offer.Operation operation : acceptOffers.operations()) {
                if (operation.getType() == Protos.Offer.Operation.Type.LAUNCH) {
                    for (Protos.TaskInfo taskInfo : operation.getLaunch().getTaskInfosList()) {
                        MesosWorkerStore.Worker remove = this.workersInNew.remove(extractResourceID(taskInfo.getTaskId()));
                        if (!$assertionsDisabled && remove == null) {
                            throw new AssertionError();
                        }
                        MesosWorkerStore.Worker launchWorker = remove.launchWorker(taskInfo.getSlaveId(), acceptOffers.hostname());
                        this.workerStore.putWorker(launchWorker);
                        this.workersInLaunch.put(extractResourceID(launchWorker.taskID()), launchWorker);
                        this.LOG.info("Launching Mesos task {} on host {}.", launchWorker.taskID().getValue(), launchWorker.hostname().get());
                        arrayList.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(launchWorker)));
                    }
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.taskRouter.tell((TaskMonitor.TaskGoalStateUpdated) it.next(), self());
            }
            this.schedulerDriver.acceptOffers(acceptOffers.offerIds(), acceptOffers.operations(), acceptOffers.filters());
        } catch (Exception e) {
            fatalError("unable to accept offers", e);
        }
    }

    private void taskStatusUpdated(StatusUpdate statusUpdate) {
        this.taskRouter.tell(statusUpdate, self());
        this.reconciliationCoordinator.tell(statusUpdate, self());
        this.schedulerDriver.acknowledgeStatusUpdate(statusUpdate.status());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: workerStarted, reason: merged with bridge method [inline-methods] */
    public RegisteredMesosWorkerNode m22workerStarted(ResourceID resourceID) {
        MesosWorkerStore.Worker remove = this.workersInLaunch.remove(resourceID);
        if (remove == null) {
            return null;
        }
        return new RegisteredMesosWorkerNode(remove);
    }

    protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> collection) {
        ArrayList arrayList = new ArrayList(collection.size());
        for (ResourceID resourceID : collection) {
            MesosWorkerStore.Worker remove = this.workersInLaunch.remove(resourceID);
            if (remove != null) {
                this.LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
                arrayList.add(new RegisteredMesosWorkerNode(remove));
            } else if (isStarted(resourceID)) {
                this.LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
            } else {
                this.LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
            }
        }
        return arrayList;
    }

    protected void releasePendingWorker(ResourceID resourceID) {
        MesosWorkerStore.Worker remove = this.workersInLaunch.remove(resourceID);
        if (remove != null) {
            releaseWorker(remove);
        } else {
            this.LOG.error("Cannot find worker {} to release. Ignoring request.", resourceID);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void releaseStartedWorker(RegisteredMesosWorkerNode registeredMesosWorkerNode) {
        releaseWorker(registeredMesosWorkerNode.getWorker());
    }

    private void releaseWorker(MesosWorkerStore.Worker worker) {
        try {
            this.LOG.info("Releasing worker {}", worker.taskID());
            MesosWorkerStore.Worker releaseWorker = worker.releaseWorker();
            this.workerStore.putWorker(releaseWorker);
            this.workersBeingReturned.put(extractResourceID(releaseWorker.taskID()), releaseWorker);
            this.taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(releaseWorker)), self());
            if (releaseWorker.hostname().isDefined()) {
                this.launchCoordinator.tell(new LaunchCoordinator.Unassign(releaseWorker.taskID(), (String) releaseWorker.hostname().get()), self());
            }
        } catch (Exception e) {
            fatalError("unable to release worker", e);
        }
    }

    protected int getNumWorkerRequestsPending() {
        return this.workersInNew.size();
    }

    protected int getNumWorkersPendingRegistration() {
        return this.workersInLaunch.size();
    }

    private void registered(Registered registered) {
        this.connectionMonitor.tell(registered, self());
        try {
            this.workerStore.setFrameworkID(Option.apply(registered.frameworkId()));
            this.launchCoordinator.tell(registered, self());
            this.reconciliationCoordinator.tell(registered, self());
            this.taskRouter.tell(registered, self());
        } catch (Exception e) {
            fatalError("unable to store the assigned framework ID", e);
        }
    }

    private void reregistered(ReRegistered reRegistered) {
        this.connectionMonitor.tell(reRegistered, self());
        this.launchCoordinator.tell(reRegistered, self());
        this.reconciliationCoordinator.tell(reRegistered, self());
        this.taskRouter.tell(reRegistered, self());
    }

    private void disconnected(Disconnected disconnected) {
        this.connectionMonitor.tell(disconnected, self());
        this.launchCoordinator.tell(disconnected, self());
        this.reconciliationCoordinator.tell(disconnected, self());
        this.taskRouter.tell(disconnected, self());
    }

    private void error(String str) {
        self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(str)), self());
    }

    private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus taskStatus) {
        ResourceID extractResourceID = extractResourceID(taskID);
        try {
            if (!this.workerStore.removeWorker(taskID)) {
                this.LOG.info("Received a termination notice for an unrecognized worker: {}", extractResourceID);
                return;
            }
            if (this.workersBeingReturned.remove(extractResourceID) != null) {
                this.LOG.info("Worker {} finished successfully with diagnostics: {}", extractResourceID, taskStatus.getMessage());
            } else {
                if (this.workersInLaunch.remove(extractResourceID) != null) {
                    this.LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. State: {} Reason: {} ({})", new Object[]{extractResourceID, taskStatus.getState(), taskStatus.getReason(), taskStatus.getMessage()});
                } else {
                    this.LOG.info("Mesos task {} failed, with a registered TaskManager. State: {} Reason: {} ({})", new Object[]{extractResourceID, taskStatus.getState(), taskStatus.getReason(), taskStatus.getMessage()});
                    notifyWorkerFailed(extractResourceID, "Mesos task " + extractResourceID + " failed.  State: " + taskStatus.getState());
                }
                this.failedTasksSoFar++;
                String format = String.format("Diagnostics for task %s in state %s : reason=%s message=%s", extractResourceID, taskStatus.getState(), taskStatus.getReason(), taskStatus.getMessage());
                sendInfoMessage(format);
                this.LOG.info(format);
                this.LOG.info("Total number of failed tasks so far: {}", Integer.valueOf(this.failedTasksSoFar));
                if (this.maxFailedTasks >= 0 && this.failedTasksSoFar > this.maxFailedTasks) {
                    String str = "Stopping Mesos session because the number of failed tasks (" + this.failedTasksSoFar + ") exceeded the maximum failed tasks (" + this.maxFailedTasks + "). This number is controlled by the '" + MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. By default its the number of requested tasks.";
                    this.LOG.error(str);
                    self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, str)), ActorRef.noSender());
                    return;
                }
            }
            triggerCheckWorkers();
        } catch (Exception e) {
            fatalError("unable to remove worker", e);
        }
    }

    private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
        return new LaunchableMesosWorker(this.artifactResolver, this.taskManagerParameters, this.taskManagerContainerSpec, taskID, this.mesosConfig);
    }

    static ResourceID extractResourceID(Protos.TaskID taskID) {
        return new ResourceID(taskID.getValue());
    }

    static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
        switch (worker.state()) {
            case New:
                return new TaskMonitor.New(worker.taskID());
            case Launched:
                return new TaskMonitor.Launched(worker.taskID(), (Protos.SlaveID) worker.slaveID().get());
            case Released:
                return new TaskMonitor.Released(worker.taskID(), (Protos.SlaveID) worker.slaveID().get());
            default:
                throw new IllegalArgumentException("unsupported worker state");
        }
    }

    private static TaskSchedulerBuilder createOptimizer() {
        return new TaskSchedulerBuilder() { // from class: org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.1
            TaskScheduler.Builder builder = new TaskScheduler.Builder();

            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action1) {
                this.builder.withLeaseRejectAction(action1);
                return this;
            }

            @Override // org.apache.flink.mesos.scheduler.TaskSchedulerBuilder
            public TaskScheduler build() {
                return this.builder.build();
            }
        };
    }

    public static Props createActorProps(Class<? extends MesosFlinkResourceManager> cls, Configuration configuration, MesosConfiguration mesosConfiguration, MesosWorkerStore mesosWorkerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters mesosTaskManagerParameters, ContainerSpecification containerSpecification, MesosArtifactResolver mesosArtifactResolver, Logger logger) {
        int integer = configuration.getInteger(MesosOptions.INITIAL_TASKS);
        if (integer < 0) {
            throw new IllegalConfigurationException("Invalid value for " + MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
        }
        logger.info("Mesos framework to allocate {} initial tasks", Integer.valueOf(integer));
        int integer2 = configuration.getInteger(MesosOptions.MAX_FAILED_TASKS.key(), integer);
        if (integer2 >= 0) {
            logger.info("Mesos framework tolerates {} failed tasks before giving up", Integer.valueOf(integer2));
        }
        return Props.create(cls, new Object[]{configuration, mesosConfiguration, mesosWorkerStore, leaderRetrievalService, mesosTaskManagerParameters, containerSpecification, mesosArtifactResolver, Integer.valueOf(integer2), Integer.valueOf(integer)});
    }

    static {
        $assertionsDisabled = !MesosFlinkResourceManager.class.desiredAssertionStatus();
    }
}
