package org.apache.reef.runtime.common.client;

import com.google.protobuf.ByteString;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.CompletedJob;
import org.apache.reef.client.FailedJob;
import org.apache.reef.client.JobMessage;
import org.apache.reef.client.RunningJob;
import org.apache.reef.client.parameters.JobCompletedHandler;
import org.apache.reef.client.parameters.JobFailedHandler;
import org.apache.reef.client.parameters.JobMessageHandler;
import org.apache.reef.client.parameters.JobRunningHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ClientRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.client.REEFImplementation;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

@ClientSide
@Private
/* loaded from: input_file:org/apache/reef/runtime/common/client/RunningJobImpl.class */
public final class RunningJobImpl implements RunningJob, EventHandler<ReefServiceProtos.JobStatusProto> {
    private static final Logger LOG;
    private final String jobId;
    private final EventHandler<ClientRuntimeProtocol.JobControlProto> jobControlHandler;
    private final EventHandler<RunningJob> runningJobEventHandler;
    private final EventHandler<CompletedJob> completedJobEventHandler;
    private final EventHandler<FailedJob> failedJobEventHandler;
    private final EventHandler<JobMessage> jobMessageEventHandler;
    private final ExceptionCodec exceptionCodec;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Inject
    RunningJobImpl(RemoteManager remoteManager, @Parameter(DriverIdentifier.class) String str, @Parameter(REEFImplementation.DriverRemoteIdentifier.class) String str2, @Parameter(JobRunningHandler.class) EventHandler<RunningJob> eventHandler, @Parameter(JobCompletedHandler.class) EventHandler<CompletedJob> eventHandler2, @Parameter(JobFailedHandler.class) EventHandler<FailedJob> eventHandler3, @Parameter(JobMessageHandler.class) EventHandler<JobMessage> eventHandler4, ExceptionCodec exceptionCodec) {
        this.jobId = str;
        this.runningJobEventHandler = eventHandler;
        this.completedJobEventHandler = eventHandler2;
        this.failedJobEventHandler = eventHandler3;
        this.jobMessageEventHandler = eventHandler4;
        this.exceptionCodec = exceptionCodec;
        this.jobControlHandler = remoteManager.getHandler(str2, ClientRuntimeProtocol.JobControlProto.class);
        this.runningJobEventHandler.onNext(this);
        LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
    }

    @Override // org.apache.reef.client.RunningJob, java.lang.AutoCloseable
    public synchronized void close() {
        this.jobControlHandler.onNext(ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setSignal(ClientRuntimeProtocol.Signal.SIG_TERMINATE).build());
    }

    @Override // org.apache.reef.client.RunningJob
    public synchronized void close(byte[] bArr) {
        this.jobControlHandler.onNext(ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setSignal(ClientRuntimeProtocol.Signal.SIG_TERMINATE).setMessage(ByteString.copyFrom(bArr)).build());
    }

    @Override // org.apache.reef.client.RunningJob, org.apache.reef.io.naming.Identifiable
    public String getId() {
        return this.jobId;
    }

    @Override // org.apache.reef.client.RunningJob
    public synchronized void send(byte[] bArr) {
        this.jobControlHandler.onNext(ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setMessage(ByteString.copyFrom(bArr)).build());
    }

    @Override // org.apache.reef.wake.EventHandler
    public synchronized void onNext(ReefServiceProtos.JobStatusProto jobStatusProto) {
        ReefServiceProtos.State state = jobStatusProto.getState();
        LOG.log(Level.FINEST, "Received job status: {0} from {1}", new Object[]{state, jobStatusProto.getIdentifier()});
        if (jobStatusProto.hasMessage()) {
            this.jobMessageEventHandler.onNext(new JobMessage(getId(), jobStatusProto.getMessage().toByteArray()));
        }
        if (state == ReefServiceProtos.State.DONE) {
            this.completedJobEventHandler.onNext(new CompletedJobImpl(getId()));
        } else if (state == ReefServiceProtos.State.FAILED) {
            onJobFailure(jobStatusProto);
        }
    }

    private synchronized void onJobFailure(ReefServiceProtos.JobStatusProto jobStatusProto) {
        if (!$assertionsDisabled && jobStatusProto.getState() != ReefServiceProtos.State.FAILED) {
            throw new AssertionError();
        }
        String str = this.jobId;
        Optional<byte[]> of = jobStatusProto.hasException() ? Optional.of(jobStatusProto.getException().toByteArray()) : Optional.empty();
        Optional<Throwable> fromBytes = this.exceptionCodec.fromBytes(of);
        String message = fromBytes.isPresent() ? fromBytes.get().getMessage() : "No Message sent by the Job";
        this.failedJobEventHandler.onNext(new FailedJob(str, message, Optional.of(message), fromBytes, of));
    }

    public String toString() {
        return "RunningJob{'" + this.jobId + "'}";
    }

    static {
        $assertionsDisabled = !RunningJobImpl.class.desiredAssertionStatus();
        LOG = Logger.getLogger(RunningJob.class.getName());
    }
}
