/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.runtime.common.client;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.FailedRuntime;
import org.apache.reef.client.parameters.ResourceManagerErrorHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.client.REEFImplementation;
import org.apache.reef.runtime.common.client.RunningJobImpl;
import org.apache.reef.runtime.common.client.RunningJobs;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;

/**
 * Client-side registry of {@link RunningJobImpl} instances, keyed by job identifier.
 *
 * <p>Incoming job status and runtime error messages are routed to the matching
 * job. Every method that reads or modifies the registry is {@code synchronized}
 * on this instance.
 */
@ClientSide
@Private
final class RunningJobsImpl implements RunningJobs {
    private static final Logger LOG = Logger.getLogger(RunningJobsImpl.class.getName());

    /** Jobs currently tracked by this registry, keyed by job identifier. */
    private final Map<String, RunningJobImpl> jobs = new HashMap<>();

    /** Parent injector; forked once per new job to create its {@link RunningJobImpl}. */
    private final Injector injector;

    /** Future for the handler that is notified about runtime errors. */
    private final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler;

    /**
     * @param injector the injector that is forked to instantiate each new job.
     * @param failedRuntimeEventHandler future for the handler that receives
     *        {@link FailedRuntime} events; it is resolved only when an error message arrives.
     */
    @Inject
    RunningJobsImpl(Injector injector, @Parameter(value=ResourceManagerErrorHandler.class) InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler) {
        this.injector = injector;
        this.failedRuntimeEventHandler = failedRuntimeEventHandler;
        LOG.log(Level.FINE, "Instantiated 'RunningJobsImpl'");
    }

    /**
     * Logs a warning for, and then closes, every job currently tracked.
     * The jobs are not removed from the registry by this call.
     */
    @Override
    public synchronized void closeAllJobs() {
        for (RunningJobImpl runningJob : this.jobs.values()) {
            LOG.log(Level.WARNING, "Force close job {0}", runningJob.getId());
            runningJob.close();
        }
    }

    /**
     * Processes a status message sent for a job.
     *
     * <p>A message in state {@code INIT} first creates and registers a new job.
     * The status is then forwarded to the job, and the job is dropped from the
     * registry when the state is neither {@code RUNNING} nor {@code INIT}.
     *
     * @param message the status message; its remote identifier is used as the
     *        driver's remote identifier when a new job is created.
     * @throws RuntimeException if the new job cannot be configured or injected,
     *         or if the message refers to a job that is not registered.
     * @throws IllegalStateException if an {@code INIT} message arrives for a job
     *         that is already registered.
     */
    @Override
    public synchronized void onJobStatusMessage(RemoteMessage<ReefServiceProtos.JobStatusProto> message) {
        final ReefServiceProtos.JobStatusProto status = message.getMessage();
        final String jobIdentifier = status.getIdentifier();
        final ReefServiceProtos.State state = status.getState();
        LOG.log(Level.FINE, "Processing message from Job: {0}", jobIdentifier);

        if (state == ReefServiceProtos.State.INIT) {
            try {
                this.put(this.newRunningJob(jobIdentifier, message.getIdentifier().toString()));
            } catch (BindException | InjectionException configError) {
                throw new RuntimeException("Configuration error for: " + status, configError);
            }
        }

        this.get(jobIdentifier).onNext(status);

        // Any state other than INIT or RUNNING means the job no longer needs tracking.
        if (state != ReefServiceProtos.State.RUNNING && state != ReefServiceProtos.State.INIT) {
            this.remove(jobIdentifier);
        }
        LOG.log(Level.FINE, "Done processing message from Job {0}", jobIdentifier);
    }

    /**
     * Processes a runtime error message.
     *
     * <p>The job named by the error's identifier is removed from the registry.
     * The failed-runtime handler is invoked from a {@code finally} clause, so it
     * is notified even when the removal throws because the job is unknown.
     *
     * @param runtimeFailure the error message received.
     * @throws RuntimeException if no job is registered under the error's identifier
     *         (thrown after the handler has been notified).
     */
    @Override
    public synchronized void onRuntimeErrorMessage(RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure) {
        final ReefServiceProtos.RuntimeErrorProto error = runtimeFailure.getMessage();
        try {
            this.remove(error.getIdentifier());
        } finally {
            this.failedRuntimeEventHandler.get().onNext(new FailedRuntime(error));
        }
    }

    /**
     * Looks up a registered job.
     *
     * @param jobIdentifier the identifier of the job to look up.
     * @return the job registered under that identifier; never {@code null}.
     * @throws RuntimeException if no such job is registered.
     */
    private synchronized RunningJobImpl get(String jobIdentifier) {
        final RunningJobImpl result = this.jobs.get(jobIdentifier);
        if (result == null) {
            throw new RuntimeException("Trying to get a RunningJob that is unknown: " + jobIdentifier);
        }
        return result;
    }

    /**
     * Removes a registered job.
     *
     * @param jobIdentifier the identifier of the job to remove.
     * @throws RuntimeException if no such job is registered.
     */
    private synchronized void remove(String jobIdentifier) {
        final RunningJobImpl removed = this.jobs.remove(jobIdentifier);
        if (removed == null) {
            throw new RuntimeException("Trying to remove a RunningJob that is unknown: " + jobIdentifier);
        }
    }

    /**
     * Registers a job under its own identifier.
     *
     * @param runningJob the job to register.
     * @throws IllegalStateException if a job with the same identifier is already registered.
     */
    private synchronized void put(RunningJobImpl runningJob) {
        final String jobIdentifier = runningJob.getId();
        if (this.jobs.containsKey(jobIdentifier)) {
            throw new IllegalStateException("Trying to re-add a job that is already known: " + jobIdentifier);
        }
        LOG.log(Level.FINE, "Adding Job with ID: {0}", jobIdentifier);
        this.jobs.put(jobIdentifier, runningJob);
    }

    /**
     * Instantiates a job from a fork of the parent injector, with the two
     * identifiers bound as volatile parameters on the fork.
     *
     * @param jobIdentifier bound to {@link DriverIdentifier}.
     * @param remoteIdentifier bound to {@link REEFImplementation.DriverRemoteIdentifier}.
     * @return the newly injected job; it is not registered by this method.
     * @throws BindException if binding a parameter on the forked injector fails.
     * @throws InjectionException if the job cannot be instantiated.
     */
    private synchronized RunningJobImpl newRunningJob(String jobIdentifier, String remoteIdentifier) throws BindException, InjectionException {
        final Injector child = this.injector.forkInjector();
        child.bindVolatileParameter(REEFImplementation.DriverRemoteIdentifier.class, remoteIdentifier);
        child.bindVolatileParameter(DriverIdentifier.class, jobIdentifier);
        return child.getInstance(RunningJobImpl.class);
    }
}

