package org.apache.reef.io.network.group.impl.driver;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.logging.Logger;
import org.apache.reef.io.network.group.api.config.OperatorSpec;
import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
import org.apache.reef.io.network.group.impl.GroupChangesCodec;
import org.apache.reef.io.network.group.impl.GroupChangesImpl;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
import org.apache.reef.io.network.group.impl.operators.ReduceSender;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.impl.SingleThreadStage;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/driver/FlatTopology.class */
public class FlatTopology implements Topology {
    private static final Logger LOG = Logger.getLogger(FlatTopology.class.getName());
    private final EStage<GroupCommunicationMessage> senderStage;
    private final Class<? extends Name<String>> groupName;
    private final Class<? extends Name<String>> operName;
    private final String driverId;
    private String rootId;
    private OperatorSpec operatorSpec;
    private TaskNode root;
    private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap();

    public FlatTopology(EStage<GroupCommunicationMessage> eStage, Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2, String str, int i) {
        this.senderStage = eStage;
        this.groupName = cls;
        this.operName = cls2;
        this.driverId = str;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void setRootTask(String str) {
        this.rootId = str;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public String getRootId() {
        return this.rootId;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void setOperatorSpecification(OperatorSpec operatorSpec) {
        this.operatorSpec = operatorSpec;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public Configuration getTaskConfiguration(String str) {
        LOG.finest(getQualifiedName() + "Getting config for task " + str);
        if (this.nodes.get(str) == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        int nodeVersion = getNodeVersion(str);
        JavaConfigurationBuilder newConfigurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
        newConfigurationBuilder.bindNamedParameter(DataCodec.class, this.operatorSpec.getDataCodecClass());
        newConfigurationBuilder.bindNamedParameter(TaskVersion.class, Integer.toString(nodeVersion));
        if (this.operatorSpec instanceof BroadcastOperatorSpec) {
            if (str.equals(((BroadcastOperatorSpec) this.operatorSpec).getSenderId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
            }
        }
        if (this.operatorSpec instanceof ReduceOperatorSpec) {
            ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) this.operatorSpec;
            newConfigurationBuilder.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass());
            if (str.equals(reduceOperatorSpec.getReceiverId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ReduceSender.class);
            }
        }
        return newConfigurationBuilder.build();
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public int getNodeVersion(String str) {
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " is not available on the nodes map");
        }
        return taskNode.getVersion();
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void removeTask(String str) {
        if (!this.nodes.containsKey(str)) {
            LOG.warning("Trying to remove a non-existent node in the task graph");
        } else if (str.equals(this.rootId)) {
            unsetRootNode(str);
        } else {
            removeChild(str);
        }
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void addTask(String str) {
        if (this.nodes.containsKey(str)) {
            LOG.warning("Got a request to add a task that is already in the graph");
            LOG.warning("We need to block this request till the delete finishes");
        }
        if (str.equals(this.rootId)) {
            setRootNode(str);
        } else {
            addChild(str);
        }
    }

    private void addChild(String str) {
        LOG.finest(getQualifiedName() + "Adding leaf " + str);
        TaskNodeImpl taskNodeImpl = new TaskNodeImpl(this.senderStage, this.groupName, this.operName, str, this.driverId, false);
        if (this.root != null) {
            taskNodeImpl.setParent(this.root);
            this.root.addChild(taskNodeImpl);
        }
        this.nodes.put(str, taskNodeImpl);
    }

    private void removeChild(String str) {
        LOG.finest(getQualifiedName() + "Removing leaf " + str);
        if (this.root != null) {
            this.root.removeChild(this.nodes.get(str));
        }
        this.nodes.remove(str);
    }

    private void setRootNode(String str) {
        LOG.finest(getQualifiedName() + "Setting " + str + " as root");
        this.root = new TaskNodeImpl(this.senderStage, this.groupName, this.operName, str, this.driverId, true);
        for (Map.Entry<String, TaskNode> entry : this.nodes.entrySet()) {
            entry.getKey();
            TaskNode value = entry.getValue();
            this.root.addChild(value);
            value.setParent(this.root);
        }
        this.nodes.put(str, this.root);
    }

    private void unsetRootNode(String str) {
        LOG.finest(getQualifiedName() + "Unsetting " + this.rootId + " as root");
        this.nodes.remove(this.rootId);
        for (Map.Entry<String, TaskNode> entry : this.nodes.entrySet()) {
            entry.getKey();
            entry.getValue().setParent(null);
        }
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onFailedTask(String str) {
        LOG.finest(getQualifiedName() + "Task-" + str + " failed");
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        taskNode.onFailedTask();
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onRunningTask(String str) {
        LOG.finest(getQualifiedName() + "Task-" + str + " is running");
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        taskNode.onRunningTask();
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onReceiptOfMessage(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.finest(getQualifiedName() + "processing " + groupCommunicationMessage.getType() + " from " + groupCommunicationMessage.getSrcid());
        if (groupCommunicationMessage.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges)) {
            processTopologyChanges(groupCommunicationMessage);
        } else if (groupCommunicationMessage.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) {
            processUpdateTopology(groupCommunicationMessage);
        } else {
            this.nodes.get(groupCommunicationMessage.getSrcid()).onReceiptOfAcknowledgement(groupCommunicationMessage);
        }
    }

    /* JADX WARN: Type inference failed for: r8v2, types: [byte[], byte[][]] */
    private void processUpdateTopology(GroupCommunicationMessage groupCommunicationMessage) {
        String srcid = groupCommunicationMessage.getSrcid();
        int nodeVersion = getNodeVersion(srcid);
        LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
        SingleThreadStage singleThreadStage = new SingleThreadStage("NodeTopologyUpdateWaitStage", new TopologyUpdateWaitHandler(this.senderStage, this.groupName, this.operName, this.driverId, 0, srcid, nodeVersion, getQualifiedName()), this.nodes.size());
        ArrayList<TaskNode> arrayList = new ArrayList(this.nodes.size());
        LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
        for (TaskNode taskNode : this.nodes.values()) {
            if (taskNode.isRunning() && taskNode.hasChanges() && taskNode.resetTopologySetupSent()) {
                arrayList.add(taskNode);
            }
        }
        for (TaskNode taskNode2 : arrayList) {
            taskNode2.updatingTopology();
            this.senderStage.onNext(Utils.bldVersionedGCM(this.groupName, this.operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, this.driverId, 0, taskNode2.getTaskId(), taskNode2.getVersion(), new byte[]{Utils.EmptyByteArr}));
        }
        singleThreadStage.onNext(arrayList);
    }

    /* JADX WARN: Type inference failed for: r8v2, types: [byte[], byte[][]] */
    private void processTopologyChanges(GroupCommunicationMessage groupCommunicationMessage) {
        String srcid = groupCommunicationMessage.getSrcid();
        boolean z = false;
        LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
        for (TaskNode taskNode : this.nodes.values()) {
            if (!taskNode.isRunning() || taskNode.hasChanges()) {
                z = true;
                break;
            }
        }
        this.senderStage.onNext(Utils.bldVersionedGCM(this.groupName, this.operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, this.driverId, 0, srcid, getNodeVersion(srcid), new byte[]{new GroupChangesCodec().encode(new GroupChangesImpl(z))}));
    }

    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + ":" + Utils.simpleName(this.operName) + " - ";
    }
}
