package org.apache.hama.bsp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorDescriptor;
import org.apache.hama.Constants;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/hama/bsp/BSPPeer.class */
public class BSPPeer implements Watcher, BSPPeerInterface {
    public static final Log LOG = LogFactory.getLog(BSPPeer.class);
    private Configuration conf;
    private BSPJob jobConf;
    private final String bspRoot;
    private final String zookeeperAddr;
    private InetSocketAddress peerAddress;
    private TaskStatus currentTaskStatus;
    private RPC.Server server = null;
    private ZooKeeper zk = null;
    private volatile Integer mutex = 0;
    private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap();
    private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap();
    private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<>();
    private SortedSet<String> allPeerNames = new TreeSet();

    public BSPPeer(Configuration configuration) throws IOException {
        this.conf = configuration;
        String str = configuration.get(Constants.PEER_HOST, "0.0.0.0");
        int i = configuration.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
        this.bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT);
        this.zookeeperAddr = configuration.get(Constants.ZOOKEEPER_QUORUM) + ValueAggregatorDescriptor.TYPE_SEPARATOR + configuration.getInt(Constants.ZOOKEPER_CLIENT_PORT, Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
        this.peerAddress = new InetSocketAddress(str, i);
        reinitialize();
    }

    public void reinitialize() {
        try {
            LOG.debug("reinitialize(): " + getPeerName());
            this.server = RPC.getServer(this, this.peerAddress.getHostName(), this.peerAddress.getPort(), this.conf);
            this.server.start();
            LOG.info(" BSPPeer address:" + this.peerAddress.getHostName() + " port:" + this.peerAddress.getPort());
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            this.zk = new ZooKeeper(this.zookeeperAddr, 3000, this);
        } catch (IOException e2) {
            e2.printStackTrace();
        }
        Stat stat = null;
        if (this.zk != null) {
            try {
                stat = this.zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
            } catch (Exception e3) {
                LOG.error(stat);
            }
            if (stat == null) {
                try {
                    this.zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (InterruptedException e4) {
                    LOG.error(e4);
                } catch (KeeperException e5) {
                    LOG.error(e5);
                }
            }
        }
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public BSPMessage getCurrentMessage() throws IOException {
        return this.localQueue.poll();
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public void send(String str, BSPMessage bSPMessage) throws IOException {
        LOG.debug("Send bytes (" + bSPMessage.getData().toString() + ") to " + str);
        ConcurrentLinkedQueue<BSPMessage> concurrentLinkedQueue = this.outgoingQueues.get(getAddress(str));
        if (concurrentLinkedQueue == null) {
            concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
        }
        concurrentLinkedQueue.add(bSPMessage);
        this.outgoingQueues.put(getAddress(str), concurrentLinkedQueue);
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public void sync() throws IOException, KeeperException, InterruptedException {
        enterBarrier();
        for (Map.Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry : this.outgoingQueues.entrySet()) {
            BSPPeerInterface bSPPeerInterface = this.peers.get(entry.getKey());
            if (bSPPeerInterface == null) {
                bSPPeerInterface = getBSPPeerConnection(entry.getKey());
            }
            Iterator<BSPMessage> it = entry.getValue().iterator();
            while (it.hasNext()) {
                bSPPeerInterface.put(it.next());
            }
        }
        waitForSync();
        Thread.sleep(100L);
        clearOutgoingQueues();
        this.currentTaskStatus.incrementSuperstepCount();
        leaveBarrier();
    }

    protected boolean enterBarrier() throws KeeperException, InterruptedException {
        LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
        try {
            this.zk.create(this.bspRoot + "/" + getPeerName(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e2) {
            e2.printStackTrace();
        }
        while (true) {
            synchronized (this.mutex) {
                if (this.zk.getChildren(this.bspRoot, true).size() >= this.jobConf.getNumBspTask()) {
                    return true;
                }
                this.mutex.wait();
            }
        }
    }

    protected boolean waitForSync() throws KeeperException, InterruptedException {
        try {
            this.zk.create(this.bspRoot + "/" + getPeerName() + "-data", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e2) {
            e2.printStackTrace();
        }
        while (true) {
            synchronized (this.mutex) {
                if (this.zk.getChildren(this.bspRoot, true).size() >= this.jobConf.getNumBspTask() * 2) {
                    return true;
                }
                this.mutex.wait();
            }
        }
    }

    protected boolean leaveBarrier() throws KeeperException, InterruptedException {
        this.zk.delete(this.bspRoot + "/" + getPeerName(), 0);
        this.zk.delete(this.bspRoot + "/" + getPeerName() + "-data", 0);
        while (true) {
            synchronized (this.mutex) {
                if (this.zk.getChildren(this.bspRoot, true).size() <= 0) {
                    LOG.debug("[" + getPeerName() + "] leave from the leaveBarrier");
                    return true;
                }
                this.mutex.wait();
            }
        }
    }

    @Override // org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        synchronized (this.mutex) {
            this.mutex.notify();
        }
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public void clear() {
        this.localQueue.clear();
        this.outgoingQueues.clear();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.server.stop();
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public void put(BSPMessage bSPMessage) throws IOException {
        this.localQueue.add(bSPMessage);
    }

    @Override // org.apache.hadoop.ipc.VersionedProtocol
    public long getProtocolVersion(String str, long j) throws IOException {
        return 0L;
    }

    protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress inetSocketAddress) {
        BSPPeerInterface bSPPeerInterface;
        synchronized (this.peers) {
            bSPPeerInterface = this.peers.get(inetSocketAddress);
            if (bSPPeerInterface == null) {
                try {
                    bSPPeerInterface = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class, 0L, inetSocketAddress, this.conf);
                } catch (IOException e) {
                }
                this.peers.put(inetSocketAddress, bSPPeerInterface);
            }
        }
        return bSPPeerInterface;
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public String getPeerName() {
        return this.peerAddress.getHostName() + ValueAggregatorDescriptor.TYPE_SEPARATOR + this.peerAddress.getPort();
    }

    private InetSocketAddress getAddress(String str) {
        String[] split = str.split(ValueAggregatorDescriptor.TYPE_SEPARATOR);
        return new InetSocketAddress(split[0], Integer.parseInt(split[1]));
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public String[] getAllPeerNames() {
        return (String[]) this.allPeerNames.toArray(new String[0]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setAllPeerNames(Collection<String> collection) {
        this.allPeerNames = new TreeSet(collection);
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public int getNumCurrentMessages() {
        return this.localQueue.size();
    }

    public void setCurrentTaskStatus(TaskStatus taskStatus) {
        this.currentTaskStatus = taskStatus;
    }

    @Override // org.apache.hama.bsp.BSPPeerInterface
    public long getSuperstepCount() {
        return this.currentTaskStatus.getSuperstepCount();
    }

    public void setJobConf(BSPJob bSPJob) {
        this.jobConf = bSPJob;
    }

    public int getLocalQueueSize() {
        return this.localQueue.size();
    }

    public int getOutgoingQueueSize() {
        return this.outgoingQueues.size();
    }

    public void clearLocalQueue() {
        this.localQueue.clear();
    }

    public void clearOutgoingQueues() {
        this.outgoingQueues.clear();
    }
}
