package org.apache.reef.io.network.group.impl.task;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.operators.Sender;
import org.apache.reef.io.network.group.impl.utils.ResettingCountDownLatch;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.class */
/**
 * Task-side {@link OperatorTopology} that maintains two topology structs for a single operator.
 *
 * <p>The <em>base</em> topology is updated from control messages that {@link #handle} places on the
 * {@code deltas} queue. The <em>effective</em> topology is re-created from the base topology whenever
 * the base reports changes, and is the struct every send/receive call delegates to. Death messages
 * are additionally queued on {@code deletionDeltas} and applied to the effective topology right
 * before each send/receive (see {@link #refreshEffectiveTopology()}).
 *
 * <p>Two single-threaded stages are used: one runs base-topology updates triggered by
 * {@code UpdateTopology} messages, the other forwards all non-control messages to the effective
 * topology. Both synchronize on {@code topologyLock}.
 */
public class OperatorTopologyImpl implements OperatorTopology {
    private static final Logger LOG = Logger.getLogger(OperatorTopologyImpl.class.getName());

    private final Class<? extends Name<String>> groupName;
    private final Class<? extends Name<String>> operName;
    private final String selfId;
    private final String driverId;
    private final Sender sender;
    private final int version;

    /**
     * Monitor held while the topology structs are rebuilt or refreshed; also the wait/notifyAll
     * monitor between the topology-update handler and the data-handling handler.
     */
    private final Object topologyLock = new Object();

    /** Control messages waiting to be applied to the base topology. */
    private final BlockingQueue<GroupCommunicationMessage> deltas = new LinkedBlockingQueue<>();

    /** ParentDead/ChildDead messages waiting to be applied to the effective topology. */
    private final BlockingQueue<GroupCommunicationMessage> deletionDeltas = new LinkedBlockingQueue<>();

    /** Counted down by the update handler once it holds {@code topologyLock}; awaited in {@link #handle}. */
    private final ResettingCountDownLatch topologyLockAquired = new ResettingCountDownLatch(1);

    /** True from the moment an UpdateTopology message is received until the update handler finishes. */
    private final AtomicBoolean updatingTopo = new AtomicBoolean(false);

    private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler =
            new BaseTopologyUpdateHandler();
    private final EStage<GroupCommunicationMessage> baseTopologyUpdateStage =
            new SingleThreadStage<>("BaseTopologyUpdateStage", this.baseTopologyUpdateHandler, 5);
    private final EventHandler<GroupCommunicationMessage> dataHandlingStageHandler =
            new DataHandlingStageHandler();
    private final EStage<GroupCommunicationMessage> dataHandlingStage =
            new SingleThreadStage<>("DataHandlingStage", this.dataHandlingStageHandler, 10000);

    private OperatorTopologyStruct baseTopology;
    private OperatorTopologyStruct effectiveTopology;

    /**
     * Handles UpdateTopology messages: takes {@code topologyLock}, releases the latch that
     * {@link OperatorTopologyImpl#handle} is waiting on, re-runs the base-topology update and then
     * wakes up any thread waiting for the update to finish.
     */
    private final class BaseTopologyUpdateHandler implements EventHandler<GroupCommunicationMessage> {

        /**
         * @param groupCommunicationMessage the UpdateTopology message that triggered this update
         * @throws RuntimeException wrapping a {@link ParentDeadException} raised by the update
         */
        @Override
        public void onNext(GroupCommunicationMessage groupCommunicationMessage) {
            assert groupCommunicationMessage.getType()
                    == ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology;
            assert effectiveTopology != null;
            LOG.entering("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext",
                    new Object[]{getQualifiedName(), groupCommunicationMessage});
            LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
            synchronized (topologyLock) {
                LOG.finest(getQualifiedName() + "Acquired topoLock");
                LOG.finest(getQualifiedName() + "Releasing topoLoackAcquired CDL");
                // Lets handle() proceed (and ack the driver) now that this thread owns the lock.
                topologyLockAquired.countDown();
                try {
                    updateBaseTopology();
                    LOG.finest(getQualifiedName() + "Completed updating base & effective topologies");
                    updatingTopo.set(false);
                    LOG.finest(getQualifiedName() + "Topology update complete. Notifying waiting threads");
                    topologyLock.notifyAll();
                    LOG.finest(getQualifiedName() + "Released topoLock");
                } catch (ParentDeadException e) {
                    // NOTE(review): on this path updatingTopo stays true and nobody is notified, so
                    // threads waiting in DataHandlingStageHandler keep waiting - confirm intended.
                    throw new RuntimeException(
                            getQualifiedName() + "BaseTopologyUpdateStage: Unexpected ParentDeadException", e);
                }
            }
            LOG.exiting("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext",
                    Arrays.toString(new Object[]{getQualifiedName(), groupCommunicationMessage}));
        }
    }

