package org.apache.reef.io.network.group.impl.driver;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.io.network.group.api.config.OperatorSpec;
import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
import org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs;
import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
import org.apache.reef.io.network.group.impl.utils.CountingSemaphore;
import org.apache.reef.io.network.group.impl.utils.SetMap;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EStage;

@DriverSide
@Private
/* loaded from: input_file:org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.class */
public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
    private static final Logger LOG = Logger.getLogger(CommunicationGroupDriverImpl.class.getName());
    private final Class<? extends Name<String>> groupName;
    private final ConfigurationSerializer confSerializer;
    private final EStage<GroupCommunicationMessage> senderStage;
    private final String driverId;
    private final int numberOfTasks;
    private final CountingSemaphore allTasksAdded;
    private final int fanOut;
    private final ConcurrentMap<Class<? extends Name<String>>, OperatorSpec> operatorSpecs = new ConcurrentHashMap();
    private final ConcurrentMap<Class<? extends Name<String>>, Topology> topologies = new ConcurrentHashMap();
    private final Map<String, TaskState> perTaskState = new HashMap();
    private boolean finalised = false;
    private final Object topologiesLock = new Object();
    private final Object configLock = new Object();
    private final AtomicBoolean initializing = new AtomicBoolean(true);
    private final Object yetToRunLock = new Object();
    private final Object toBeRemovedLock = new Object();
    private final SetMap<MsgKey, IndexedMsg> msgQue = new SetMap<>();

    public CommunicationGroupDriverImpl(Class<? extends Name<String>> cls, ConfigurationSerializer configurationSerializer, EStage<GroupCommunicationMessage> eStage, BroadcastingEventHandler<RunningTask> broadcastingEventHandler, BroadcastingEventHandler<FailedTask> broadcastingEventHandler2, BroadcastingEventHandler<FailedEvaluator> broadcastingEventHandler3, BroadcastingEventHandler<GroupCommunicationMessage> broadcastingEventHandler4, String str, int i, int i2) {
        this.groupName = cls;
        this.numberOfTasks = i;
        this.driverId = str;
        this.confSerializer = configurationSerializer;
        this.senderStage = eStage;
        this.fanOut = i2;
        this.allTasksAdded = new CountingSemaphore(i, getQualifiedName(), this.topologiesLock);
        broadcastingEventHandler.addHandler(new TopologyRunningTaskHandler(this));
        broadcastingEventHandler2.addHandler(new TopologyFailedTaskHandler(this));
        broadcastingEventHandler3.addHandler(new TopologyFailedEvaluatorHandler(this));
        broadcastingEventHandler4.addHandler(new TopologyMessageHandler(this));
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public CommunicationGroupDriver addBroadcast(Class<? extends Name<String>> cls, BroadcastOperatorSpec broadcastOperatorSpec) {
        LOG.entering("CommunicationGroupDriverImpl", "addBroadcast", new Object[]{getQualifiedName(), Utils.simpleName(cls), broadcastOperatorSpec});
        if (this.finalised) {
            throw new IllegalStateException("Can't add more operators to a finalised spec");
        }
        this.operatorSpecs.put(cls, broadcastOperatorSpec);
        TreeTopology treeTopology = new TreeTopology(this.senderStage, this.groupName, cls, this.driverId, this.numberOfTasks, this.fanOut);
        treeTopology.setRootTask(broadcastOperatorSpec.getSenderId());
        treeTopology.setOperatorSpecification(broadcastOperatorSpec);
        this.topologies.put(cls, treeTopology);
        LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(cls), " added"}));
        return this;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public CommunicationGroupDriver addReduce(Class<? extends Name<String>> cls, ReduceOperatorSpec reduceOperatorSpec) {
        LOG.entering("CommunicationGroupDriverImpl", "addReduce", new Object[]{getQualifiedName(), Utils.simpleName(cls), reduceOperatorSpec});
        if (this.finalised) {
            throw new IllegalStateException("Can't add more operators to a finalised spec");
        }
        LOG.finer(getQualifiedName() + "Adding reduce operator to tree topology: " + reduceOperatorSpec);
        this.operatorSpecs.put(cls, reduceOperatorSpec);
        TreeTopology treeTopology = new TreeTopology(this.senderStage, this.groupName, cls, this.driverId, this.numberOfTasks, this.fanOut);
        treeTopology.setRootTask(reduceOperatorSpec.getReceiverId());
        treeTopology.setOperatorSpecification(reduceOperatorSpec);
        this.topologies.put(cls, treeTopology);
        LOG.exiting("CommunicationGroupDriverImpl", "addReduce", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(cls), " added"}));
        return this;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public CommunicationGroupDriver addScatter(Class<? extends Name<String>> cls, ScatterOperatorSpec scatterOperatorSpec) {
        LOG.entering("CommunicationGroupDriverImpl", "addScatter", new Object[]{getQualifiedName(), Utils.simpleName(cls), scatterOperatorSpec});
        if (this.finalised) {
            throw new IllegalStateException("Can't add more operators to a finalised spec");
        }
        this.operatorSpecs.put(cls, scatterOperatorSpec);
        TreeTopology treeTopology = new TreeTopology(this.senderStage, this.groupName, cls, this.driverId, this.numberOfTasks, this.fanOut);
        treeTopology.setRootTask(scatterOperatorSpec.getSenderId());
        treeTopology.setOperatorSpecification(scatterOperatorSpec);
        this.topologies.put(cls, treeTopology);
        LOG.exiting("CommunicationGroupDriverImpl", "addScatter", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(cls), scatterOperatorSpec}));
        return this;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public CommunicationGroupDriver addGather(Class<? extends Name<String>> cls, GatherOperatorSpec gatherOperatorSpec) {
        LOG.entering("CommunicationGroupDriverImpl", "addGather", new Object[]{getQualifiedName(), Utils.simpleName(cls), gatherOperatorSpec});
        if (this.finalised) {
            throw new IllegalStateException("Can't add more operators to a finalised spec");
        }
        this.operatorSpecs.put(cls, gatherOperatorSpec);
        TreeTopology treeTopology = new TreeTopology(this.senderStage, this.groupName, cls, this.driverId, this.numberOfTasks, this.fanOut);
        treeTopology.setRootTask(gatherOperatorSpec.getReceiverId());
        treeTopology.setOperatorSpecification(gatherOperatorSpec);
        this.topologies.put(cls, treeTopology);
        LOG.exiting("CommunicationGroupDriverImpl", "addGather", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(cls), gatherOperatorSpec}));
        return this;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public Configuration getTaskConfiguration(Configuration configuration) {
        LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration", new Object[]{getQualifiedName(), this.confSerializer.toString(configuration)});
        JavaConfigurationBuilder newConfigurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
        String taskId = taskId(configuration);
        if (!this.perTaskState.containsKey(taskId)) {
            return null;
        }
        newConfigurationBuilder.bindNamedParameter(DriverIdentifier.class, this.driverId);
        newConfigurationBuilder.bindNamedParameter(CommunicationGroupName.class, this.groupName.getName());
        LOG.finest(getQualifiedName() + "Task has been added. Waiting to acquire configLock");
        synchronized (this.configLock) {
            LOG.finest(getQualifiedName() + "Acquired configLock");
            while (cantGetConfig(taskId)) {
                LOG.finest(getQualifiedName() + "Need to wait for failure");
                try {
                    this.configLock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on configLock", e);
                }
            }
            LOG.finest(getQualifiedName() + taskId + " - Will fetch configuration now.");
            LOG.finest(getQualifiedName() + "Released configLock. Waiting to acquire topologiesLock");
        }
        synchronized (this.topologiesLock) {
            LOG.finest(getQualifiedName() + "Acquired topologiesLock");
            Iterator<Map.Entry<Class<? extends Name<String>>, OperatorSpec>> it = this.operatorSpecs.entrySet().iterator();
            while (it.hasNext()) {
                Class<? extends Name<String>> key = it.next().getKey();
                JavaConfigurationBuilder newConfigurationBuilder2 = Tang.Factory.getTang().newConfigurationBuilder(new Configuration[]{this.topologies.get(key).getTaskConfiguration(taskId)});
                newConfigurationBuilder2.bindNamedParameter(DriverIdentifier.class, this.driverId);
                newConfigurationBuilder2.bindNamedParameter(OperatorName.class, key.getName());
                newConfigurationBuilder.bindSetEntry(SerializedOperConfigs.class, this.confSerializer.toString(newConfigurationBuilder2.build()));
            }
            LOG.finest(getQualifiedName() + "Released topologiesLock");
        }
        Configuration build = newConfigurationBuilder.build();
        LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration", Arrays.toString(new Object[]{getQualifiedName(), this.confSerializer.toString(build)}));
        return build;
    }

    private boolean cantGetConfig(String str) {
        LOG.entering("CommunicationGroupDriverImpl", "cantGetConfig", new Object[]{getQualifiedName(), str});
        TaskState taskState = this.perTaskState.get(str);
        if (taskState.equals(TaskState.NOT_STARTED)) {
            LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), str, " has not started. We can get config"}));
            return false;
        }
        LOG.finest(getQualifiedName() + str + " has started.");
        if (taskState.equals(TaskState.RUNNING)) {
            LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{true, getQualifiedName(), str, " is running. We can't get config"}));
            return true;
        }
        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), str, " has failed. We can get config"}));
        return false;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public void finalise() {
        this.finalised = true;
    }

    @Override // org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver
    public void addTask(Configuration configuration) {
        LOG.entering("CommunicationGroupDriverImpl", "addTask", new Object[]{getQualifiedName(), this.confSerializer.toString(configuration)});
        String taskId = taskId(configuration);
        LOG.finest(getQualifiedName() + "AddTask(" + taskId + "). Waiting to acquire toBeRemovedLock");
        synchronized (this.toBeRemovedLock) {
            LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock");
            while (this.perTaskState.containsKey(taskId)) {
                LOG.finest(getQualifiedName() + "Trying to add an existing task. Will wait for removeTask");
                try {
                    this.toBeRemovedLock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on toBeRemovedLock", e);
                }
            }
            LOG.finest(getQualifiedName() + "Released toBeRemovedLock. Waiting to acquire topologiesLock");
        }
        synchronized (this.topologiesLock) {
            LOG.finest(getQualifiedName() + "Acquired topologiesLock");
            Iterator<Class<? extends Name<String>>> it = this.operatorSpecs.keySet().iterator();
            while (it.hasNext()) {
                this.topologies.get(it.next()).addTask(taskId);
            }
            this.perTaskState.put(taskId, TaskState.NOT_STARTED);
            LOG.finest(getQualifiedName() + "Released topologiesLock");
        }
        LOG.fine(getQualifiedName() + "Added " + taskId + " to topology");
        LOG.exiting("CommunicationGroupDriverImpl", "addTask", Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId}));
    }

    public void removeTask(String str) {
        LOG.entering("CommunicationGroupDriverImpl", "removeTask", new Object[]{getQualifiedName(), str});
        LOG.info(getQualifiedName() + "Removing Task " + str + " as the evaluator has failed.");
        LOG.finest(getQualifiedName() + "Remove Task(" + str + "): Waiting to acquire topologiesLock");
        synchronized (this.topologiesLock) {
            LOG.finest(getQualifiedName() + "Acquired topologiesLock");
            Iterator<Class<? extends Name<String>>> it = this.operatorSpecs.keySet().iterator();
            while (it.hasNext()) {
                this.topologies.get(it.next()).removeTask(str);
            }
            this.perTaskState.remove(str);
            LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire toBeRemovedLock");
        }
        synchronized (this.toBeRemovedLock) {
            LOG.finest(getQualifiedName() + "Acquired toBeRemovedLock");
            LOG.finest(getQualifiedName() + "Removed Task " + str + " Notifying waiting threads");
            this.toBeRemovedLock.notifyAll();
            LOG.finest(getQualifiedName() + "Released toBeRemovedLock");
        }
        LOG.fine(getQualifiedName() + "Removed " + str + " to topology");
        LOG.exiting("CommunicationGroupDriverImpl", "removeTask", Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", str}));
    }

    public void runTask(String str) {
        LOG.entering("CommunicationGroupDriverImpl", "runTask", new Object[]{getQualifiedName(), str});
        LOG.finest(getQualifiedName() + "Task-" + str + " running. Waiting to acquire topologiesLock");
        LOG.fine(getQualifiedName() + "Got running Task: " + str);
        boolean z = false;
        synchronized (this.topologiesLock) {
            if (this.perTaskState.containsKey(str)) {
                LOG.finest(getQualifiedName() + "Acquired topologiesLock");
                Iterator<Class<? extends Name<String>>> it = this.operatorSpecs.keySet().iterator();
                while (it.hasNext()) {
                    this.topologies.get(it.next()).onRunningTask(str);
                }
                this.allTasksAdded.decrement();
                this.perTaskState.put(str, TaskState.RUNNING);
                LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire yetToRunLock");
            } else {
                z = true;
            }
        }
        synchronized (this.yetToRunLock) {
            LOG.finest(getQualifiedName() + "Acquired yetToRunLock");
            this.yetToRunLock.notifyAll();
            LOG.finest(getQualifiedName() + "Released yetToRunLock");
        }
        if (z) {
            LOG.exiting("CommunicationGroupDriverImpl", "runTask", getQualifiedName() + str + " does not belong to this communication group. Ignoring");
        } else {
            LOG.fine(getQualifiedName() + "Status of task " + str + " changed to RUNNING");
            LOG.exiting("CommunicationGroupDriverImpl", "runTask", Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", str}));
        }
    }

    public void failTask(String str) {
        LOG.entering("CommunicationGroupDriverImpl", "failTask", new Object[]{getQualifiedName(), str});
        LOG.finest(getQualifiedName() + "Task-" + str + " failed. Waiting to acquire yetToRunLock");
        LOG.fine(getQualifiedName() + "Got failed Task: " + str);
        synchronized (this.yetToRunLock) {
            LOG.finest(getQualifiedName() + "Acquired yetToRunLock");
            if (!this.perTaskState.containsKey(str)) {
                LOG.fine(getQualifiedName() + " does not have this task, another communicationGroup must have it");
                return;
            }
            while (cantFailTask(str)) {
                LOG.finest(getQualifiedName() + "Need to wait for it run");
                try {
                    this.yetToRunLock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(getQualifiedName() + "InterruptedException while waiting on yetToRunLock", e);
                }
            }
            LOG.finest(getQualifiedName() + str + " - Can safely set failure.");
            LOG.finest(getQualifiedName() + "Released yetToRunLock. Waiting to acquire topologiesLock");
            synchronized (this.topologiesLock) {
                LOG.finest(getQualifiedName() + "Acquired topologiesLock");
                Iterator<Class<? extends Name<String>>> it = this.operatorSpecs.keySet().iterator();
                while (it.hasNext()) {
                    this.topologies.get(it.next()).onFailedTask(str);
                }
                this.allTasksAdded.increment();
                this.perTaskState.put(str, TaskState.FAILED);
                LOG.finest(getQualifiedName() + "Removing msgs associated with dead task " + str + " from msgQue.");
                Set<MsgKey> keySet = this.msgQue.keySet();
                ArrayList arrayList = new ArrayList();
                for (MsgKey msgKey : keySet) {
                    if (msgKey.getSrc().equals(str)) {
                        arrayList.add(msgKey);
                    }
                }
                LOG.finest(getQualifiedName() + arrayList + " keys that will be removed");
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    this.msgQue.remove((MsgKey) it2.next());
                }
                LOG.finest(getQualifiedName() + "Released topologiesLock. Waiting to acquire configLock");
            }
            synchronized (this.configLock) {
                LOG.finest(getQualifiedName() + "Acquired configLock");
                this.configLock.notifyAll();
                LOG.finest(getQualifiedName() + "Released configLock");
            }
            LOG.fine(getQualifiedName() + "Status of task " + str + " changed to FAILED");
            LOG.exiting("CommunicationGroupDriverImpl", "failTask", Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", str}));
        }
    }

    private boolean cantFailTask(String str) {
        LOG.entering("CommunicationGroupDriverImpl", "cantFailTask", new Object[]{getQualifiedName(), str});
        TaskState taskState = this.perTaskState.get(str);
        if (taskState.equals(TaskState.NOT_STARTED)) {
            LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), str, " has not started. We can't fail a task that hasn't started"}));
            return true;
        }
        LOG.finest(getQualifiedName() + str + " has started.");
        if (taskState.equals(TaskState.RUNNING)) {
            LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{false, getQualifiedName(), str, " is running. Can set failure"}));
            return false;
        }
        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), str, " is not running yet. Can't set failure"}));
        return true;
    }

    public void queNProcessMsg(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("CommunicationGroupDriverImpl", "queNProcessMsg", new Object[]{getQualifiedName(), groupCommunicationMessage});
        IndexedMsg indexedMsg = new IndexedMsg(groupCommunicationMessage);
        Class<? extends Name<String>> operName = indexedMsg.getOperName();
        MsgKey msgKey = new MsgKey(groupCommunicationMessage);
        if (this.msgQue.contains(msgKey, indexedMsg)) {
            throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + groupCommunicationMessage.getType() + " msg for " + msgKey + " in " + Utils.simpleName(operName));
        }
        LOG.finest(getQualifiedName() + "Adding msg to que");
        this.msgQue.add(msgKey, indexedMsg);
        if (this.msgQue.count(msgKey) == this.topologies.size()) {
            LOG.finest(getQualifiedName() + "MsgQue for " + msgKey + " contains " + groupCommunicationMessage.getType() + " msgs from: " + this.msgQue.get(msgKey));
            for (IndexedMsg indexedMsg2 : this.msgQue.remove(msgKey)) {
                this.topologies.get(indexedMsg2.getOperName()).onReceiptOfMessage(indexedMsg2.getMsg());
            }
            LOG.finest(getQualifiedName() + "All msgs processed and removed");
        }
        LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg", Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", groupCommunicationMessage}));
    }

    private boolean isMsgVersionOk(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("CommunicationGroupDriverImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), groupCommunicationMessage});
        if (!groupCommunicationMessage.hasVersion()) {
            throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
        }
        boolean chkVersion = chkVersion(groupCommunicationMessage.getSrcVersion(), this.topologies.get(Utils.getClass(groupCommunicationMessage.getOperatorname())).getNodeVersion(groupCommunicationMessage.getSrcid()), "Src Version Check: ");
        LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk", Arrays.toString(new Object[]{Boolean.valueOf(chkVersion), getQualifiedName(), groupCommunicationMessage}));
        return chkVersion;
    }

    private boolean chkVersion(int i, int i2, String str) {
        if (i < i2) {
            LOG.warning(getQualifiedName() + str + "received a ver-" + i + " msg while expecting ver-" + i2);
            return false;
        }
        if (i <= i2) {
            return true;
        }
        LOG.warning(getQualifiedName() + str + "received a HIGHER ver-" + i + " msg while expecting ver-" + i2 + ". Something fishy!!!");
        return false;
    }

    public void processMsg(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("CommunicationGroupDriverImpl", "processMsg", new Object[]{getQualifiedName(), groupCommunicationMessage});
        LOG.finest(getQualifiedName() + "ProcessMsg: " + groupCommunicationMessage + ". Waiting to acquire topologiesLock");
        synchronized (this.topologiesLock) {
            LOG.finest(getQualifiedName() + "Acquired topologiesLock");
            if (!isMsgVersionOk(groupCommunicationMessage)) {
                LOG.finer(getQualifiedName() + "Discarding msg. Released topologiesLock");
                return;
            }
            if (this.initializing.get() || groupCommunicationMessage.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) {
                LOG.fine(getQualifiedName() + groupCommunicationMessage.getSimpleOperName() + ": Waiting for all required(" + this.allTasksAdded.getInitialCount() + ") nodes to run");
                this.allTasksAdded.await();
                LOG.fine(getQualifiedName() + groupCommunicationMessage.getSimpleOperName() + ": All required(" + this.allTasksAdded.getInitialCount() + ") nodes are running");
                this.initializing.compareAndSet(true, false);
            }
            queNProcessMsg(groupCommunicationMessage);
            LOG.finest(getQualifiedName() + "Released topologiesLock");
            LOG.exiting("CommunicationGroupDriverImpl", "processMsg", Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", groupCommunicationMessage}));
        }
    }

    private String taskId(Configuration configuration) {
        try {
            return (String) Tang.Factory.getTang().newInjector(configuration).getNamedInstance(TaskConfigurationOptions.Identifier.class);
        } catch (InjectionException e) {
            throw new RuntimeException(getQualifiedName() + "Injection exception while extracting taskId from partialTaskConf", e);
        }
    }

    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + " - ";
    }
}
