package org.apache.reef.examples.group.broadcast;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
import org.apache.reef.examples.group.bgd.operatornames.ControlMessageBroadcaster;
import org.apache.reef.examples.group.bgd.parameters.AllCommunicationGroup;
import org.apache.reef.examples.group.bgd.parameters.ModelDimensions;
import org.apache.reef.examples.group.broadcast.parameters.ModelBroadcaster;
import org.apache.reef.examples.group.broadcast.parameters.ModelReceiveAckReducer;
import org.apache.reef.examples.group.broadcast.parameters.NumberOfReceivers;
import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
import org.apache.reef.io.network.group.api.driver.GroupCommDriver;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
import org.apache.reef.io.serialization.SerializableCodec;
import org.apache.reef.poison.PoisonedConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;

@DriverSide
@Unit
/* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver.class */
public class BroadcastDriver {
    private static final Logger LOG = Logger.getLogger(BroadcastDriver.class.getName());
    private final AtomicBoolean masterSubmitted = new AtomicBoolean(false);
    private final AtomicInteger slaveIds = new AtomicInteger(0);
    private final AtomicInteger failureSet = new AtomicInteger(0);
    private final GroupCommDriver groupCommDriver;
    private final CommunicationGroupDriver allCommGroup;
    private final ConfigurationSerializer confSerializer;
    private final int dimensions;
    private final EvaluatorRequestor requestor;
    private final int numberOfReceivers;
    private final AtomicInteger numberOfAllocatedEvaluators;
    private String groupCommConfiguredMasterId;

    /* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver$ContextActiveHandler.class */
    public class ContextActiveHandler implements EventHandler<ActiveContext> {
        private final AtomicBoolean storeMasterId = new AtomicBoolean(false);

        public ContextActiveHandler() {
        }

