/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import java.io.File;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorType;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.evaluator.CompletedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorControlHandler;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorHeartBeatSanityChecker;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorState;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorStatusManager;
import org.apache.reef.runtime.common.driver.evaluator.FailedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;

@Private
@DriverSide
public final class EvaluatorManager
implements Identifiable,
AutoCloseable {
    /** Logger shared by all instances, named after this class. */
    private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
    /** Handed the evaluator id and timestamp of every heartbeat that is processed. */
    private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
    /** Used by close() to schedule the delayed resource release. */
    private final Clock clock;
    /** Receives the release event for this evaluator's resource. */
    private final ResourceReleaseHandler resourceReleaseHandler;
    /** Receives the launch events forwarded by onResourceLaunch(). */
    private final ResourceLaunchHandler resourceLaunchHandler;
    /** Identifier of the managed evaluator; returned by getId(). */
    private final String evaluatorId;
    /** Descriptor of the managed evaluator; also the monitor used by the synchronized sections of this class. */
    private final EvaluatorDescriptorImpl evaluatorDescriptor;
    /** Receives context status messages from heartbeats and supplies the failed contexts when the evaluator fails. */
    private final ContextRepresenters contextRepresenters;
    /** Dispatches evaluator events (allocated, completed, failed, driver restart completed). */
    private final EvaluatorMessageDispatcher messageDispatcher;
    /** Sends evaluator control messages; is given the evaluator's remote id when a heartbeat establishes it. */
    private final EvaluatorControlHandler evaluatorControlHandler;
    /** Sends context control messages. */
    private final ContextControlHandler contextControlHandler;
    /** Tracks the lifecycle state of the evaluator. */
    private final EvaluatorStatusManager stateManager;
    /** Decodes serialized errors reported by the evaluator. */
    private final ExceptionCodec exceptionCodec;
    /** Keeps count of recovered containers while recovery heartbeats arrive. */
    private final DriverStatusManager driverStatusManager;
    /** Asked to re-check at the end of close(). */
    private final EventHandlerIdlenessSource idlenessSource;
    /** Passed on to the AllocatedEvaluatorImpl built in the constructor. */
    private final LoggingScopeFactory loggingScopeFactory;
    /** The task currently tracked by this manager, if any; set and cleared by onTaskStatusMessage(). */
    private Optional<TaskRepresenter> task = Optional.empty();
    /** Set once close() has issued the release, or once the resource manager reported the resource as done, failed or killed. */
    private boolean isResourceReleased = false;

    /**
     * Creates the manager for one evaluator and announces the allocation.
     * <p>
     * After all collaborators are stored, an {@link AllocatedEvaluatorImpl} referring to this
     * manager is built and handed to the message dispatcher's {@code onEvaluatorAllocated}
     * before the constructor returns.
     */
    @Inject
    private EvaluatorManager(
            Clock clock,
            RemoteManager remoteManager,
            ResourceReleaseHandler resourceReleaseHandler,
            ResourceLaunchHandler resourceLaunchHandler,
            @Parameter(EvaluatorIdentifier.class) String evaluatorId,
            @Parameter(EvaluatorDescriptorName.class) EvaluatorDescriptorImpl evaluatorDescriptor,
            ContextRepresenters contextRepresenters,
            ConfigurationSerializer configurationSerializer,
            EvaluatorMessageDispatcher messageDispatcher,
            EvaluatorControlHandler evaluatorControlHandler,
            ContextControlHandler contextControlHandler,
            EvaluatorStatusManager stateManager,
            DriverStatusManager driverStatusManager,
            ExceptionCodec exceptionCodec,
            EventHandlerIdlenessSource idlenessSource,
            LoggingScopeFactory loggingScopeFactory,
            @Parameter(EvaluatorConfigurationProviders.class) Set<ConfigurationProvider> evaluatorConfigurationProviders) {
        LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);

        // Identity of the managed evaluator.
        this.evaluatorId = evaluatorId;
        this.evaluatorDescriptor = evaluatorDescriptor;

        // Resource-level collaborators.
        this.clock = clock;
        this.resourceReleaseHandler = resourceReleaseHandler;
        this.resourceLaunchHandler = resourceLaunchHandler;

        // Evaluator-, context- and driver-level collaborators.
        this.contextRepresenters = contextRepresenters;
        this.messageDispatcher = messageDispatcher;
        this.evaluatorControlHandler = evaluatorControlHandler;
        this.contextControlHandler = contextControlHandler;
        this.stateManager = stateManager;
        this.driverStatusManager = driverStatusManager;
        this.exceptionCodec = exceptionCodec;
        this.idlenessSource = idlenessSource;
        this.loggingScopeFactory = loggingScopeFactory;

        final String driverRemoteId = remoteManager.getMyIdentifier();
        final String jobId = EvaluatorManager.getJobIdentifier();
        final AllocatedEvaluatorImpl allocated = new AllocatedEvaluatorImpl(
                this, driverRemoteId, configurationSerializer, jobId, loggingScopeFactory, evaluatorConfigurationProviders);

        LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
        this.messageDispatcher.onEvaluatorAllocated(allocated);
        LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
    }

    /**
     * Derives the job identifier from the current working directory.
     * <p>
     * Starting at the directory named by the {@code user.dir} system property and walking up
     * through its parents, returns the name of the first directory whose name contains
     * {@code "application_"}, ignoring case. The comparison is locale-independent, so the
     * result does not depend on the JVM's default locale.
     *
     * @return the matching directory name, or {@code "REEF_LOCAL_RUNTIME"} when no directory
     *         on the path matches or {@code user.dir} is not set
     */
    public static String getJobIdentifier() {
        final String fallbackJobId = "REEF_LOCAL_RUNTIME";
        final String workingDirectory = System.getProperty("user.dir");
        if (workingDirectory == null) {
            // new File((String) null) would throw; without a path there is nothing to search.
            return fallbackJobId;
        }
        for (File directory = new File(workingDirectory); directory != null; directory = directory.getParentFile()) {
            final String directoryName = directory.getName();
            // Locale.ROOT: the default locale must not influence case folding (e.g. Turkish dotless i).
            if (directoryName.toLowerCase(Locale.ROOT).contains("application_")) {
                return directoryName;
            }
        }
        return fallbackJobId;
    }

    /**
     * Tells whether the resource manager reports a terminal state.
     *
     * @param resourceStatusEvent the status update to inspect
     * @return {@code true} if the reported state is DONE, FAILED or KILLED
     */
    private static boolean isDoneOrFailedOrKilled(ResourceStatusEvent resourceStatusEvent) {
        final ReefServiceProtos.State reported = resourceStatusEvent.getState();
        if (reported == ReefServiceProtos.State.DONE) {
            return true;
        }
        if (reported == ReefServiceProtos.State.FAILED) {
            return true;
        }
        return reported == ReefServiceProtos.State.KILLED;
    }

    /**
     * @return the identifier of the evaluator managed by this instance
     */
    @Override
    public String getId() {
        return evaluatorId;
    }

    /**
     * Records the evaluator type on the evaluator's descriptor.
     *
     * @param type the type to store
     */
    public void setType(EvaluatorType type) {
        evaluatorDescriptor.setType(type);
    }

    /**
     * @return the descriptor of the managed evaluator (the live instance, not a copy)
     */
    public EvaluatorDescriptor getEvaluatorDescriptor() {
        return evaluatorDescriptor;
    }

    /**
     * Shuts the evaluator down and hands its resource back.
     * <p>
     * While holding the {@code evaluatorDescriptor} monitor: if the evaluator is still running,
     * a kill message is sent and the state is set to killed (even when sending throws). Then,
     * unless the resource is already marked as released, it is marked released and a release
     * event is scheduled on the clock. If the clock rejects the alarm with an
     * {@link IllegalStateException}, the release event is delivered immediately instead.
     * After leaving the monitor, the idleness source is asked to re-check.
     */
    @Override
    public void close() {
        synchronized (this.evaluatorDescriptor) {
            if (this.stateManager.isRunning()) {
                LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", this.getId());
                try {
                    final EvaluatorRuntimeProtocol.EvaluatorControlProto killMessage =
                            EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
                                    .setTimestamp(System.currentTimeMillis())
                                    .setIdentifier(this.getId())
                                    .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
                                    .build();
                    this.sendEvaluatorControlMessage(killMessage);
                } finally {
                    this.stateManager.setKilled();
                }
            }

            if (!this.isResourceReleased) {
                this.isResourceReleased = true;
                try {
                    final EventHandler<Alarm> deferredRelease = new EventHandler<Alarm>() {
                        @Override
                        public void onNext(Alarm alarm) {
                            EvaluatorManager.this.resourceReleaseHandler.onNext(
                                    ResourceReleaseEventImpl.newBuilder()
                                            .setIdentifier(EvaluatorManager.this.evaluatorId)
                                            .build());
                        }
                    };
                    // NOTE(review): the 100 offset (presumably milliseconds) looks intended to give the
                    // evaluator time to exit before its resource is returned - confirm.
                    this.clock.scheduleAlarm(100, deferredRelease);
                } catch (IllegalStateException e) {
                    LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
                    this.resourceReleaseHandler.onNext(
                            ResourceReleaseEventImpl.newBuilder().setIdentifier(this.evaluatorId).build());
                }
            }
        }
        this.idlenessSource.check();
    }

    /**
     * @return {@code true} when the message dispatcher reports itself empty and the evaluator
     *         is in the done, failed or killed state
     */
    public boolean isClosed() {
        if (!messageDispatcher.isEmpty()) {
            return false;
        }
        return stateManager.isDoneOrFailedOrKilled();
    }

    /**
     * Handles a failure of the managed evaluator.
     * <p>
     * Runs under the {@code evaluatorDescriptor} monitor. If the evaluator is already done,
     * failed or killed, the exception is only logged at FINE level and ignored. Otherwise a
     * {@link FailedEvaluatorImpl} is dispatched, carrying the failed contexts and, when a task
     * is currently tracked, a {@link FailedTask} describing it as lost to an evaluator crash.
     * Any exception raised while dispatching is logged and swallowed. In every case the state
     * is then set to failed and {@link #close()} is invoked.
     *
     * @param exception the failure to report
     */
    public void onEvaluatorException(EvaluatorException exception) {
        synchronized (this.evaluatorDescriptor) {
            if (this.stateManager.isDoneOrFailedOrKilled()) {
                LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.", new Object[]{this.getId(), this.stateManager});
                return;
            }
            LOG.log(Level.WARNING, "Failed evaluator: " + this.getId(), exception);
            try {
                final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
                final Optional<FailedTask> failedTaskOptional;
                if (this.task.isPresent()) {
                    final String taskId = this.task.get().getId();
                    final String message = "Evaluator crash";
                    final Optional<String> description = Optional.empty();
                    // Explicit type witness: the argument is an Exception but the parameter type is Optional<Throwable>.
                    final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception(message));
                    final Optional<byte[]> bytes = Optional.empty();
                    final Optional<ActiveContext> evaluatorContext = Optional.empty();
                    failedTaskOptional = Optional.of(new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext));
                } else {
                    failedTaskOptional = Optional.empty();
                }
                this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList, failedTaskOptional, this.evaluatorId));
            } catch (Exception e) {
                // Best effort: a failing handler must not prevent the state change and cleanup below.
                LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
            } finally {
                this.stateManager.setFailed();
                this.close();
            }
        }
    }

    /**
     * Processes one heartbeat received from the evaluator.
     * <p>
     * Runs under the {@code evaluatorDescriptor} monitor. Heartbeats for an evaluator that is
     * already done, failed or killed are logged at FINE level and ignored. Otherwise, in this
     * order: the heartbeat timestamp is passed to the sanity checker; a recovery heartbeat
     * triggers the recovery bookkeeping; an evaluator in the submitted state is moved to
     * running; an evaluator status, the context statuses and a task status (each if present)
     * are forwarded. New contexts are announced to the client only when the heartbeat
     * carries no task status.
     *
     * @param evaluatorHeartbeatProtoRemoteMessage the heartbeat together with the sender's remote identifier
     * @throws RuntimeException if more evaluators have reported recovery than were expected
     */
    public void onEvaluatorHeartbeatMessage(RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
        final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatProtoRemoteMessage.getMessage();
        LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", heartbeat);

        synchronized (this.evaluatorDescriptor) {
            if (this.stateManager.isDoneOrFailedOrKilled()) {
                LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", new Object[]{this.getId(), this.stateManager});
                return;
            }

            this.sanityChecker.check(this.evaluatorId, heartbeat.getTimestamp());
            final String senderRemoteId = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();

            if (heartbeat.getRecovery()) {
                this.handleRecoveryHeartbeat(senderRemoteId);
            }

            if (this.stateManager.isSubmitted()) {
                this.evaluatorControlHandler.setRemoteID(senderRemoteId);
                this.stateManager.setRunning();
                LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
            }

            if (heartbeat.hasEvaluatorStatus()) {
                this.onEvaluatorStatusMessage(heartbeat.getEvaluatorStatus());
            }

            final boolean carriesTaskStatus = heartbeat.hasTaskStatus();
            this.contextRepresenters.onContextStatusMessages(heartbeat.getContextStatusList(), !carriesTaskStatus);
            if (carriesTaskStatus) {
                this.onTaskStatusMessage(heartbeat.getTaskStatus());
            }

            LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
        }
    }

    /**
     * Recovery bookkeeping for a heartbeat flagged as recovery; callers hold the {@code evaluatorDescriptor} monitor.
     * <p>
     * Marks the evaluator as running under the given remote id, counts it as recovered and
     * fires the driver-restart-completed event once the recovered count reaches the expected count.
     *
     * @param senderRemoteId remote identifier of the evaluator that sent the heartbeat
     * @throws RuntimeException if the recovered count exceeds the expected count
     */
    private void handleRecoveryHeartbeat(String senderRemoteId) {
        this.evaluatorControlHandler.setRemoteID(senderRemoteId);
        this.stateManager.setRunning();
        this.driverStatusManager.oneContainerRecovered();
        final int recovered = this.driverStatusManager.getNumRecoveredContainers();
        LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId);
        final int expected = this.driverStatusManager.getNumPreviousContainers();

        if (recovered > expected) {
            LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.", new Object[]{expected, recovered});
            throw new RuntimeException("More then expected number of evaluators are checking in during recovery.");
        } else if (recovered == expected) {
            LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expected);
            this.driverStatusManager.setRestartCompleted();
            this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis()));
        } else {
            LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.", new Object[]{expected, recovered});
        }
    }

    /**
     * Routes an evaluator status report: DONE goes to {@code onEvaluatorDone}, FAILED to
     * {@code onEvaluatorFailed}; every other state is ignored.
     *
     * @param message the status reported by the evaluator
     */
    private synchronized void onEvaluatorStatusMessage(ReefServiceProtos.EvaluatorStatusProto message) {
        final ReefServiceProtos.State reportedState = message.getState();
        switch (reportedState) {
            case DONE:
                this.onEvaluatorDone(message);
                return;
            case FAILED:
                this.onEvaluatorFailed(message);
                return;
            default:
                // Nothing to do for any other state.
                return;
        }
    }

    /**
     * Handles a DONE status: sets the state to done, dispatches a
     * {@link CompletedEvaluatorImpl} for this evaluator and closes the manager.
     *
     * @param message the status report; asserted to be in state DONE
     */
    private synchronized void onEvaluatorDone(ReefServiceProtos.EvaluatorStatusProto message) {
        assert message.getState() == ReefServiceProtos.State.DONE;
        LOG.log(Level.FINEST, "Evaluator {0} done.", this.getId());

        this.stateManager.setDone();
        final CompletedEvaluatorImpl completed = new CompletedEvaluatorImpl(this.evaluatorId);
        this.messageDispatcher.onEvaluatorCompleted(completed);
        this.close();
    }

    /**
     * Handles a FAILED status by turning it into an {@link EvaluatorException} and passing
     * that to {@code onEvaluatorException}.
     * <p>
     * The cause is the decoded error when the report carries one that the exception codec can
     * decode; otherwise a placeholder exception saying that the error could not be
     * deserialized, or that none was sent.
     *
     * @param evaluatorStatusProto the status report; asserted to be in state FAILED
     */
    private synchronized void onEvaluatorFailed(ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
        assert evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED;

        final EvaluatorException evaluatorException;
        if (!evaluatorStatusProto.hasError()) {
            evaluatorException = new EvaluatorException(this.getId(), new Exception("No exception sent"));
        } else {
            final Optional<Throwable> decoded = this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray());
            if (decoded.isPresent()) {
                evaluatorException = new EvaluatorException(this.getId(), decoded.get());
            } else {
                evaluatorException = new EvaluatorException(this.getId(), new Exception("Exception sent, but can't be deserialized"));
            }
        }
        this.onEvaluatorException(evaluatorException);
    }

    /**
     * Launches the evaluator on its resource.
     * <p>
     * Under the {@code evaluatorDescriptor} monitor, moves the state from allocated to
     * submitted and forwards the event to the resource launch handler.
     *
     * @param resourceLaunchEvent the launch request to forward
     * @throws RuntimeException if the evaluator is not in the allocated state
     */
    public void onResourceLaunch(ResourceLaunchEvent resourceLaunchEvent) {
        synchronized (this.evaluatorDescriptor) {
            if (this.stateManager.isAllocated()) {
                this.stateManager.setSubmitted();
                this.resourceLaunchHandler.onNext(resourceLaunchEvent);
            } else {
                throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED
                        + " state but instead is in state " + this.stateManager);
            }
        }
    }

    /**
     * Sends a context control message through the context control handler, holding the
     * {@code evaluatorDescriptor} monitor while doing so.
     *
     * @param contextControlProto the message to send
     */
    public void sendContextControlMessage(EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
        synchronized (this.evaluatorDescriptor) {
            LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
            this.contextControlHandler.send(contextControlProto);
        }
    }

    /**
     * Sends an evaluator control message through the evaluator control handler, holding the
     * {@code evaluatorDescriptor} monitor while doing so.
     *
     * @param evaluatorControlProto the message to send
     */
    void sendEvaluatorControlMessage(EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
        synchronized (this.evaluatorDescriptor) {
            this.evaluatorControlHandler.send(evaluatorControlProto);
        }
    }

    /**
     * Forwards a task status report to the representer of the task it belongs to.
     * <p>
     * When no task is tracked, or the report names a different task id, a new
     * {@link TaskRepresenter} is created first, which is only permitted if the report is in
     * state INIT or FAILED or is flagged as recovery. After forwarding, the task is
     * de-registered when its representer reports it as no longer running.
     *
     * @param taskStatusProto the task status to process
     * @throws RuntimeException if the report is the first one seen for its task but is neither
     *         INIT, FAILED nor a recovery report
     */
    private void onTaskStatusMessage(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        final boolean isTrackedTask = this.task.isPresent()
                && this.task.get().getId().equals(taskStatusProto.getTaskId());

        if (!isTrackedTask) {
            final boolean mayIntroduceTask = taskStatusProto.getState() == ReefServiceProtos.State.INIT
                    || taskStatusProto.getState() == ReefServiceProtos.State.FAILED
                    || taskStatusProto.getRecovery();
            if (!mayIntroduceTask) {
                throw new RuntimeException("Received an message of state " + taskStatusProto.getState() + ", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before.");
            }
            this.task = Optional.of(new TaskRepresenter(
                    taskStatusProto.getTaskId(),
                    this.contextRepresenters.getContext(taskStatusProto.getContextId()),
                    this.messageDispatcher,
                    this,
                    this.exceptionCodec));
        }

        this.task.get().onTaskStatusMessage(taskStatusProto);

        if (this.task.get().isNotRunning()) {
            LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
            this.task = Optional.empty();
        }
    }

    /**
     * Reacts to a status update from the resource manager.
     * <p>
     * Runs under the {@code evaluatorDescriptor} monitor. Updates for an evaluator that is
     * already done, failed or killed are logged at FINE level and ignored. If the resource
     * manager reports DONE, FAILED or KILLED while the evaluator is still allocated,
     * submitted or running, the resource is marked as released and the mismatch is reported
     * through {@code onEvaluatorException}: as an
     * {@link EvaluatorKilledByResourceManagerException} for KILLED, as a plain
     * {@link EvaluatorException} otherwise. Any other update has no effect.
     *
     * @param resourceStatusEvent the status reported by the resource manager
     */
    public void onResourceStatusMessage(ResourceStatusEvent resourceStatusEvent) {
        synchronized (this.evaluatorDescriptor) {
            LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState());

            if (this.stateManager.isDoneOrFailedOrKilled()) {
                LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", new Object[]{this.getId(), this.stateManager});
                return;
            }

            final boolean resourceGoneWhileActive = EvaluatorManager.isDoneOrFailedOrKilled(resourceStatusEvent)
                    && this.stateManager.isAllocatedOrSubmittedOrRunning();
            if (!resourceGoneWhileActive) {
                return;
            }

            final String explanation = this.describeStateMismatch(resourceStatusEvent);
            // The resource is already gone, so close() must not issue another release for it.
            this.isResourceReleased = true;

            final EvaluatorException failure;
            if (resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED) {
                failure = new EvaluatorKilledByResourceManagerException(this.evaluatorId, explanation);
            } else {
                failure = new EvaluatorException(this.evaluatorId, explanation);
            }
            this.onEvaluatorException(failure);
        }
    }

    /**
     * Builds the message explaining that the resource manager's view of the evaluator
     * contradicts the state tracked by this manager; callers hold the {@code evaluatorDescriptor} monitor.
     *
     * @param resourceStatusEvent the status reported by the resource manager
     * @return the explanation, including a state-specific hint and the tracked task id, if any
     */
    private String describeStateMismatch(ResourceStatusEvent resourceStatusEvent) {
        final StringBuilder text = new StringBuilder("Evaluator [")
                .append(this.evaluatorId)
                .append("] is assumed to be in state [")
                .append(this.stateManager.toString())
                .append("]. But the resource manager reports it to be in state [")
                .append(resourceStatusEvent.getState())
                .append("].");

        if (this.stateManager.isSubmitted()) {
            text.append(" This most likely means that the Evaluator suffered a failure before establishing a communications link to the driver.");
        } else if (this.stateManager.isAllocated()) {
            text.append(" This most likely means that the Evaluator suffered a failure before being used.");
        } else if (this.stateManager.isRunning()) {
            text.append(" This means that the Evaluator failed but wasn't able to send an error message back to the driver.");
        }

        if (this.task.isPresent()) {
            text.append(" Task [").append(this.task.get().getId()).append("] was running when the Evaluator crashed.");
        }
        return text.toString();
    }

    /**
     * @return a one-line description containing the evaluator id, the tracked state and the tracked task
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("EvaluatorManager: id=");
        text.append(this.evaluatorId);
        text.append(" state=").append(this.stateManager);
        text.append(" task=").append(this.task);
        return text.toString();
    }

    /**
     * Named parameter under which the evaluator's {@link EvaluatorDescriptorImpl} is injected
     * into the {@link EvaluatorManager} constructor.
     */
    @NamedParameter(doc = "The Evaluator Host.")
    public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
    }

    /**
     * Named parameter under which the evaluator's identifier string is injected into the
     * {@link EvaluatorManager} constructor.
     */
    @NamedParameter(doc = "The Evaluator Identifier.")
    public static final class EvaluatorIdentifier implements Name<String> {
    }
}