    /**
     * Handles every message type that {@link OperatorTopologyImpl#handle} does not treat as a
     * control message: waits until no topology update is in progress and then hands the message to
     * the effective topology.
     */
    private final class DataHandlingStageHandler implements EventHandler<GroupCommunicationMessage> {

        /**
         * @param groupCommunicationMessage the message to add as data to the effective topology
         * @throws RuntimeException if the thread is interrupted while waiting for a topology update
         *         to finish (the interrupt flag is restored first)
         */
        @Override
        public void onNext(GroupCommunicationMessage groupCommunicationMessage) {
            LOG.entering("OperatorTopologyImpl.DataHandlingStageHandler", "onNext",
                    new Object[]{getQualifiedName(), groupCommunicationMessage});
            LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
            synchronized (topologyLock) {
                LOG.finest(getQualifiedName() + "Aqcuired topoLock");
                while (updatingTopo.get()) {
                    try {
                        LOG.finest(getQualifiedName()
                                + "Topology is being updated. Released topoLock, Waiting on topoLock");
                        topologyLock.wait();
                        LOG.finest(getQualifiedName() + "Aqcuired topoLock");
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread before bailing out.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("InterruptedException while data handlingstage was waiting for updatingTopo to become false", e);
                    }
                }
                if (effectiveTopology == null) {
                    LOG.fine("Received a data message before effective topology was setup");
                } else {
                    LOG.finest(getQualifiedName() + "Non-null effectiveTopo.addAsData(msg)");
                    effectiveTopology.addAsData(groupCommunicationMessage);
                }
                LOG.finest(getQualifiedName() + "Released topoLock");
            }
            LOG.exiting("OperatorTopologyImpl.DataHandlingStageHandler", "onNext",
                    Arrays.toString(new Object[]{getQualifiedName(), groupCommunicationMessage}));
        }
    }

    /**
     * Stores the identity of this operator instance. The base topology is not created here; call
     * {@link #initialize()} for that.
     *
     * @param cls    name class of the communication group, stored as {@code groupName}
     * @param cls2   name class of the operator, stored as {@code operName}
     * @param str    id of this task, stored as {@code selfId}
     * @param str2   id of the driver, stored as {@code driverId}
     * @param sender sender used for acknowledgements and handed to the topology structs
     * @param i      version of this topology; messages with a lower version are discarded
     */
    @Inject
    public OperatorTopologyImpl(Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2, String str, String str2, Sender sender, int i) {
        this.groupName = cls;
        this.operName = cls2;
        this.selfId = str;
        this.driverId = str2;
        this.sender = sender;
        this.version = i;
    }

