package org.apache.reef.io.network.group.impl.driver;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.ServiceConfiguration;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
import org.apache.reef.io.network.group.impl.config.parameters.GroupCommFailedEvalHandler;
import org.apache.reef.io.network.group.impl.config.parameters.GroupCommFailedTaskHandler;
import org.apache.reef.io.network.group.impl.config.parameters.GroupCommRunningTaskHandler;
import org.apache.reef.io.network.group.impl.config.parameters.GroupCommSenderStage;
import org.apache.reef.io.network.group.impl.config.parameters.SerializedGroupConfigs;
import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut;
import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl;
import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.BindNSToTask;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.impl.NetworkServiceClosingHandler;
import org.apache.reef.io.network.impl.NetworkServiceParameters;
import org.apache.reef.io.network.impl.UnbindNSFromTask;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.io.network.naming.NameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.SingletonAsserter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.LoggingEventHandler;
import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.transport.TransportFactory;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.class */
public class GroupCommDriverImpl implements GroupCommServiceDriver {
    private static final Logger LOG;
    private static final Tang TANG;
    private final CommunicationGroupDriverFactory commGroupDriverFactory;
    private final NameServer nameService;
    private final String nameServiceAddr;
    private final int nameServicePort;
    private final ConfigurationSerializer confSerializer;
    private final NetworkService<GroupCommunicationMessage> netService;
    private final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler;
    private final EStage<RunningTask> groupCommRunningTaskStage;
    private final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler;
    private final EStage<FailedTask> groupCommFailedTaskStage;
    private final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler;
    private final EStage<FailedEvaluator> groupCommFailedEvaluatorStage;
    private final GroupCommMessageHandler groupCommMessageHandler;
    private final EStage<GroupCommunicationMessage> groupCommMessageStage;
    private final int fanOut;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicInteger contextIds = new AtomicInteger(0);
    private final IdentifierFactory idFac = new StringIdentifierFactory();
    private final Map<Class<? extends Name<String>>, CommunicationGroupDriver> commGroupDrivers = new HashMap();

