package org.apache.nemo.driver;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.LogManager;
import javax.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.ResourceSitePass;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.driver.NemoContext;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.nemo.runtime.master.BroadcastManagerMaster;
import org.apache.nemo.runtime.master.ClientRPC;
import org.apache.nemo.runtime.master.RuntimeMaster;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
@Unit
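/* Driver-side unit class: its inner EventHandler classes forward REEF start/stop, evaluator and context events to the RuntimeMaster and ClientRPC, and its constructor registers the client RPC handlers. (Decompiled from input_file:org/apache/nemo/driver/NemoDriver.class) */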
public final class NemoDriver {
    /** Name server whose port is passed to executors in their name-resolver configuration. */
    private final NameServer nameServer;
    /** Supplies the local address passed to executors as the name-server address. */
    private final LocalAddressProvider localAddressProvider;
    /** Executor resource specification (JobConf.ExecutorJSONContents) used when requesting containers. */
    private final String resourceSpecificationString;
    /** Runs the user application DAG received from the client. */
    private final UserApplicationRunner userApplicationRunner;
    /** Receives container, executor and failure notifications; terminated on driver shutdown. */
    private final RuntimeMaster runtimeMaster;
    /** Job identifier bound to JobConf.JOB_ID in every executor configuration. */
    private final String jobId;
    /** Directory bound to JobConf.LOCAL_DISK_DIRECTORY in every executor configuration. */
    private final String localDirectory;
    /** Directory bound to JobConf.GLUSTER_DISK_DIRECTORY in every executor configuration. */
    private final String glusterDirectory;
    /** RPC channel used to exchange control messages with the client. */
    private final ClientRPC clientRPC;
    /** Logging handler built on the JobMessageObserver; added to the root logger when the driver starts. */
    private final RemoteClientMessageLoggingHandler handler;
    private static final Logger LOG = LoggerFactory.getLogger(NemoDriver.class);
    /**
     * Single-threaded executor on which the user application and the final
     * RuntimeMaster termination are run. It is never reassigned, hence final.
     */
    private static final ExecutorService runnerThread = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());

    /**
     * Handles {@link ActiveContext} events.
     */
    public final class ActiveContextHandler implements EventHandler<ActiveContext> {
        /**
         * Passes the active context to {@code RuntimeMaster.onExecutorLaunched} and,
         * only when that call returns {@code true}, sends a {@code DriverReady}
         * message to the client.
         *
         * @param activeContext the context that became active
         */
        public void onNext(ActiveContext activeContext) {
            final boolean notifyClient = runtimeMaster.onExecutorLaunched(activeContext);
            if (!notifyClient) {
                return;
            }
            final ControlMessage.DriverToClientMessage driverReady = ControlMessage.DriverToClientMessage.newBuilder()
                    .setType(ControlMessage.DriverToClientMessageType.DriverReady)
                    .build();
            clientRPC.send(driverReady);
        }
    }

    /**
     * Handles {@link AllocatedEvaluator} events.
     */
    public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
        /**
         * Generates a fresh executor id, builds the executor configuration for it and
         * hands id, evaluator and configuration to the runtime master.
         *
         * @param allocatedEvaluator the evaluator that was allocated
         */
        public void onNext(AllocatedEvaluator allocatedEvaluator) {
            final String executorId = RuntimeIdManager.generateExecutorId();
            final Configuration executorConfiguration = getExecutorConfiguration(executorId);
            runtimeMaster.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration);
        }
    }

    /**
     * Handles the driver {@link StopTime} event.
     */
    public final class DriverStopHandler implements EventHandler<StopTime> {
        /**
         * Closes the client logging handler first, then shuts down the client RPC.
         *
         * @param stopTime the stop event (not inspected)
         */
        public void onNext(StopTime stopTime) {
            handler.close();
            clientRPC.shutdown();
        }
    }

    /**
     * Handles {@link FailedContext} events by failing fast.
     */
    public final class FailedContextHandler implements EventHandler<FailedContext> {
        /**
         * Always throws: a failed context is escalated as a {@link RuntimeException}
         * whose cause is the error carried by the context.
         *
         * @param failedContext the context that failed
         * @throws RuntimeException on every invocation
         */
        public void onNext(FailedContext failedContext) {
            final String message = failedContext.getId() + " failed. See driver's log for the stack trace in executor.";
            throw new RuntimeException(message, failedContext.asError());
        }
    }

    /**
     * Handles {@link FailedEvaluator} events.
     */
    public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
        /**
         * Delegates the failure to {@code RuntimeMaster.onExecutorFailed}.
         *
         * @param failedEvaluator the evaluator that failed
         */
        public void onNext(FailedEvaluator failedEvaluator) {
            runtimeMaster.onExecutorFailed(failedEvaluator);
        }
    }

    /**
     * Handles the driver {@link StartTime} event.
     */
    public final class StartHandler implements EventHandler<StartTime> {
        /**
         * Installs the client logging handler, then asks the runtime master to
         * request containers according to the resource specification string.
         *
         * @param startTime the start event (not inspected)
         */
        public void onNext(StartTime startTime) {
            setUpLogger();
            runtimeMaster.requestContainer(resourceSpecificationString);
        }
    }

    @Inject
    private NemoDriver(UserApplicationRunner userApplicationRunner, RuntimeMaster runtimeMaster, NameServer nameServer, LocalAddressProvider localAddressProvider, JobMessageObserver jobMessageObserver, ClientRPC clientRPC, @Parameter(JobConf.ExecutorJSONContents.class) String str, @Parameter(JobConf.BandwidthJSONContents.class) String str2, @Parameter(JobConf.JobId.class) String str3, @Parameter(JobConf.FileDirectory.class) String str4, @Parameter(JobConf.GlusterVolumeDirectory.class) String str5) {
        IdManager.setInDriver();
        this.userApplicationRunner = userApplicationRunner;
        this.runtimeMaster = runtimeMaster;
        this.nameServer = nameServer;
        this.localAddressProvider = localAddressProvider;
        this.resourceSpecificationString = str;
        this.jobId = str3;
        this.localDirectory = str4;
        this.glusterDirectory = str5;
        this.handler = new RemoteClientMessageLoggingHandler(jobMessageObserver);
        this.clientRPC = clientRPC;
        ResourceSitePass.setBandwidthSpecificationString(str2);
        clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG, clientToDriverMessage -> {
            startSchedulingUserDAG(clientToDriverMessage.getLaunchDAG().getDag());
            BroadcastManagerMaster.registerBroadcastVariablesFromClient((Map) SerializationUtils.deserialize(clientToDriverMessage.getLaunchDAG().getBroadcastVars().toByteArray()));
        });
        clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.DriverShutdown, clientToDriverMessage2 -> {
            shutdown();
        });
        clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder().setType(ControlMessage.DriverToClientMessageType.DriverStarted).build());
    }

    /**
     * Adds the client logging handler to the {@code java.util.logging} logger
     * named {@code ""}.
     *
     * <p>Decompiler note (JADX): access modifier changed from private.
     */
    public void setUpLogger() {
        final java.util.logging.Logger rootLogger = LogManager.getLogManager().getLogger("");
        rootLogger.addHandler(this.handler);
    }

    /**
     * Starts driver shutdown: logs the event, submits {@code RuntimeMaster.terminate}
     * to the runner thread, and then shuts the runner thread's executor down.
     */
    private void shutdown() {
        LOG.info("Driver shutdown initiated");
        // A bound method reference null-checks its receiver when evaluated,
        // so no separate null check on runtimeMaster is needed here.
        runnerThread.execute(this.runtimeMaster::terminate);
        // Stop accepting new tasks; tasks already submitted are still executed.
        runnerThread.shutdown();
    }

    /**
     * Submits the user DAG to the runner thread. Once the run returns, an
     * {@code ExecutionDone} message is sent to the client from that same thread.
     *
     * @param str the DAG string received from the client in the LaunchDAG message
     */
    private void startSchedulingUserDAG(String str) {
        final Runnable runThenNotifyClient = () -> {
            userApplicationRunner.run(str);
            final ControlMessage.DriverToClientMessage executionDone = ControlMessage.DriverToClientMessage.newBuilder()
                    .setType(ControlMessage.DriverToClientMessageType.ExecutionDone)
                    .build();
            clientRPC.send(executionDone);
        };
        runnerThread.execute(runThenNotifyClient);
    }

    /**
     * Builds the full configuration for one executor by merging four parts:
     * the executor job settings, the REEF context settings, the name-resolver
     * settings and the message settings.
     *
     * <p>Decompiler note (JADX): access modifier changed from private.
     *
     * @param str the executor id; also used as the context identifier
     * @return the merged configuration
     */
    public Configuration getExecutorConfiguration(String str) {
        final Configuration executorConf = JobConf.EXECUTOR_CONF
                .set(JobConf.EXECUTOR_ID, str)
                .set(JobConf.GLUSTER_DISK_DIRECTORY, this.glusterDirectory)
                .set(JobConf.LOCAL_DISK_DIRECTORY, this.localDirectory)
                .set(JobConf.JOB_ID, this.jobId)
                .build();
        final Configuration contextConf = ContextConfiguration.CONF
                .set(ContextConfiguration.IDENTIFIER, str)
                .set(ContextConfiguration.ON_CONTEXT_STARTED, NemoContext.ContextStartHandler.class)
                .set(ContextConfiguration.ON_CONTEXT_STOP, NemoContext.ContextStopHandler.class)
                .build();
        final Configuration ncsConf = getExecutorNcsConfiguration();
        final Configuration messageConf = getExecutorMessageConfiguration(str);
        return Configurations.merge(new Configuration[]{executorConf, contextConf, ncsConf, messageConf});
    }

    /**
     * Builds the name-resolution part of an executor configuration: the name
     * server's port and address, plus {@link StringIdentifierFactory} as the
     * {@link IdentifierFactory} implementation.
     *
     * @return the name-resolver configuration
     */
    private Configuration getExecutorNcsConfiguration() {
        final String nameServerPort = Integer.toString(this.nameServer.getPort());
        final String nameServerAddress = this.localAddressProvider.getLocalAddress();
        return Tang.Factory.getTang().newConfigurationBuilder()
                .bindNamedParameter(NameResolverNameServerPort.class, nameServerPort)
                .bindNamedParameter(NameResolverNameServerAddr.class, nameServerAddress)
                .bindImplementation(IdentifierFactory.class, StringIdentifierFactory.class)
                .build();
    }

    /**
     * Builds the messaging part of an executor configuration, binding the given
     * id as {@code MessageParameters.SenderId}.
     *
     * @param str the executor id to use as sender id
     * @return the message configuration
     */
    private Configuration getExecutorMessageConfiguration(String str) {
        return Tang.Factory.getTang()
                .newConfigurationBuilder()
                .bindNamedParameter(MessageParameters.SenderId.class, str)
                .build();
    }
}