    /**
     * Dispatches an incoming message by type.
     *
     * <ul>
     *   <li>{@code UpdateTopology}: marks an update as in progress, hands the message to the update
     *       stage, waits until that stage holds the topology lock, then acks the driver.</li>
     *   <li>{@code TopologySetup}, {@code ParentAdd}, {@code ChildAdd}: queued on {@code deltas}.</li>
     *   <li>{@code ParentDead}, {@code ChildDead}: queued on both {@code deltas} and
     *       {@code deletionDeltas}, and also added as data to the effective topology if one exists.</li>
     *   <li>anything else: forwarded to the data-handling stage.</li>
     * </ul>
     *
     * Messages whose version is lower than this topology's version are dropped.
     *
     * @param groupCommunicationMessage the received message
     * @throws RuntimeException if the message is unversioned, or if the thread is interrupted while
     *         queueing a control message (the interrupt flag is restored first)
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public void handle(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("OperatorTopologyImpl", "handle", new Object[]{getQualifiedName(), groupCommunicationMessage});
        if (isMsgVersionOk(groupCommunicationMessage)) {
            try {
                switch (groupCommunicationMessage.getType()) {
                    case UpdateTopology:
                        this.updatingTopo.set(true);
                        this.baseTopologyUpdateStage.onNext(groupCommunicationMessage);
                        this.topologyLockAquired.awaitAndReset(1);
                        LOG.finest(getQualifiedName() + "topoLockAquired CDL released. Resetting it to new CDL");
                        sendAckToDriver(groupCommunicationMessage);
                        break;
                    case TopologySetup:
                    case ParentAdd:
                    case ChildAdd:
                        LOG.finest(getQualifiedName() + "Adding to deltas queue");
                        this.deltas.put(groupCommunicationMessage);
                        break;
                    case ParentDead:
                    case ChildDead:
                        LOG.finest(getQualifiedName() + "Adding to deltas queue");
                        this.deltas.put(groupCommunicationMessage);
                        LOG.finest(getQualifiedName() + "Adding to deletionDeltas queue");
                        this.deletionDeltas.put(groupCommunicationMessage);
                        // NOTE(review): effectiveTopology is read here without holding topologyLock -
                        // confirm this is intentional.
                        if (this.effectiveTopology == null) {
                            LOG.fine(getQualifiedName() + "Received a death message before effective topology was setup. CAUTION");
                        } else {
                            LOG.finest(getQualifiedName() + "Adding as data msg to non-null effective topology struct");
                            this.effectiveTopology.addAsData(groupCommunicationMessage);
                        }
                        break;
                    default:
                        this.dataHandlingStage.onNext(groupCommunicationMessage);
                        break;
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread before bailing out.
                Thread.currentThread().interrupt();
                throw new RuntimeException("InterruptedException while trying to put ctrl msg into delta queue", e);
            }
        }
        LOG.exiting("OperatorTopologyImpl", "handle", Arrays.toString(new Object[]{getQualifiedName(), groupCommunicationMessage}));
    }

    /**
     * Checks that a message is not older than this topology.
     *
     * @param msg the message to check
     * @return {@code false} (after logging a warning) if the message version is lower than
     *         {@code version}; {@code true} otherwise
     * @throws RuntimeException if the message carries no version
     */
    private boolean isMsgVersionOk(GroupCommunicationMessage msg) {
        LOG.entering("OperatorTopologyImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg});
        if (!msg.hasVersion()) {
            throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
        }
        final int msgVersion = msg.getVersion();
        final boolean versionOk = msgVersion >= this.version;
        if (!versionOk) {
            LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " msg while expecting ver-" + this.version + ". Discarding msg");
        }
        LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", Arrays.toString(new Object[]{Boolean.valueOf(versionOk), getQualifiedName(), msg}));
        return versionOk;
    }

