package org.apache.reef.io.network.group.impl.operators;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
import org.apache.reef.io.network.group.impl.config.parameters.DriverIdentifierGroupComm;
import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/operators/ReduceSender.class */
/**
 * Sending side of a Reduce group-communication operator.
 *
 * <p>Each call to {@link #send} fetches the reduced value of this node's children from the
 * operator topology, combines it with the local element using the configured
 * {@link Reduce.ReduceFunction}, encodes the result with the data codec and passes it to the
 * parent. Incoming {@link GroupCommunicationMessage}s are delegated to the operator topology.
 *
 * @param <T> type of the elements being reduced
 */
public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupCommunicationMessage> {
    private static final Logger LOG = Logger.getLogger(ReduceSender.class.getName());
    private final Class<? extends Name<String>> groupName;
    private final Class<? extends Name<String>> operName;
    private final CommGroupNetworkHandler commGroupNetworkHandler;
    private final Codec<T> dataCodec;
    private final NetworkService<GroupCommunicationMessage> netService;
    private final Sender sender;
    private final Reduce.ReduceFunction<T> reduceFunction;
    private final OperatorTopology topology;
    private final CommunicationGroupServiceClient commGroupClient;
    /** Flipped to {@code true} by the first {@link #send} call so the group client is initialized once. */
    private final AtomicBoolean init = new AtomicBoolean(false);
    private final int version;

    /**
     * Creates the operator, builds its topology and registers it with the group network handler.
     *
     * @param groupName               name of the communication group, resolved to a class via {@code Utils.getClass}
     * @param operName                name of this operator, resolved to a class via {@code Utils.getClass}
     * @param taskId                  identifier of the enclosing task, handed to the operator topology
     * @param dataCodec               codec used to encode the reduced value before it is sent to the parent
     * @param reduceFunction          function used to combine the local element with the children's value
     * @param driverId                group-communication driver identifier, handed to the operator topology
     * @param version                 task version reported by {@link #getVersion()}
     * @param commGroupNetworkHandler handler with which this operator registers itself under its operator name
     * @param netService              network service wrapped by the {@code Sender} used by the topology
     * @param commGroupClient         client initialized on the first {@link #send} call
     */
    @Inject
    public ReduceSender(
            @Parameter(CommunicationGroupName.class) final String groupName,
            @Parameter(OperatorName.class) final String operName,
            @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
            @Parameter(DataCodec.class) final Codec<T> dataCodec,
            @Parameter(ReduceFunctionParam.class) final Reduce.ReduceFunction<T> reduceFunction,
            @Parameter(DriverIdentifierGroupComm.class) final String driverId,
            @Parameter(TaskVersion.class) final int version,
            final CommGroupNetworkHandler commGroupNetworkHandler,
            final NetworkService<GroupCommunicationMessage> netService,
            final CommunicationGroupServiceClient commGroupClient) {
        LOG.log(Level.FINEST, "{0} has CommGroupHandler-{1}", new Object[]{operName, commGroupNetworkHandler});
        this.version = version;
        this.groupName = Utils.getClass(groupName);
        this.operName = Utils.getClass(operName);
        this.dataCodec = dataCodec;
        this.reduceFunction = reduceFunction;
        this.commGroupNetworkHandler = commGroupNetworkHandler;
        this.netService = netService;
        this.commGroupClient = commGroupClient;
        this.sender = new Sender(this.netService);
        this.topology = new OperatorTopologyImpl(this.groupName, this.operName, taskId, driverId, this.sender, version);
        // Register last: this hands out a reference to "this", so every field must already be assigned.
        this.commGroupNetworkHandler.register(this.operName, this);
    }

    /**
     * @return the task version this operator was created with
     */
    @Override
    public int getVersion() {
        return this.version;
    }

    /**
     * Initializes the underlying operator topology.
     *
     * @throws ParentDeadException if the topology initialization throws it
     */
    @Override
    public void initialize() throws ParentDeadException {
        this.topology.initialize();
    }

    /**
     * @return the class identifying this operator
     */
    @Override
    public Class<? extends Name<String>> getOperName() {
        return this.operName;
    }

    /**
     * @return the class identifying the communication group of this operator
     */
    @Override
    public Class<? extends Name<String>> getGroupName() {
        return this.groupName;
    }

    /**
     * @return {@code <group simple name>:<operator simple name>:<version>}
     */
    @Override
    public String toString() {
        return Utils.simpleName(this.groupName) + ":" + Utils.simpleName(this.operName) + ":" + this.version;
    }

    /**
     * Passes an incoming group-communication message to the operator topology.
     *
     * @param groupCommunicationMessage the message to handle
     */
    @Override
    public void onNext(final GroupCommunicationMessage groupCommunicationMessage) {
        this.topology.handle(groupCommunicationMessage);
    }

    /**
     * Reduces {@code element} with the value received from the children and sends the result to the parent.
     *
     * <p>The first invocation also initializes the communication group client.
     *
     * @param element the local contribution to the reduction
     * @throws NetworkException     declared by {@link Reduce.Sender}
     * @throws InterruptedException declared by {@link Reduce.Sender}
     * @throws RuntimeException     wrapping a {@link ParentDeadException} raised by the topology
     */
    @Override
    public void send(final T element) throws NetworkException, InterruptedException {
        LOG.entering("ReduceSender", "send", this);
        // Parameterized form: toString() is only evaluated if the record is actually published.
        LOG.log(Level.FINE, "I am {0}", this);
        if (this.init.compareAndSet(false, true)) {
            this.commGroupClient.initialize();
        }
        LOG.finest("Waiting for children");
        try {
            final T reducedValueOfChildren = this.topology.recvFromChildren(this.reduceFunction, this.dataCodec);
            final ArrayList<T> values = new ArrayList<T>(2);
            values.add(element);
            // A null result is skipped so only the local element is reduced.
            if (reducedValueOfChildren != null) {
                values.add(reducedValueOfChildren);
            }
            final T reducedValue = this.reduceFunction.apply(values);
            this.topology.sendToParent(
                    this.dataCodec.encode(reducedValue), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
            LOG.exiting("ReduceSender", "send", this);
        } catch (final ParentDeadException e) {
            throw new RuntimeException("ParentDeadException", e);
        }
    }

    /**
     * @return the reduce function this operator applies
     */
    @Override
    public Reduce.ReduceFunction<T> getReduceFunction() {
        return this.reduceFunction;
    }
}
