package org.apache.reef.runtime.common.client;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.FailedRuntime;
import org.apache.reef.client.parameters.ResourceManagerErrorHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.client.REEFImplementation;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;

/**
 * Client-side registry of the jobs this client currently knows about, keyed by job identifier.
 *
 * <p>Incoming job status messages are routed to the matching {@link RunningJobImpl}; jobs are
 * added when an {@code INIT} status arrives and dropped once a status other than {@code INIT}
 * or {@code RUNNING} is seen. Runtime error messages drop the job and are forwarded to the
 * handler bound to {@link ResourceManagerErrorHandler}.
 *
 * <p>Every method that touches the job map is {@code synchronized} on this instance.
 */
@ClientSide
@Private
final class RunningJobsImpl implements RunningJobs {
    private static final Logger LOG = Logger.getLogger(RunningJobsImpl.class.getName());

    /** Known jobs, keyed by the value of {@link RunningJobImpl#getId()}. */
    private final Map<String, RunningJobImpl> jobs = new HashMap<>();

    /** Parent injector; forked once per job to instantiate its {@link RunningJobImpl}. */
    private final Injector injector;

    /** Lazily resolved handler that receives {@link FailedRuntime} events. */
    private final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler;

    /**
     * @param injector the injector that is forked to create per-job {@link RunningJobImpl} instances.
     * @param failedRuntimeEventHandler future for the handler notified on runtime errors.
     */
    @Inject
    RunningJobsImpl(Injector injector, @Parameter(ResourceManagerErrorHandler.class) InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler) {
        this.injector = injector;
        this.failedRuntimeEventHandler = failedRuntimeEventHandler;
        LOG.log(Level.FINE, "Instantiated 'RunningJobsImpl'");
    }

    /**
     * Logs a warning for, and then closes, every job currently in the registry.
     * The jobs are not removed from the registry by this method.
     */
    @Override // org.apache.reef.runtime.common.client.RunningJobs
    public synchronized void closeAllJobs() {
        for (RunningJobImpl job : this.jobs.values()) {
            LOG.log(Level.WARNING, "Force close job {0}", job.getId());
            job.close();
        }
    }

    /**
     * Processes a job status message.
     *
     * <p>If the state is {@code INIT}, a new {@link RunningJobImpl} is created and registered
     * first. The message is then handed to the job registered under the message's identifier.
     * Finally, if the state is neither {@code RUNNING} nor {@code INIT}, the job is removed.
     *
     * @param remoteMessage the status message together with the identifier of its sender.
     * @throws RuntimeException if the job cannot be instantiated, or if no job is registered
     *     under the message's identifier.
     * @throws IllegalStateException if an {@code INIT} message arrives for an already known job.
     */
    @Override // org.apache.reef.runtime.common.client.RunningJobs
    public synchronized void onJobStatusMessage(RemoteMessage<ReefServiceProtos.JobStatusProto> remoteMessage) {
        final ReefServiceProtos.JobStatusProto message = remoteMessage.getMessage();
        final String identifier = message.getIdentifier();
        final ReefServiceProtos.State state = message.getState();
        LOG.log(Level.FINE, "Processing message from Job: {0}", identifier);

        if (state == ReefServiceProtos.State.INIT) {
            try {
                put(newRunningJob(identifier, remoteMessage.getIdentifier().toString()));
            } catch (BindException | InjectionException e) {
                throw new RuntimeException("Configuration error for: " + message, e);
            }
        }

        get(identifier).onNext(message);

        // Any state other than INIT or RUNNING ends the tracking of this job.
        if (state != ReefServiceProtos.State.RUNNING && state != ReefServiceProtos.State.INIT) {
            remove(identifier);
        }
        LOG.log(Level.FINE, "Done processing message from Job {0}", identifier);
    }

    /**
     * Processes a runtime error message: removes the job named in the message from the registry
     * and forwards a {@link FailedRuntime} to the failure handler.
     *
     * <p>The handler is invoked exactly once, whether or not the removal succeeds.
     *
     * @param remoteMessage the runtime error message.
     * @throws RuntimeException if no job is registered under the message's identifier; the
     *     handler is still notified before the exception propagates.
     */
    @Override // org.apache.reef.runtime.common.client.RunningJobs
    public synchronized void onRuntimeErrorMessage(RemoteMessage<ReefServiceProtos.RuntimeErrorProto> remoteMessage) {
        try {
            remove(remoteMessage.getMessage().getIdentifier());
        } finally {
            // A finally block (rather than notifying in both the try and a catch-all) guarantees
            // the handler is not called a second time when its own onNext() throws.
            this.failedRuntimeEventHandler.get().onNext(new FailedRuntime(remoteMessage.getMessage()));
        }
    }

    /**
     * Looks up a job by identifier.
     *
     * @param jobIdentifier the identifier of the job.
     * @return the registered job; never {@code null}.
     * @throws RuntimeException if no job is registered under {@code jobIdentifier}.
     */
    private synchronized RunningJobImpl get(String jobIdentifier) {
        final RunningJobImpl job = this.jobs.get(jobIdentifier);
        if (job == null) {
            throw new RuntimeException("Trying to get a RunningJob that is unknown: " + jobIdentifier);
        }
        return job;
    }

    /**
     * Removes a job from the registry.
     *
     * @param jobIdentifier the identifier of the job.
     * @throws RuntimeException if no job is registered under {@code jobIdentifier}.
     */
    private synchronized void remove(String jobIdentifier) {
        if (this.jobs.remove(jobIdentifier) == null) {
            throw new RuntimeException("Trying to remove a RunningJob that is unknown: " + jobIdentifier);
        }
    }

    /**
     * Registers a job under its own identifier.
     *
     * @param runningJobImpl the job to register.
     * @throws IllegalStateException if a job with the same identifier is already registered.
     */
    private synchronized void put(RunningJobImpl runningJobImpl) {
        final String id = runningJobImpl.getId();
        if (this.jobs.containsKey(id)) {
            throw new IllegalStateException("Trying to re-add a job that is already known: " + id);
        }
        LOG.log(Level.FINE, "Adding Job with ID: {0}", id);
        this.jobs.put(id, runningJobImpl);
    }

    /**
     * Instantiates a {@link RunningJobImpl} from a fork of the parent injector.
     *
     * @param jobIdentifier value bound to {@link DriverIdentifier} in the fork.
     * @param remoteIdentifier value bound to {@link REEFImplementation.DriverRemoteIdentifier} in the fork.
     * @return the newly created job; it is not registered by this method.
     * @throws BindException if binding either parameter fails.
     * @throws InjectionException if the job cannot be instantiated.
     */
    private synchronized RunningJobImpl newRunningJob(String jobIdentifier, String remoteIdentifier) throws BindException, InjectionException {
        final Injector forkInjector = this.injector.forkInjector();
        forkInjector.bindVolatileParameter(REEFImplementation.DriverRemoteIdentifier.class, remoteIdentifier);
        forkInjector.bindVolatileParameter(DriverIdentifier.class, jobIdentifier);
        return (RunningJobImpl) forkInjector.getInstance(RunningJobImpl.class);
    }
}