    /**
     * Creates the base topology and runs the first topology update. Blocks until a TopologySetup
     * message has been taken from the {@code deltas} queue.
     *
     * @throws ParentDeadException if a ParentDead message is seen before the effective topology exists
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public void initialize() throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "initialize", getQualifiedName());
        createBaseTopology();
        LOG.exiting("OperatorTopologyImpl", "initialize", getQualifiedName());
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's {@code sendToParent}.
     *
     * @param bArr payload to send
     * @param type message type to send it as
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public void sendToParent(byte[] bArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[]{getQualifiedName(), bArr, type});
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        this.effectiveTopology.sendToParent(bArr, type);
        LOG.exiting("OperatorTopologyImpl", "sendToParent", Arrays.toString(new Object[]{getQualifiedName(), bArr, type}));
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's {@code sendToChildren}.
     *
     * @param bArr payload to send
     * @param type message type to send it as
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public void sendToChildren(byte[] bArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), bArr, type});
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        this.effectiveTopology.sendToChildren(bArr, type);
        LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), bArr, type}));
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's map-based
     * {@code sendToChildren}.
     *
     * @param map  payloads keyed by string (presumably child task ids - verify against the struct)
     * @param type message type to send them as
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public void sendToChildren(Map<String, byte[]> map, ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), map, type});
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        this.effectiveTopology.sendToChildren(map, type);
        LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), map, type}));
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's {@code recvFromParent}.
     *
     * @param type message type to receive
     * @return whatever the effective topology returns for this receive
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public byte[] recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "recvFromParent", getQualifiedName());
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        final byte[] received = this.effectiveTopology.recvFromParent(type);
        LOG.exiting("OperatorTopologyImpl", "recvFromParent", Arrays.toString(new Object[]{getQualifiedName(), received}));
        return received;
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's reducing
     * {@code recvFromChildren}.
     *
     * @param reduceFunction reduce function passed through to the effective topology
     * @param codec          codec passed through to the effective topology
     * @param <T>            type of the reduced value
     * @return whatever the effective topology returns for this receive
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public <T> T recvFromChildren(Reduce.ReduceFunction<T> reduceFunction, Codec<T> codec) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        final T reduced = (T) this.effectiveTopology.recvFromChildren(reduceFunction, codec);
        LOG.exiting("OperatorTopologyImpl", "recvFromChildren", Arrays.toString(new Object[]{getQualifiedName(), reduced}));
        return reduced;
    }

    /**
     * Applies pending deletions, then delegates to the effective topology's no-argument
     * {@code recvFromChildren}.
     *
     * @return whatever the effective topology returns for this receive
     * @throws ParentDeadException if a pending ParentDead message is found while refreshing
     */
    @Override // org.apache.reef.io.network.group.api.task.OperatorTopology
    public byte[] recvFromChildren() throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
        refreshEffectiveTopology();
        assert this.effectiveTopology != null;
        final byte[] received = this.effectiveTopology.recvFromChildren();
        LOG.exiting("OperatorTopologyImpl", "recvFromChildren", Arrays.toString(new Object[]{getQualifiedName(), received}));
        return received;
    }

    /**
     * Drains {@code deletionDeltas} and applies the drained messages to the effective topology,
     * all while holding {@code topologyLock}.
     *
     * @throws ParentDeadException if one of the drained messages is a ParentDead message
     */
    private void refreshEffectiveTopology() throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName());
        LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
        synchronized (this.topologyLock) {
            LOG.finest(getQualifiedName() + "Acquired topoLock");
            assert this.effectiveTopology != null;
            final Set<GroupCommunicationMessage> pendingDeletions = new HashSet<>();
            copyDeletionDeltas(pendingDeletions);
            LOG.finest(getQualifiedName() + "Updating effective topology struct with deletion msgs");
            this.effectiveTopology.update(pendingDeletions);
            LOG.finest(getQualifiedName() + "Released topoLock");
        }
        LOG.exiting("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName());
    }

    /**
     * Instantiates the base topology struct and runs the first update on it.
     *
     * @throws ParentDeadException propagated from {@link #updateBaseTopology()}
     */
    private void createBaseTopology() throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "createBaseTopology", getQualifiedName());
        this.baseTopology = new OperatorTopologyStructImpl(this.groupName, this.operName, this.selfId, this.driverId, this.sender, this.version);
        updateBaseTopology();
        LOG.exiting("OperatorTopologyImpl", "createBaseTopology", getQualifiedName());
    }

    /**
     * Applies queued control messages to the base topology until a TopologySetup message is taken
     * from {@code deltas}, acknowledging each applied message, and then refreshes the effective
     * topology from the base topology. Blocks on the queue while holding {@code topologyLock}.
     *
     * @throws ParentDeadException if a ParentDead message arrives while no effective topology exists
     * @throws RuntimeException    if interrupted while waiting on the queue (the interrupt flag is
     *                             restored first)
     */
    private void updateBaseTopology() throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName());
        LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
        synchronized (this.topologyLock) {
            LOG.finest(getQualifiedName() + "Acquired topoLock");
            try {
                assert this.baseTopology != null;
                LOG.finest(getQualifiedName() + "Updating base topology. So setting dirty bit");
                this.baseTopology.setChanges(true);
                LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
                GroupCommunicationMessage delta = this.deltas.take();
                // A TopologySetup message ends the current batch of deltas.
                while (delta.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup) {
                    LOG.finest(getQualifiedName() + "Got " + delta.getType() + " msg from " + delta.getSrcid());
                    if (this.effectiveTopology == null
                            && delta.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
                        LOG.finer(getQualifiedName() + "Throwing ParentDeadException");
                        throw new ParentDeadException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too.");
                    }
                    LOG.finest(getQualifiedName() + "Updating basetopology struct");
                    this.baseTopology.update(delta);
                    sendAckToDriver(delta);
                    LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
                    delta = this.deltas.take();
                }
                updateEffTopologyFromBaseTopology();
                LOG.finest(getQualifiedName() + "Released topoLock");
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread before bailing out.
                Thread.currentThread().interrupt();
                throw new RuntimeException("InterruptedException while waiting for delta msg from driver", e);
            }
        }
        LOG.exiting("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName());
    }

    /**
     * Sends the acknowledgement that corresponds to a control message.
     *
     * <p>UpdateTopology is answered with a TopologySetup message addressed to the driver. ParentAdd,
     * ChildAdd, ParentDead and ChildDead are answered with ParentAdded, ChildAdded, ParentRemoved
     * and ChildRemoved respectively (see {@link #sendDeltaAck}).
     *
     * @param msg the control message to acknowledge
     * @throws RuntimeException if the message is unversioned, is not one of the types above, or if
     *         sending fails with a {@link NetworkException}
     */
    private void sendAckToDriver(GroupCommunicationMessage msg) {
        LOG.entering("OperatorTopologyImpl", "sendAckToDriver", new Object[]{getQualifiedName(), msg});
        try {
            final String srcId = msg.getSrcid();
            if (!msg.hasVersion()) {
                throw new RuntimeException(getQualifiedName() + "Ack Sender can only deal with versioned msgs");
            }
            final int srcVersion = msg.getSrcVersion();
            switch (msg.getType()) {
                case UpdateTopology:
                    this.sender.send(Utils.bldVersionedGCM(this.groupName, this.operName,
                            ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup,
                            this.selfId, this.version, this.driverId, srcVersion,
                            new byte[][]{Utils.EMPTY_BYTE_ARR}));
                    break;
                case ParentAdd:
                    sendDeltaAck(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, srcId, srcVersion);
                    break;
                case ChildAdd:
                    sendDeltaAck(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, srcId, srcVersion);
                    break;
                case ParentDead:
                    sendDeltaAck(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, srcId, srcVersion);
                    break;
                case ChildDead:
                    sendDeltaAck(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, srcId, srcVersion);
                    break;
                default:
                    // Includes TopologySetup, which is never acknowledged.
                    throw new RuntimeException("Received a non control message for acknowledgement");
            }
            LOG.exiting("OperatorTopologyImpl", "sendAckToDriver", Arrays.toString(new Object[]{getQualifiedName(), msg}));
        } catch (NetworkException e) {
            throw new RuntimeException("NetworkException while sending ack to driver for delta msg " + msg.getType(), e);
        }
    }

    /**
     * Builds an empty-payload acknowledgement addressed to {@code srcId} and sends it with
     * {@code driverId} as the second argument to {@code Sender.send}.
     *
     * @param ackType    type of the acknowledgement message
     * @param srcId      source id of the message being acknowledged
     * @param srcVersion source version of the message being acknowledged
     * @throws NetworkException if building or sending the message fails with one
     */
    private void sendDeltaAck(ReefNetworkGroupCommProtos.GroupCommMessage.Type ackType, String srcId, int srcVersion)
            throws NetworkException {
        this.sender.send(Utils.bldVersionedGCM(this.groupName, this.operName, ackType, this.selfId, this.version,
                srcId, srcVersion, new byte[][]{Utils.EMPTY_BYTE_ARR}), this.driverId);
    }

    /**
     * Replaces the effective topology with a copy of the base topology if the base topology reports
     * changes, and clears the base topology's change flag. Does nothing otherwise.
     */
    private void updateEffTopologyFromBaseTopology() {
        LOG.entering("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName());
        assert this.baseTopology != null;
        LOG.finest(getQualifiedName() + "Updaing effective topology");
        if (this.baseTopology.hasChanges()) {
            this.effectiveTopology = new OperatorTopologyStructImpl(this.baseTopology);
            this.baseTopology.setChanges(false);
        }
        LOG.exiting("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName());
    }

    /**
     * Drains all queued deletion messages into {@code set}.
     *
     * @param set destination for the drained messages; also scanned for ParentDead messages
     * @throws ParentDeadException if {@code set} contains a ParentDead message after draining
     */
    private void copyDeletionDeltas(Set<GroupCommunicationMessage> set) throws ParentDeadException {
        LOG.entering("OperatorTopologyImpl", "copyDeletionDeltas", new Object[]{getQualifiedName(), set});
        this.deletionDeltas.drainTo(set);
        for (final GroupCommunicationMessage deletion : set) {
            if (deletion.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
                throw new ParentDeadException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too.");
            }
        }
        LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", Arrays.toString(new Object[]{getQualifiedName(), set}));
    }

    /**
     * @return the log prefix {@code <group>:<operator>:<selfId>:ver(<version>) - }
     */
    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + ":" + Utils.simpleName(this.operName) + ":" + this.selfId + ":ver(" + this.version + ") - ";
    }
}
