package org.apache.reef.io.network.group.impl.task;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.group.api.GroupChanges;
import org.apache.reef.io.network.group.api.operators.Broadcast;
import org.apache.reef.io.network.group.api.operators.Gather;
import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.operators.Scatter;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
import org.apache.reef.io.network.group.impl.GroupChangesCodec;
import org.apache.reef.io.network.group.impl.GroupChangesImpl;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
import org.apache.reef.io.network.group.impl.config.parameters.DriverIdentifierGroupComm;
import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
import org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs;
import org.apache.reef.io.network.group.impl.driver.TopologySerializer;
import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
import org.apache.reef.io.network.group.impl.operators.Sender;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.network.util.Pair;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.ThreadPoolStage;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.class */
public class CommunicationGroupClientImpl implements CommunicationGroupServiceClient {
    private static final Logger LOG = Logger.getLogger(CommunicationGroupClientImpl.class.getName());
    private final GroupCommNetworkHandler groupCommNetworkHandler;
    private final Class<? extends Name<String>> groupName;
    private final Map<Class<? extends Name<String>>, GroupCommOperator> operators;
    private final Sender sender;
    private final String taskId;
    private final boolean isScatterSender;
    private final IdentifierFactory identifierFactory;
    private List<Identifier> activeSlaveTasks;
    private TopologySimpleNode topologySimpleNodeRoot;
    private final String driverId;
    private final CommGroupNetworkHandler commGroupNetworkHandler;
    private final AtomicBoolean init = new AtomicBoolean(false);