        public void onNext(ActiveContext activeContext) {
            BroadcastDriver.LOG.log(Level.FINE, "Got active context: {0}", activeContext.getId());
            if (!BroadcastDriver.this.groupCommDriver.isConfigured(activeContext)) {
                Configuration contextConfiguration = BroadcastDriver.this.groupCommDriver.getContextConfiguration();
                String contextId = contextId(contextConfiguration);
                if (this.storeMasterId.compareAndSet(false, true)) {
                    BroadcastDriver.this.groupCommConfiguredMasterId = contextId;
                }
                Configuration serviceConfiguration = BroadcastDriver.this.groupCommDriver.getServiceConfiguration();
                BroadcastDriver.LOG.log(Level.FINER, "Submit GCContext conf: {0}", BroadcastDriver.this.confSerializer.toString(contextConfiguration));
                BroadcastDriver.LOG.log(Level.FINER, "Submit Service conf: {0}", BroadcastDriver.this.confSerializer.toString(serviceConfiguration));
                activeContext.submitContextAndService(contextConfiguration, serviceConfiguration);
                return;
            }
            if (!activeContext.getId().equals(BroadcastDriver.this.groupCommConfiguredMasterId) || masterTaskSubmitted()) {
                Configuration build = Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, getSlaveId(activeContext)).set(TaskConfiguration.TASK, SlaveTask.class).build(), PoisonedConfiguration.TASK_CONF.set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4").set(PoisonedConfiguration.CRASH_TIMEOUT, "1").build()}).bindNamedParameter(ModelDimensions.class, Integer.toString(BroadcastDriver.this.dimensions)).build();
                BroadcastDriver.this.allCommGroup.addTask(build);
                Configuration taskConfiguration = BroadcastDriver.this.groupCommDriver.getTaskConfiguration(build);
                BroadcastDriver.LOG.log(Level.FINER, "Submit SlaveTask conf: {0}", BroadcastDriver.this.confSerializer.toString(taskConfiguration));
                activeContext.submitTask(taskConfiguration);
                return;
            }
            Configuration build2 = Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, "MasterTask").set(TaskConfiguration.TASK, MasterTask.class).build()}).bindNamedParameter(ModelDimensions.class, Integer.toString(BroadcastDriver.this.dimensions)).build();
            BroadcastDriver.this.allCommGroup.addTask(build2);
            Configuration taskConfiguration2 = BroadcastDriver.this.groupCommDriver.getTaskConfiguration(build2);
            BroadcastDriver.LOG.log(Level.FINER, "Submit MasterTask conf: {0}", BroadcastDriver.this.confSerializer.toString(taskConfiguration2));
            activeContext.submitTask(taskConfiguration2);
        }

        private String contextId(Configuration configuration) {
            try {
                return (String) Tang.Factory.getTang().newInjector(configuration).getNamedInstance(ContextIdentifier.class);
            } catch (InjectionException e) {
                throw new RuntimeException("Unable to inject context identifier from context conf", e);
            }
        }

        private String getSlaveId(ActiveContext activeContext) {
            return "SlaveTask-" + BroadcastDriver.this.slaveIds.getAndIncrement();
        }

        private boolean masterTaskSubmitted() {
            return !BroadcastDriver.this.masterSubmitted.compareAndSet(false, true);
        }
    }

    /* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver$ContextCloseHandler.class */
    public class ContextCloseHandler implements EventHandler<ClosedContext> {
        public ContextCloseHandler() {
        }

        public void onNext(ClosedContext closedContext) {
            BroadcastDriver.LOG.log(Level.FINE, "Got closed context: {0}", closedContext.getId());
            ActiveContext parentContext = closedContext.getParentContext();
            if (parentContext != null) {
                BroadcastDriver.LOG.log(Level.FINE, "Closing parent context: {0}", parentContext.getId());
                parentContext.close();
            }
        }
    }

    /* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver$EvaluatorAllocatedHandler.class */
    final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
        EvaluatorAllocatedHandler() {
        }

        public void onNext(AllocatedEvaluator allocatedEvaluator) {
            BroadcastDriver.LOG.log(Level.INFO, "Submitting an id context to AllocatedEvaluator: {0}", allocatedEvaluator);
            allocatedEvaluator.submitContext(ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, "BroadcastContext-" + BroadcastDriver.this.numberOfAllocatedEvaluators.getAndDecrement()).build());
        }
    }

    /* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver$FailedTaskHandler.class */
    public class FailedTaskHandler implements EventHandler<FailedTask> {
        public FailedTaskHandler() {
        }

        public void onNext(FailedTask failedTask) {
            BroadcastDriver.LOG.log(Level.FINE, "Got failed Task: {0}", failedTask.getId());
            ActiveContext activeContext = (ActiveContext) failedTask.getActiveContext().get();
            Configuration taskConfiguration = BroadcastDriver.this.groupCommDriver.getTaskConfiguration(Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, failedTask.getId()).set(TaskConfiguration.TASK, SlaveTask.class).build(), PoisonedConfiguration.TASK_CONF.set(PoisonedConfiguration.CRASH_PROBABILITY, "0").set(PoisonedConfiguration.CRASH_TIMEOUT, "1").build()}).bindNamedParameter(ModelDimensions.class, "" + BroadcastDriver.this.dimensions).build());
            BroadcastDriver.LOG.log(Level.FINER, "Submit SlaveTask conf: {0}", BroadcastDriver.this.confSerializer.toString(taskConfiguration));
            activeContext.submitTask(taskConfiguration);
        }
    }

    /* loaded from: input_file:org/apache/reef/examples/group/broadcast/BroadcastDriver$StartHandler.class */
    final class StartHandler implements EventHandler<StartTime> {
        StartHandler() {
        }

        public void onNext(StartTime startTime) {
            int i = BroadcastDriver.this.numberOfReceivers + 1;
            BroadcastDriver.LOG.log(Level.FINE, "Requesting {0} evaluators", Integer.valueOf(i));
            BroadcastDriver.this.requestor.newRequest().setNumber(i).setMemory(2048).submit();
        }
    }

    @Inject
    public BroadcastDriver(EvaluatorRequestor evaluatorRequestor, GroupCommDriver groupCommDriver, ConfigurationSerializer configurationSerializer, @Parameter(ModelDimensions.class) int i, @Parameter(NumberOfReceivers.class) int i2) {
        this.requestor = evaluatorRequestor;
        this.groupCommDriver = groupCommDriver;
        this.confSerializer = configurationSerializer;
        this.dimensions = i;
        this.numberOfReceivers = i2;
        this.numberOfAllocatedEvaluators = new AtomicInteger(i2 + 1);
        this.allCommGroup = this.groupCommDriver.newCommunicationGroup(AllCommunicationGroup.class, i2 + 1);
        LOG.info("Obtained all communication group");
        this.allCommGroup.addBroadcast(ControlMessageBroadcaster.class, BroadcastOperatorSpec.newBuilder().setSenderId("MasterTask").setDataCodecClass(SerializableCodec.class).build()).addBroadcast(ModelBroadcaster.class, BroadcastOperatorSpec.newBuilder().setSenderId("MasterTask").setDataCodecClass(SerializableCodec.class).build()).addReduce(ModelReceiveAckReducer.class, ReduceOperatorSpec.newBuilder().setReceiverId("MasterTask").setDataCodecClass(SerializableCodec.class).setReduceFunctionClass(ModelReceiveAckReduceFunction.class).build()).finalise();
        LOG.info("Added operators to allCommGroup");
    }
}
