package org.apache.reef.io.network.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.Tuple;
import org.apache.reef.io.network.ConnectionFactory;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.NetworkConnectionService;
import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;

/* loaded from: input_file:org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.class */
public final class NetworkConnectionServiceImpl implements NetworkConnectionService {
    private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName());
    private final IdentifierFactory idFactory;
    private final NameResolver nameResolver;
    private final Transport transport;
    private Identifier myId;
    private final Codec<NetworkConnectionServiceMessage> nsCodec;
    private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
    private final EStage<Identifier> nameServiceUnregisteringStage;
    private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap = new ConcurrentHashMap();
    private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener = new NetworkConnectionServiceLinkListener(this.connFactoryMap);

    @Inject
    private NetworkConnectionServiceImpl(@Parameter(NetworkConnectionServiceIdFactory.class) IdentifierFactory identifierFactory, @Parameter(NetworkConnectionServicePort.class) int i, TransportFactory transportFactory, final NameResolver nameResolver) {
        this.idFactory = identifierFactory;
        this.nsCodec = new NetworkConnectionServiceMessageCodec(identifierFactory, this.connFactoryMap);
        NetworkConnectionServiceReceiveHandler networkConnectionServiceReceiveHandler = new NetworkConnectionServiceReceiveHandler(this.connFactoryMap, this.nsCodec);
        this.nameResolver = nameResolver;
        this.transport = transportFactory.newInstance(i, networkConnectionServiceReceiveHandler, networkConnectionServiceReceiveHandler, new NetworkConnectionServiceExceptionHandler());
        this.nameServiceRegisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { // from class: org.apache.reef.io.network.impl.NetworkConnectionServiceImpl.1
            public void onNext(Tuple<Identifier, InetSocketAddress> tuple) {
                try {
                    nameResolver.register((Identifier) tuple.getKey(), (InetSocketAddress) tuple.getValue());
                    NetworkConnectionServiceImpl.LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
                } catch (Exception e) {
                    String str = "Unable to register " + tuple.getKey() + " with name service";
                    NetworkConnectionServiceImpl.LOG.log(Level.WARNING, str, (Throwable) e);
                    throw new RuntimeException(str, e);
                }
            }
        }, 5);
        this.nameServiceUnregisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Identifier>() { // from class: org.apache.reef.io.network.impl.NetworkConnectionServiceImpl.2
            public void onNext(Identifier identifier) {
                try {
                    nameResolver.unregister(identifier);
                    NetworkConnectionServiceImpl.LOG.log(Level.FINEST, "Unregistered {0} with nameservice", identifier);
                } catch (Exception e) {
                    String str = "Unable to unregister " + identifier + " with name service";
                    NetworkConnectionServiceImpl.LOG.log(Level.WARNING, str, (Throwable) e);
                    throw new RuntimeException(str, e);
                }
            }
        }, 5);
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public <T> void registerConnectionFactory(Identifier identifier, Codec<T> codec, EventHandler<Message<T>> eventHandler, LinkListener<Message<T>> linkListener) throws NetworkException {
        String identifier2 = identifier.toString();
        if (this.connFactoryMap.get(identifier2) != null) {
            throw new NetworkException("ConnectionFactory " + identifier + " was already registered.");
        }
        if (this.connFactoryMap.putIfAbsent(identifier2, new NetworkConnectionFactory(this, identifier2, codec, eventHandler, linkListener)) != null) {
            throw new NetworkException("ConnectionFactory " + identifier + " was already registered.");
        }
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public void unregisterConnectionFactory(Identifier identifier) {
        String identifier2 = identifier.toString();
        if (this.connFactoryMap.get(identifier2) == null) {
            LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", identifier2);
        } else if (this.connFactoryMap.remove(identifier2) == null) {
            LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", identifier2);
        }
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public void registerId(Identifier identifier) {
        LOG.log(Level.INFO, "Registering NetworkConnectionService " + identifier);
        this.myId = identifier;
        Tuple tuple = new Tuple(identifier, (InetSocketAddress) this.transport.getLocalAddress());
        LOG.log(Level.FINEST, "Binding {0} to NetworkConnectionService@({1})", new Object[]{tuple.getKey(), tuple.getValue()});
        this.nameServiceRegisteringStage.onNext(tuple);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Link<NetworkConnectionServiceMessage<T>> openLink(Identifier identifier) throws NetworkException {
        try {
            InetSocketAddress lookup = this.nameResolver.lookup(identifier);
            if (lookup == null) {
                throw new NetworkException("Lookup " + identifier + " is null");
            }
            return this.transport.open(lookup, this.nsCodec, this.nsLinkListener);
        } catch (Exception e) {
            e.printStackTrace();
            throw new NetworkException(e);
        }
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public <T> ConnectionFactory<T> getConnectionFactory(Identifier identifier) {
        NetworkConnectionFactory networkConnectionFactory = this.connFactoryMap.get(identifier.toString());
        if (networkConnectionFactory == null) {
            throw new RuntimeException("Cannot find ConnectionFactory of " + identifier + ".");
        }
        return networkConnectionFactory;
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public void unregisterId(Identifier identifier) {
        LOG.log(Level.FINEST, "Unbinding {0} to NetworkConnectionService@({1})", new Object[]{identifier, this.transport.getLocalAddress()});
        this.myId = null;
        this.nameServiceUnregisteringStage.onNext(identifier);
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public Identifier getNetworkConnectionServiceId() {
        return this.myId;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.log(Level.FINE, "Shutting down");
        this.nameServiceRegisteringStage.close();
        this.nameServiceUnregisteringStage.close();
        this.transport.close();
    }
}