    @Inject
    @Deprecated
    public CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) String str, @Parameter(TaskConfigurationOptions.Identifier.class) String str2, @Parameter(DriverIdentifierGroupComm.class) String str3, GroupCommNetworkHandler groupCommNetworkHandler, @Parameter(SerializedOperConfigs.class) Set<String> set, ConfigurationSerializer configurationSerializer, NetworkService<GroupCommunicationMessage> networkService) {
        this.taskId = str2;
        this.driverId = str3;
        LOG.finest(str + " has GroupCommHandler-" + groupCommNetworkHandler.toString());
        this.identifierFactory = networkService.getIdentifierFactory();
        this.groupName = Utils.getClass(str);
        this.groupCommNetworkHandler = groupCommNetworkHandler;
        this.sender = new Sender(networkService);
        this.operators = new TreeMap(new Comparator<Class<? extends Name<String>>>() { // from class: org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl.1
            @Override // java.util.Comparator
            public int compare(Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2) {
                return cls.getSimpleName().compareTo(cls2.getSimpleName());
            }
        });
        try {
            this.commGroupNetworkHandler = (CommGroupNetworkHandler) Tang.Factory.getTang().newInjector().getInstance(CommGroupNetworkHandler.class);
            this.groupCommNetworkHandler.register(this.groupName, this.commGroupNetworkHandler);
            boolean z = false;
            Iterator<String> it = set.iterator();
            while (it.hasNext()) {
                Injector newInjector = Tang.Factory.getTang().newInjector(configurationSerializer.fromString(it.next()));
                newInjector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, str2);
                newInjector.bindVolatileParameter(CommunicationGroupName.class, str);
                newInjector.bindVolatileInstance(CommGroupNetworkHandler.class, this.commGroupNetworkHandler);
                newInjector.bindVolatileInstance(NetworkService.class, networkService);
                newInjector.bindVolatileInstance(CommunicationGroupServiceClient.class, this);
                GroupCommOperator groupCommOperator = (GroupCommOperator) newInjector.getInstance(GroupCommOperator.class);
                String str4 = (String) newInjector.getNamedInstance(OperatorName.class);
                this.operators.put(Utils.getClass(str4), groupCommOperator);
                LOG.finest(str4 + " has CommGroupHandler-" + this.commGroupNetworkHandler.toString());
                if (!z && (groupCommOperator instanceof Scatter.Sender)) {
                    LOG.fine(str4 + " is a scatter sender. Will keep track of active slave tasks.");
                    z = true;
                }
            }
            this.isScatterSender = z;
        } catch (InjectionException | IOException e) {
            throw new RuntimeException("Unable to deserialize operator config", e);
        }
    }

    @Inject
    private CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) String str, @Parameter(TaskConfigurationOptions.Identifier.class) String str2, @Parameter(DriverIdentifierGroupComm.class) String str3, GroupCommNetworkHandler groupCommNetworkHandler, @Parameter(SerializedOperConfigs.class) Set<String> set, ConfigurationSerializer configurationSerializer, NetworkService<GroupCommunicationMessage> networkService, CommGroupNetworkHandler commGroupNetworkHandler, Injector injector) {
        this.taskId = str2;
        this.driverId = str3;
        LOG.finest(str + " has GroupCommHandler-" + groupCommNetworkHandler.toString());
        this.identifierFactory = networkService.getIdentifierFactory();
        this.groupName = Utils.getClass(str);
        this.groupCommNetworkHandler = groupCommNetworkHandler;
        this.commGroupNetworkHandler = commGroupNetworkHandler;
        this.sender = new Sender(networkService);
        this.operators = new TreeMap(new Comparator<Class<? extends Name<String>>>() { // from class: org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl.2
            @Override // java.util.Comparator
            public int compare(Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2) {
                return cls.getSimpleName().compareTo(cls2.getSimpleName());
            }
        });
        try {
            this.groupCommNetworkHandler.register(this.groupName, commGroupNetworkHandler);
            boolean z = false;
            Iterator<String> it = set.iterator();
            while (it.hasNext()) {
                Injector forkInjector = injector.forkInjector(new Configuration[]{configurationSerializer.fromString(it.next())});
                forkInjector.bindVolatileInstance(CommunicationGroupServiceClient.class, this);
                GroupCommOperator groupCommOperator = (GroupCommOperator) forkInjector.getInstance(GroupCommOperator.class);
                String str4 = (String) forkInjector.getNamedInstance(OperatorName.class);
                this.operators.put(Utils.getClass(str4), groupCommOperator);
                LOG.finest(str4 + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
                if (!z && (groupCommOperator instanceof Scatter.Sender)) {
                    LOG.fine(str4 + " is a scatter sender. Will keep track of active slave tasks.");
                    z = true;
                }
            }
            this.isScatterSender = z;
        } catch (InjectionException | IOException e) {
            throw new RuntimeException("Unable to deserialize operator config", e);
        }
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Broadcast.Sender getBroadcastSender(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getBroadcastSender", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Broadcast.Sender)) {
            throw new RuntimeException("Configured operator is not a broadcast sender");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getBroadcastSender", getQualifiedName() + groupCommOperator);
        return (Broadcast.Sender) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Reduce.Receiver getReduceReceiver(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getReduceReceiver", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Reduce.Receiver)) {
            throw new RuntimeException("Configured operator is not a reduce receiver");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getReduceReceiver", getQualifiedName() + groupCommOperator);
        return (Reduce.Receiver) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Scatter.Sender getScatterSender(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getScatterSender", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Scatter.Sender)) {
            throw new RuntimeException("Configured operator is not a scatter sender");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getScatterSender", getQualifiedName() + groupCommOperator);
        return (Scatter.Sender) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Gather.Receiver getGatherReceiver(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getGatherReceiver", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Gather.Receiver)) {
            throw new RuntimeException("Configured operator is not a gather receiver");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getGatherReceiver", getQualifiedName() + groupCommOperator);
        return (Gather.Receiver) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Broadcast.Receiver getBroadcastReceiver(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getBroadcastReceiver", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Broadcast.Receiver)) {
            throw new RuntimeException("Configured operator is not a broadcast receiver");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getBroadcastReceiver", getQualifiedName() + groupCommOperator);
        return (Broadcast.Receiver) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Reduce.Sender getReduceSender(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getReduceSender", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Reduce.Sender)) {
            throw new RuntimeException("Configured operator is not a reduce sender");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getReduceSender", getQualifiedName() + groupCommOperator);
        return (Reduce.Sender) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Scatter.Receiver getScatterReceiver(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getScatterReceiver", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Scatter.Receiver)) {
            throw new RuntimeException("Configured operator is not a scatter receiver");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getScatterReceiver", getQualifiedName() + groupCommOperator);
        return (Scatter.Receiver) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Gather.Sender getGatherSender(Class<? extends Name<String>> cls) {
        LOG.entering("CommunicationGroupClientImpl", "getGatherSender", new Object[]{getQualifiedName(), Utils.simpleName(cls)});
        GroupCommOperator groupCommOperator = this.operators.get(cls);
        if (!(groupCommOperator instanceof Gather.Sender)) {
            throw new RuntimeException("Configured operator is not a gather sender");
        }
        this.commGroupNetworkHandler.addTopologyElement(cls);
        LOG.exiting("CommunicationGroupClientImpl", "getGatherSender", getQualifiedName() + groupCommOperator);
        return (Gather.Sender) groupCommOperator;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient
    public void initialize() {
        LOG.entering("CommunicationGroupClientImpl", "initialize", getQualifiedName());
        if (this.init.compareAndSet(false, true)) {
            LOG.finest("CommGroup-" + this.groupName + " is initializing");
            CountDownLatch countDownLatch = new CountDownLatch(this.operators.size());
            InitHandler initHandler = new InitHandler(countDownLatch);
            ThreadPoolStage threadPoolStage = new ThreadPoolStage(initHandler, this.operators.size());
            Iterator<GroupCommOperator> it = this.operators.values().iterator();
            while (it.hasNext()) {
                threadPoolStage.onNext(it.next());
            }
            try {
                countDownLatch.await();
                if (this.isScatterSender) {
                    updateTopology();
                }
                if (initHandler.getException() != null) {
                    throw new RuntimeException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too.");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("InterruptedException while waiting for initialization", e);
            }
        }
        LOG.exiting("CommunicationGroupClientImpl", "initialize", getQualifiedName());
    }

    /* JADX WARN: Type inference failed for: r8v1, types: [byte[], byte[][]] */
    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public GroupChanges getTopologyChanges() {
        LOG.entering("CommunicationGroupClientImpl", "getTopologyChanges", getQualifiedName());
        for (GroupCommOperator groupCommOperator : this.operators.values()) {
            Class<? extends Name<String>> operName = groupCommOperator.getOperName();
            LOG.finest("Sending TopologyChanges msg to driver");
            try {
                this.sender.send(Utils.bldVersionedGCM(this.groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, this.taskId, groupCommOperator.getVersion(), this.driverId, 0, new byte[]{Utils.EMPTY_BYTE_ARR}));
            } catch (NetworkException e) {
                throw new RuntimeException("NetworkException while sending GetTopologyChanges", e);
            }
        }
        GroupChangesCodec groupChangesCodec = new GroupChangesCodec();
        HashMap hashMap = new HashMap();
        Iterator<GroupCommOperator> it = this.operators.values().iterator();
        while (it.hasNext()) {
            Class<? extends Name<String>> operName2 = it.next().getOperName();
            hashMap.put(operName2, groupChangesCodec.decode(this.commGroupNetworkHandler.waitForTopologyChanges(operName2)));
        }
        GroupChanges mergeGroupChanges = mergeGroupChanges(hashMap);
        LOG.exiting("CommunicationGroupClientImpl", "getTopologyChanges", getQualifiedName() + mergeGroupChanges);
        return mergeGroupChanges;
    }

    private GroupChanges mergeGroupChanges(Map<Class<? extends Name<String>>, GroupChanges> map) {
        LOG.entering("CommunicationGroupClientImpl", "mergeGroupChanges", new Object[]{getQualifiedName(), map});
        boolean z = false;
        Iterator<GroupChanges> it = map.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().exist()) {
                z = true;
                break;
            }
        }
        GroupChangesImpl groupChangesImpl = new GroupChangesImpl(z);
        LOG.exiting("CommunicationGroupClientImpl", "mergeGroupChanges", getQualifiedName() + groupChangesImpl);
        return groupChangesImpl;
    }

    /* JADX WARN: Type inference failed for: r8v1, types: [byte[], byte[][]] */
    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public void updateTopology() {
        GroupCommunicationMessage waitForTopologyUpdate;
        LOG.entering("CommunicationGroupClientImpl", "updateTopology", getQualifiedName());
        for (GroupCommOperator groupCommOperator : this.operators.values()) {
            try {
                this.sender.send(Utils.bldVersionedGCM(this.groupName, groupCommOperator.getOperName(), ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, this.taskId, groupCommOperator.getVersion(), this.driverId, 0, new byte[]{Utils.EMPTY_BYTE_ARR}));
            } catch (NetworkException e) {
                throw new RuntimeException("NetworkException while sending UpdateTopology", e);
            }
        }
        Iterator<GroupCommOperator> it = this.operators.values().iterator();
        while (it.hasNext()) {
            Class<? extends Name<String>> operName = it.next().getOperName();
            do {
                waitForTopologyUpdate = this.commGroupNetworkHandler.waitForTopologyUpdate(operName);
            } while (!isMsgVersionOk(waitForTopologyUpdate));
            if (this.isScatterSender) {
                updateActiveTasks(waitForTopologyUpdate);
            }
        }
        LOG.exiting("CommunicationGroupClientImpl", "updateTopology", getQualifiedName());
    }

    private void updateActiveTasks(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("CommunicationGroupClientImpl", "updateActiveTasks", new Object[]{getQualifiedName(), groupCommunicationMessage});
        Pair<TopologySimpleNode, List<Identifier>> decode = TopologySerializer.decode(groupCommunicationMessage.getData()[0], this.identifierFactory);
        this.topologySimpleNodeRoot = decode.getFirst();
        this.activeSlaveTasks = decode.getSecond();
        this.activeSlaveTasks.remove(this.identifierFactory.getNewInstance(this.taskId));
        Collections.sort(this.activeSlaveTasks, new Comparator<Identifier>() { // from class: org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl.3
            @Override // java.util.Comparator
            public int compare(Identifier identifier, Identifier identifier2) {
                return identifier.toString().compareTo(identifier2.toString());
            }
        });
        LOG.exiting("CommunicationGroupClientImpl", "updateActiveTasks", new Object[]{getQualifiedName(), groupCommunicationMessage});
    }

    private boolean isMsgVersionOk(GroupCommunicationMessage groupCommunicationMessage) {
        boolean z;
        LOG.entering("CommunicationGroupClientImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), groupCommunicationMessage});
        if (!groupCommunicationMessage.hasVersion()) {
            throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
        }
        int version = groupCommunicationMessage.getVersion();
        int version2 = this.operators.get(Utils.getClass(groupCommunicationMessage.getOperatorname())).getVersion();
        if (version < version2) {
            LOG.warning(getQualifiedName() + "Received a ver-" + version + " msg while expecting ver-" + version2 + ". Discarding msg");
            z = false;
        } else {
            z = true;
        }
        LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", Arrays.toString(new Object[]{Boolean.valueOf(z), getQualifiedName(), groupCommunicationMessage}));
        return z;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public List<Identifier> getActiveSlaveTasks() {
        return this.activeSlaveTasks;
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public TopologySimpleNode getTopologySimpleNodeRoot() {
        return this.topologySimpleNodeRoot;
    }

    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + " ";
    }

    @Override // org.apache.reef.io.network.group.api.task.CommunicationGroupClient
    public Class<? extends Name<String>> getName() {
        return this.groupName;
    }
}
