package org.apache.reef.io.network.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.io.Tuple;
import org.apache.reef.io.naming.Naming;
import org.apache.reef.io.network.Connection;
import org.apache.reef.io.network.ConnectionFactory;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.impl.NetworkServiceParameters;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.Stage;
import org.apache.reef.wake.impl.LoggingEventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;

/* loaded from: input_file:org/apache/reef/io/network/impl/NetworkService.class */
/**
 * Network service that owns a {@link Transport}, registers/unregisters its
 * identifier with a {@link NameResolver}, and hands out {@link Connection}s
 * to remote identifiers.
 *
 * <p>Connections are cached per destination identifier in a concurrent map,
 * so repeated calls to {@link #newConnection(Identifier)} for the same
 * destination return the same connection object until it is
 * {@linkplain #remove(Identifier) removed}.
 *
 * @param <T> the payload type carried by messages and handled by the codec
 */
public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
    private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());

    /**
     * Third constructor argument passed to both name-service stages.
     * NOTE(review): presumably the stage's queue capacity — confirm against SingleThreadStage.
     */
    private static final int NAME_SERVICE_STAGE_ARG = 5;

    private final IdentifierFactory factory;
    private final Codec<T> codec;
    private final Transport transport;
    private final NameResolver nameResolver;

    /** Cache of connections keyed by destination identifier. */
    private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();

    /** Stage whose handler registers (identifier, address) pairs with the name resolver. */
    private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;

    /** Stage whose handler unregisters identifiers from the name resolver. */
    private final EStage<Identifier> nameServiceUnregisteringStage;

    /** Identifier this service is currently bound to, or {@code null} when unbound. */
    private Identifier myId;

    /**
     * Creates the transport and the two name-service stages.
     *
     * @param identifierFactory factory handed to the message handler and exposed via
     *                          {@link #getIdentifierFactory()}
     * @param port              port passed to the transport factory
     * @param nameResolver      resolver used to register and unregister identifiers
     * @param codec             codec handed to the message handler and exposed via {@link #getCodec()}
     * @param transportFactory  factory used to create the transport
     * @param recvHandler       handler that receives decoded messages
     * @param exHandler         handler passed to the transport for exceptions
     * @param localAddressProvider not used by this constructor; kept so the injected signature is unchanged
     */
    @Inject
    @Deprecated
    public NetworkService(@Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory identifierFactory, @Parameter(NetworkServiceParameters.NetworkServicePort.class) int port, final NameResolver nameResolver, @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec, @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory transportFactory, @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler, @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler, LocalAddressProvider localAddressProvider) {
        this.factory = identifierFactory;
        this.codec = codec;
        // The client-side logging handler is left raw: its event type is not imported in this file.
        this.transport = transportFactory.newInstance(port, new LoggingEventHandler(), new MessageHandler<T>(recvHandler, codec, identifierFactory), exHandler);
        this.nameResolver = nameResolver;

        this.nameServiceRegisteringStage = new SingleThreadStage<>("NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
            @Override
            public void onNext(Tuple<Identifier, InetSocketAddress> tuple) {
                try {
                    nameResolver.register(tuple.getKey(), tuple.getValue());
                    LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
                } catch (Exception e) {
                    // Log here as well as rethrowing: the exception surfaces inside the stage,
                    // not in the caller of registerId().
                    final String msg = "Unable to register " + tuple.getKey() + " with name service";
                    LOG.log(Level.WARNING, msg, e);
                    throw new RuntimeException(msg, e);
                }
            }
        }, NAME_SERVICE_STAGE_ARG);

        // Distinct stage name so the two stages can be told apart.
        this.nameServiceUnregisteringStage = new SingleThreadStage<>("NameServiceUnregisterer", new EventHandler<Identifier>() {
            @Override
            public void onNext(Identifier identifier) {
                try {
                    nameResolver.unregister(identifier);
                    LOG.log(Level.FINEST, "Unregistered {0} with nameservice", identifier);
                } catch (Exception e) {
                    final String msg = "Unable to unregister " + identifier + " with name service";
                    LOG.log(Level.WARNING, msg, e);
                    throw new RuntimeException(msg, e);
                }
            }
        }, NAME_SERVICE_STAGE_ARG);
    }

    /**
     * Binds this service to {@code identifier} and submits the
     * (identifier, local transport address) pair to the registering stage.
     *
     * @param identifier the identifier to bind and register
     */
    public void registerId(Identifier identifier) {
        this.myId = identifier;
        Tuple<Identifier, InetSocketAddress> tuple = new Tuple<>(identifier, (InetSocketAddress) this.transport.getLocalAddress());
        LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", new Object[]{tuple.getKey(), tuple.getValue()});
        this.nameServiceRegisteringStage.onNext(tuple);
    }

    /**
     * Clears the bound identifier and submits {@code identifier} to the unregistering stage.
     * The bound identifier is cleared regardless of which identifier is passed.
     *
     * @param identifier the identifier to unregister from the name resolver
     */
    public void unregisterId(Identifier identifier) {
        this.myId = null;
        LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", new Object[]{identifier, this.transport.getLocalAddress()});
        this.nameServiceUnregisteringStage.onNext(identifier);
    }

    /** @return the identifier this service is bound to, or {@code null} if unbound */
    public Identifier getMyId() {
        return this.myId;
    }

    /** @return the transport created in the constructor */
    public Transport getTransport() {
        return this.transport;
    }

    /** @return the codec supplied at construction */
    public Codec<T> getCodec() {
        return this.codec;
    }

    /** @return the name resolver supplied at construction */
    public Naming getNameClient() {
        return this.nameResolver;
    }

    /** @return the identifier factory supplied at construction */
    public IdentifierFactory getIdentifierFactory() {
        return this.factory;
    }

    /**
     * Drops the cached connection for {@code identifier}, if any.
     *
     * @param identifier destination identifier whose cached connection is removed
     */
    public void remove(Identifier identifier) {
        this.idToConnMap.remove(identifier);
    }

    /**
     * Closes the transport and then the name resolver. The resolver is closed
     * even when closing the transport throws.
     *
     * @throws Exception if closing the transport or the name resolver fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.log(Level.FINE, "Shutting down");
        // NOTE(review): the two name-service stages are not closed here; closing them could
        // cut off an in-flight unregistration, so that decision is left to a follow-up.
        try {
            this.transport.close();
        } finally {
            this.nameResolver.close();
        }
    }

    /**
     * Returns the cached connection to {@code identifier}, creating and caching one if absent.
     *
     * @param identifier the destination identifier
     * @return the connection associated with {@code identifier}
     * @throws RuntimeException if this service is not currently bound to an identifier
     */
    @Override // org.apache.reef.io.network.ConnectionFactory
    public Connection<T> newConnection(Identifier identifier) {
        // Read the mutable field once so the value that passed the null check is the value used below.
        final Identifier localId = this.myId;
        if (localId == null) {
            throw new RuntimeException("Trying to establish a connection from a Network Service that is not bound to any task");
        }
        final Connection<T> cached = this.idToConnMap.get(identifier);
        if (cached != null) {
            return cached;
        }
        final Connection<T> created = new NSConnection<T>(localId, identifier, new LoggingLinkListener<T>(), this);
        // Another thread may have inserted a connection in the meantime; the first one inserted wins.
        final Connection<T> raced = this.idToConnMap.putIfAbsent(identifier, created);
        return raced == null ? created : raced;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.reef.io.network.ConnectionFactory
    public Identifier getConnectionFactoryId() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.reef.io.network.ConnectionFactory
    public Identifier getLocalEndPointId() {
        throw new UnsupportedOperationException();
    }
}
