/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.runtime.common.client;

import com.google.protobuf.ByteString;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.CompletedJob;
import org.apache.reef.client.FailedJob;
import org.apache.reef.client.JobMessage;
import org.apache.reef.client.RunningJob;
import org.apache.reef.client.parameters.JobCompletedHandler;
import org.apache.reef.client.parameters.JobFailedHandler;
import org.apache.reef.client.parameters.JobMessageHandler;
import org.apache.reef.client.parameters.JobRunningHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ClientRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.client.CompletedJobImpl;
import org.apache.reef.runtime.common.client.REEFImplementation;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

@ClientSide
@Private
public final class RunningJobImpl
implements RunningJob,
EventHandler<ReefServiceProtos.JobStatusProto> {
    private static final Logger LOG = Logger.getLogger(RunningJob.class.getName());
    private final String jobId;
    private final EventHandler<ClientRuntimeProtocol.JobControlProto> jobControlHandler;
    private final EventHandler<RunningJob> runningJobEventHandler;
    private final EventHandler<CompletedJob> completedJobEventHandler;
    private final EventHandler<FailedJob> failedJobEventHandler;
    private final EventHandler<JobMessage> jobMessageEventHandler;
    private final ExceptionCodec exceptionCodec;

    @Inject
    RunningJobImpl(RemoteManager remoteManager, @Parameter(value=DriverIdentifier.class) String driverIdentifier, @Parameter(value=REEFImplementation.DriverRemoteIdentifier.class) String driverRID, @Parameter(value=JobRunningHandler.class) EventHandler<RunningJob> runningJobEventHandler, @Parameter(value=JobCompletedHandler.class) EventHandler<CompletedJob> completedJobEventHandler, @Parameter(value=JobFailedHandler.class) EventHandler<FailedJob> failedJobEventHandler, @Parameter(value=JobMessageHandler.class) EventHandler<JobMessage> jobMessageEventHandler, ExceptionCodec exceptionCodec) {
        this.jobId = driverIdentifier;
        this.runningJobEventHandler = runningJobEventHandler;
        this.completedJobEventHandler = completedJobEventHandler;
        this.failedJobEventHandler = failedJobEventHandler;
        this.jobMessageEventHandler = jobMessageEventHandler;
        this.exceptionCodec = exceptionCodec;
        this.jobControlHandler = remoteManager.getHandler(driverRID, ClientRuntimeProtocol.JobControlProto.class);
        this.runningJobEventHandler.onNext((Object)this);
        LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
    }

    @Override
    public synchronized void close() {
        this.jobControlHandler.onNext((Object)ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setSignal(ClientRuntimeProtocol.Signal.SIG_TERMINATE).build());
    }

    @Override
    public synchronized void close(byte[] message) {
        this.jobControlHandler.onNext((Object)ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setSignal(ClientRuntimeProtocol.Signal.SIG_TERMINATE).setMessage(ByteString.copyFrom((byte[])message)).build());
    }

    @Override
    public String getId() {
        return this.jobId;
    }

    @Override
    public synchronized void send(byte[] message) {
        this.jobControlHandler.onNext((Object)ClientRuntimeProtocol.JobControlProto.newBuilder().setIdentifier(this.jobId).setMessage(ByteString.copyFrom((byte[])message)).build());
    }

    public synchronized void onNext(ReefServiceProtos.JobStatusProto value) {
        ReefServiceProtos.State state = value.getState();
        LOG.log(Level.FINEST, "Received job status: {0} from {1}", new Object[]{state, value.getIdentifier()});
        if (value.hasMessage()) {
            this.jobMessageEventHandler.onNext((Object)new JobMessage(this.getId(), value.getMessage().toByteArray()));
        }
        if (state == ReefServiceProtos.State.DONE) {
            this.completedJobEventHandler.onNext((Object)new CompletedJobImpl(this.getId()));
        } else if (state == ReefServiceProtos.State.FAILED) {
            this.onJobFailure(value);
        }
    }

    private synchronized void onJobFailure(ReefServiceProtos.JobStatusProto jobStatusProto) {
        assert (jobStatusProto.getState() == ReefServiceProtos.State.FAILED);
        String id = this.jobId;
        Optional data = jobStatusProto.hasException() ? Optional.of((Object)jobStatusProto.getException().toByteArray()) : Optional.empty();
        Optional<Throwable> cause = this.exceptionCodec.fromBytes((Optional<byte[]>)data);
        String message = cause.isPresent() ? ((Throwable)cause.get()).getMessage() : "No Message sent by the Job";
        Optional description = Optional.of((Object)message);
        FailedJob failedJob = new FailedJob(id, message, (Optional<String>)description, cause, (Optional<byte[]>)data);
        this.failedJobEventHandler.onNext((Object)failedJob);
    }

    public String toString() {
        return "RunningJob{'" + this.jobId + "'}";
    }
}