    @Inject
    @Deprecated
    public GroupCommDriverImpl(ConfigurationSerializer configurationSerializer, @Parameter(DriverIdentifier.class) String str, @Parameter(TreeTopologyFanOut.class) int i, LocalAddressProvider localAddressProvider, TransportFactory transportFactory, NameServer nameServer) {
        if (!$assertionsDisabled && !SingletonAsserter.assertSingleton(getClass())) {
            throw new AssertionError();
        }
        this.fanOut = i;
        this.nameService = nameServer;
        this.nameServiceAddr = localAddressProvider.getLocalAddress();
        this.nameServicePort = nameServer.getPort();
        this.confSerializer = configurationSerializer;
        this.groupCommRunningTaskHandler = new BroadcastingEventHandler<>();
        this.groupCommRunningTaskStage = new SyncStage("GroupCommRunningTaskStage", this.groupCommRunningTaskHandler);
        this.groupCommFailedTaskHandler = new BroadcastingEventHandler<>();
        this.groupCommFailedTaskStage = new SyncStage("GroupCommFailedTaskStage", this.groupCommFailedTaskHandler);
        this.groupCommFailedEvaluatorHandler = new BroadcastingEventHandler<>();
        this.groupCommFailedEvaluatorStage = new SyncStage("GroupCommFailedEvaluatorStage", this.groupCommFailedEvaluatorHandler);
        this.groupCommMessageHandler = new GroupCommMessageHandler();
        this.groupCommMessageStage = new SyncStage("GroupCommMessageStage", this.groupCommMessageHandler);
        try {
            this.netService = new NetworkService<>(this.idFac, 0, (NameResolver) Tang.Factory.getTang().newInjector(Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{NameResolverConfiguration.CONF.set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.nameServiceAddr).set(NameResolverConfiguration.NAME_SERVICE_PORT, Integer.valueOf(this.nameServicePort)).build()}).build()).getInstance(NameResolver.class), new GroupCommunicationMessageCodec(), transportFactory, new EventHandler<Message<GroupCommunicationMessage>>() { // from class: org.apache.reef.io.network.group.impl.driver.GroupCommDriverImpl.1
                public void onNext(Message<GroupCommunicationMessage> message) {
                    GroupCommDriverImpl.this.groupCommMessageStage.onNext(Utils.getGCM(message));
                }
            }, new LoggingEventHandler(), localAddressProvider);
            this.netService.registerId(this.idFac.getNewInstance(str));
            ThreadPoolStage threadPoolStage = new ThreadPoolStage("SrcCtrlMsgSender", new CtrlMsgSender(this.idFac, this.netService), 5);
            Injector newInjector = TANG.newInjector();
            newInjector.bindVolatileParameter(GroupCommSenderStage.class, threadPoolStage);
            newInjector.bindVolatileParameter(DriverIdentifier.class, str);
            newInjector.bindVolatileParameter(GroupCommRunningTaskHandler.class, this.groupCommRunningTaskHandler);
            newInjector.bindVolatileParameter(GroupCommFailedTaskHandler.class, this.groupCommFailedTaskHandler);
            newInjector.bindVolatileParameter(GroupCommFailedEvalHandler.class, this.groupCommFailedEvaluatorHandler);
            newInjector.bindVolatileInstance(GroupCommMessageHandler.class, this.groupCommMessageHandler);
            try {
                this.commGroupDriverFactory = (CommunicationGroupDriverFactory) newInjector.getInstance(CommunicationGroupDriverFactory.class);
            } catch (InjectionException e) {
                throw new RuntimeException((Throwable) e);
            }
        } catch (InjectionException e2) {
            throw new RuntimeException((Throwable) e2);
        }
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> cls, int i) {
        return newCommunicationGroup(cls, TreeTopology.class, i, this.fanOut);
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> cls, int i, int i2) {
        return newCommunicationGroup(cls, TreeTopology.class, i, i2);
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> cls, Class<? extends Topology> cls2, int i, int i2) {
        LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", new Object[]{Utils.simpleName(cls), Integer.valueOf(i)});
        try {
            CommunicationGroupDriver newInstance = this.commGroupDriverFactory.getNewInstance(cls, cls2, i, i2);
            this.commGroupDrivers.put(cls, newInstance);
            LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", "Created communication group: " + Utils.simpleName(cls));
            return newInstance;
        } catch (InjectionException e) {
            LOG.log(Level.WARNING, "Cannot inject new CommunicationGroupDriver");
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public boolean isConfigured(ActiveContext activeContext) {
        LOG.entering("GroupCommDriverImpl", "isConfigured", activeContext.getId());
        boolean startsWith = activeContext.getId().startsWith("GroupCommunicationContext-");
        LOG.exiting("GroupCommDriverImpl", "isConfigured", Boolean.valueOf(startsWith));
        return startsWith;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public Configuration getContextConfiguration() {
        LOG.entering("GroupCommDriverImpl", "getContextConf");
        Configuration build = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, "GroupCommunicationContext-" + this.contextIds.getAndIncrement()).build();
        LOG.exiting("GroupCommDriverImpl", "getContextConf", this.confSerializer.toString(build));
        return build;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public Configuration getServiceConfiguration() {
        LOG.entering("GroupCommDriverImpl", "getServiceConf");
        Configuration build = TANG.newConfigurationBuilder(new Configuration[]{ServiceConfiguration.CONF.set(ServiceConfiguration.SERVICES, NetworkService.class).set(ServiceConfiguration.SERVICES, GroupCommNetworkHandlerImpl.class).set(ServiceConfiguration.ON_CONTEXT_STOP, NetworkServiceClosingHandler.class).set(ServiceConfiguration.ON_TASK_STARTED, BindNSToTask.class).set(ServiceConfiguration.ON_TASK_STOP, UnbindNSFromTask.class).build()}).bindNamedParameter(NetworkServiceParameters.NetworkServiceCodec.class, GroupCommunicationMessageCodec.class).bindNamedParameter(NetworkServiceParameters.NetworkServiceHandler.class, GroupCommNetworkHandlerImpl.class).bindNamedParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, ExceptionHandler.class).bindNamedParameter(NameResolverNameServerAddr.class, this.nameServiceAddr).bindNamedParameter(NameResolverNameServerPort.class, Integer.toString(this.nameServicePort)).bindNamedParameter(NetworkServiceParameters.NetworkServicePort.class, "0").build();
        LOG.exiting("GroupCommDriverImpl", "getServiceConf", this.confSerializer.toString(build));
        return build;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommDriver
    public Configuration getTaskConfiguration(Configuration configuration) {
        LOG.entering("GroupCommDriverImpl", "getTaskConfiguration", new Object[]{this.confSerializer.toString(configuration)});
        JavaConfigurationBuilder newConfigurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{configuration});
        Iterator<CommunicationGroupDriver> it = this.commGroupDrivers.values().iterator();
        while (it.hasNext()) {
            Configuration taskConfiguration = it.next().getTaskConfiguration(configuration);
            if (taskConfiguration != null) {
                newConfigurationBuilder.bindSetEntry(SerializedGroupConfigs.class, this.confSerializer.toString(taskConfiguration));
            }
        }
        Configuration build = newConfigurationBuilder.build();
        LOG.exiting("GroupCommDriverImpl", "getTaskConfiguration", this.confSerializer.toString(build));
        return build;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver
    public EStage<RunningTask> getGroupCommRunningTaskStage() {
        LOG.entering("GroupCommDriverImpl", "getGroupCommRunningTaskStage");
        LOG.exiting("GroupCommDriverImpl", "getGroupCommRunningTaskStage", "Returning GroupCommRunningTaskStage");
        return this.groupCommRunningTaskStage;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver
    public EStage<FailedTask> getGroupCommFailedTaskStage() {
        LOG.entering("GroupCommDriverImpl", "getGroupCommFailedTaskStage");
        LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedTaskStage", "Returning GroupCommFailedTaskStage");
        return this.groupCommFailedTaskStage;
    }

    @Override // org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver
    public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() {
        LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage");
        LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage");
        return this.groupCommFailedEvaluatorStage;
    }

    static {
        $assertionsDisabled = !GroupCommDriverImpl.class.desiredAssertionStatus();
        LOG = Logger.getLogger(GroupCommDriverImpl.class.getName());
        TANG = Tang.Factory.getTang();
    }
}
