package org.apache.reef.runtime.common.driver.task;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.util.Optional;

@DriverSide
@Private
/* loaded from: input_file:org/apache/reef/runtime/common/driver/task/TaskRepresenter.class */
public final class TaskRepresenter {
    private static final Logger LOG;
    private final EvaluatorContext context;
    private final EvaluatorMessageDispatcher messageDispatcher;
    private final EvaluatorManager evaluatorManager;
    private final ExceptionCodec exceptionCodec;
    private final String taskId;
    private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
    static final /* synthetic */ boolean $assertionsDisabled;

    public TaskRepresenter(String str, EvaluatorContext evaluatorContext, EvaluatorMessageDispatcher evaluatorMessageDispatcher, EvaluatorManager evaluatorManager, ExceptionCodec exceptionCodec) {
        this.taskId = str;
        this.context = evaluatorContext;
        this.messageDispatcher = evaluatorMessageDispatcher;
        this.evaluatorManager = evaluatorManager;
        this.exceptionCodec = exceptionCodec;
    }

    private static byte[] getResult(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (taskStatusProto.hasResult()) {
            return taskStatusProto.getResult().toByteArray();
        }
        return null;
    }

    public void onTaskStatusMessage(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        LOG.log(Level.FINE, "Received task {0} status {1}", new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()});
        if (!taskStatusProto.getContextId().equals(this.context.getId())) {
            throw new RuntimeException("Received a message for a task running on Context " + taskStatusProto.getContextId() + " while the Driver believes this Task to be run on Context " + this.context.getId());
        }
        if (!taskStatusProto.getTaskId().equals(this.taskId)) {
            throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() + " in the TaskRepresenter for Task " + this.taskId);
        }
        if (taskStatusProto.getRecovery()) {
            LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.", new Object[]{taskStatusProto.getState(), this.taskId});
            setState(taskStatusProto.getState());
        }
        switch (taskStatusProto.getState()) {
            case INIT:
                onTaskInit(taskStatusProto);
                return;
            case RUNNING:
                onTaskRunning(taskStatusProto);
                return;
            case SUSPEND:
                onTaskSuspend(taskStatusProto);
                return;
            case DONE:
                onTaskDone(taskStatusProto);
                return;
            case FAILED:
                onTaskFailed(taskStatusProto);
                return;
            default:
                throw new IllegalStateException("Unknown task state: " + taskStatusProto.getState());
        }
    }

    private void onTaskInit(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (!$assertionsDisabled && ReefServiceProtos.State.INIT != taskStatusProto.getState()) {
            throw new AssertionError();
        }
        if (isKnown()) {
            LOG.log(Level.WARNING, "Received a INIT message for task with id {0} which we have seen before. Ignoring the second message", this.taskId);
            return;
        }
        this.messageDispatcher.onTaskRunning(new RunningTaskImpl(this.evaluatorManager, this.taskId, this.context, this));
        setState(ReefServiceProtos.State.RUNNING);
    }

    private void onTaskRunning(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (!$assertionsDisabled && taskStatusProto.getState() != ReefServiceProtos.State.RUNNING) {
            throw new AssertionError();
        }
        if (isNotRunning()) {
            throw new IllegalStateException("Received a task status message from task " + this.taskId + " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
        }
        if (taskStatusProto.getRecovery()) {
            this.messageDispatcher.onDriverRestartTaskRunning(new RunningTaskImpl(this.evaluatorManager, this.taskId, this.context, this));
        }
        for (ReefServiceProtos.TaskStatusProto.TaskMessageProto taskMessageProto : taskStatusProto.getTaskMessageList()) {
            this.messageDispatcher.onTaskMessage(new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(), this.taskId, this.context.getId(), taskMessageProto.getSourceId()));
        }
    }

    private void onTaskSuspend(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (!$assertionsDisabled && ReefServiceProtos.State.SUSPEND != taskStatusProto.getState()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !isKnown()) {
            throw new AssertionError();
        }
        this.messageDispatcher.onTaskSuspended(new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
        setState(ReefServiceProtos.State.SUSPEND);
    }

    private void onTaskDone(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (!$assertionsDisabled && ReefServiceProtos.State.DONE != taskStatusProto.getState()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !isKnown()) {
            throw new AssertionError();
        }
        this.messageDispatcher.onTaskCompleted(new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
        setState(ReefServiceProtos.State.DONE);
    }

    private void onTaskFailed(ReefServiceProtos.TaskStatusProto taskStatusProto) {
        if (!$assertionsDisabled && ReefServiceProtos.State.FAILED != taskStatusProto.getState()) {
            throw new AssertionError();
        }
        Optional of = Optional.of(this.context);
        Optional<byte[]> ofNullable = Optional.ofNullable(getResult(taskStatusProto));
        Optional<Throwable> fromBytes = this.exceptionCodec.fromBytes(ofNullable);
        this.messageDispatcher.onTaskFailed(new FailedTask(this.taskId, fromBytes.isPresent() ? ((Throwable) fromBytes.get()).getMessage() : "No message given", Optional.empty(), fromBytes, ofNullable, of));
        setState(ReefServiceProtos.State.FAILED);
    }

    public String getId() {
        return this.taskId;
    }

    private boolean isKnown() {
        return this.state != ReefServiceProtos.State.INIT;
    }

    public boolean isNotRunning() {
        return this.state != ReefServiceProtos.State.RUNNING;
    }

    private void setState(ReefServiceProtos.State state) {
        LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]", new Object[]{this.taskId, this.state, state});
        this.state = state;
    }

    static {
        $assertionsDisabled = !TaskRepresenter.class.desiredAssertionStatus();
        LOG = Logger.getLogger(TaskRepresenter.class.getName());
    }
}
