package org.apache.reef.runtime.common.evaluator;

import com.google.protobuf.ByteString;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.EvaluatorSide;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
import org.apache.reef.runtime.common.evaluator.context.ContextManager;
import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
import org.apache.reef.runtime.common.evaluator.parameters.EvaluatorIdentifier;
import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.runtime.event.RuntimeStart;
import org.apache.reef.wake.time.runtime.event.RuntimeStop;

/* JADX INFO: Access modifiers changed from: package-private */
@Unit
@EvaluatorSide
/* loaded from: input_file:org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.class */
public final class EvaluatorRuntime implements EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto> {
    private static final Logger LOG = Logger.getLogger(EvaluatorRuntime.class.getName());
    private final HeartBeatManager heartBeatManager;
    private final ContextManager contextManager;
    private final Clock clock;
    private final String evaluatorIdentifier;
    private final ExceptionCodec exceptionCodec;
    private final AutoCloseable evaluatorControlChannel;
    private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;

    /* loaded from: input_file:org/apache/reef/runtime/common/evaluator/EvaluatorRuntime$RuntimeStartHandler.class */
    final class RuntimeStartHandler implements EventHandler<RuntimeStart> {
        static final /* synthetic */ boolean $assertionsDisabled;

        RuntimeStartHandler() {
        }

        public final void onNext(RuntimeStart runtimeStart) {
            synchronized (EvaluatorRuntime.this.heartBeatManager) {
                try {
                    EvaluatorRuntime.LOG.log(Level.FINEST, "runtime start");
                } catch (Throwable th) {
                    EvaluatorRuntime.this.onException(th);
                }
                if (!$assertionsDisabled && ReefServiceProtos.State.INIT != EvaluatorRuntime.this.state) {
                    throw new AssertionError();
                }
                EvaluatorRuntime.this.state = ReefServiceProtos.State.RUNNING;
                EvaluatorRuntime.this.contextManager.start();
                EvaluatorRuntime.this.heartBeatManager.sendHeartbeat();
            }
        }

        static {
            $assertionsDisabled = !EvaluatorRuntime.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/reef/runtime/common/evaluator/EvaluatorRuntime$RuntimeStopHandler.class */
    final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
        RuntimeStopHandler() {
        }

        public final void onNext(RuntimeStop runtimeStop) {
            synchronized (EvaluatorRuntime.this.heartBeatManager) {
                EvaluatorRuntime.LOG.log(Level.FINEST, "EvaluatorRuntime shutdown invoked for Evaluator {0} in state {1}", new Object[]{EvaluatorRuntime.this.evaluatorIdentifier, EvaluatorRuntime.this.state});
                if (EvaluatorRuntime.this.isRunning()) {
                    EvaluatorRuntime.this.onException(new RuntimeException("RuntimeStopHandler invoked in state RUNNING.", runtimeStop.getException()));
                } else {
                    EvaluatorRuntime.this.contextManager.close();
                    try {
                        EvaluatorRuntime.this.evaluatorControlChannel.close();
                    } catch (Exception e) {
                        EvaluatorRuntime.LOG.log(Level.SEVERE, "Exception during shutdown of evaluatorControlChannel.", (Throwable) e);
                    }
                    EvaluatorRuntime.LOG.log(Level.FINEST, "EvaluatorRuntime shutdown complete");
                }
            }
        }
    }

    @Inject
    private EvaluatorRuntime(@Parameter(HeartbeatPeriod.class) int i, @Parameter(EvaluatorIdentifier.class) String str, @Parameter(DriverRemoteIdentifier.class) String str2, HeartBeatManager.HeartbeatAlarmHandler heartbeatAlarmHandler, HeartBeatManager heartBeatManager, Clock clock, ContextManager contextManager, RemoteManager remoteManager, ExceptionCodec exceptionCodec) {
        this.heartBeatManager = heartBeatManager;
        this.contextManager = contextManager;
        this.clock = clock;
        this.evaluatorIdentifier = str;
        this.exceptionCodec = exceptionCodec;
        this.evaluatorControlChannel = remoteManager.registerHandler(str2, EvaluatorRuntimeProtocol.EvaluatorControlProto.class, this);
        clock.scheduleAlarm(i, heartbeatAlarmHandler);
    }

    private void onEvaluatorControlMessage(EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
        synchronized (this.heartBeatManager) {
            LOG.log(Level.FINEST, "Evaluator control message");
            if (!evaluatorControlProto.getIdentifier().equals(this.evaluatorIdentifier)) {
                onException(new RuntimeException("Identifier mismatch: message for evaluator id[" + evaluatorControlProto.getIdentifier() + "] sent to evaluator id[" + this.evaluatorIdentifier + "]"));
            } else if (ReefServiceProtos.State.RUNNING != this.state) {
                onException(new RuntimeException("Evaluator sent a control message but its state is not " + ReefServiceProtos.State.RUNNING + " but rather " + this.state));
            } else {
                if (evaluatorControlProto.hasContextControl()) {
                    LOG.log(Level.FINEST, "Send task control message to ContextManager");
                    try {
                        this.contextManager.handleContextControlProtocol(evaluatorControlProto.getContextControl());
                        if (this.contextManager.contextStackIsEmpty() && this.state == ReefServiceProtos.State.RUNNING) {
                            this.state = ReefServiceProtos.State.DONE;
                            this.heartBeatManager.sendEvaluatorStatus(getEvaluatorStatus());
                            this.clock.close();
                        }
                    } catch (Throwable th) {
                        onException(th);
                        throw new RuntimeException(th);
                    }
                }
                if (evaluatorControlProto.hasKillEvaluator()) {
                    LOG.log(Level.SEVERE, "Evaluator {0} has been killed by the driver.", this.evaluatorIdentifier);
                    this.state = ReefServiceProtos.State.KILLED;
                    this.clock.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void onException(Throwable th) {
        synchronized (this.heartBeatManager) {
            this.state = ReefServiceProtos.State.FAILED;
            this.heartBeatManager.sendEvaluatorStatus(ReefServiceProtos.EvaluatorStatusProto.newBuilder().setEvaluatorId(this.evaluatorIdentifier).setError(ByteString.copyFrom(this.exceptionCodec.toBytes(th))).setState(this.state).m794build());
            this.contextManager.close();
        }
    }

    public ReefServiceProtos.EvaluatorStatusProto getEvaluatorStatus() {
        ReefServiceProtos.EvaluatorStatusProto m794build;
        synchronized (this.heartBeatManager) {
            LOG.log(Level.FINEST, "Evaluator heartbeat: state = {0}", this.state);
            m794build = ReefServiceProtos.EvaluatorStatusProto.newBuilder().setEvaluatorId(this.evaluatorIdentifier).setState(this.state).m794build();
        }
        return m794build;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final ReefServiceProtos.State getState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isRunning() {
        return this.state == ReefServiceProtos.State.RUNNING;
    }

    public void onNext(EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
        onEvaluatorControlMessage(evaluatorControlProto);
    }
}
