package org.apache.reef.io.network.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.Tuple;
import org.apache.reef.io.network.ConnectionFactory;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.NetworkConnectionService;
import org.apache.reef.io.network.exception.NetworkRuntimeException;
import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;

/* loaded from: input_file:org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.class */
public final class NetworkConnectionServiceImpl implements NetworkConnectionService {
    private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName());
    private final IdentifierFactory idFactory;
    private final NameResolver nameResolver;
    private final Transport transport;
    private final Codec<NetworkConnectionServiceMessage> nsCodec;
    private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
    private final EStage<Identifier> nameServiceUnregisteringStage;
    private final AtomicBoolean isClosed;
    private static final String DELIMITER = "/";
    private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap = new ConcurrentHashMap();
    private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener = new NetworkConnectionServiceLinkListener(this.connFactoryMap);

    @Inject
    private NetworkConnectionServiceImpl(@Parameter(NetworkConnectionServiceIdFactory.class) IdentifierFactory identifierFactory, @Parameter(NetworkConnectionServicePort.class) int i, TransportFactory transportFactory, final NameResolver nameResolver) {
        this.idFactory = identifierFactory;
        this.nsCodec = new NetworkConnectionServiceMessageCodec(identifierFactory, this.connFactoryMap);
        NetworkConnectionServiceReceiveHandler networkConnectionServiceReceiveHandler = new NetworkConnectionServiceReceiveHandler(this.connFactoryMap, this.nsCodec);
        this.nameResolver = nameResolver;
        this.transport = transportFactory.newInstance(i, networkConnectionServiceReceiveHandler, networkConnectionServiceReceiveHandler, new NetworkConnectionServiceExceptionHandler());
        this.nameServiceRegisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { // from class: org.apache.reef.io.network.impl.NetworkConnectionServiceImpl.1
            @Override // org.apache.reef.wake.EventHandler
            public void onNext(Tuple<Identifier, InetSocketAddress> tuple) {
                try {
                    nameResolver.register(tuple.getKey(), tuple.getValue());
                    NetworkConnectionServiceImpl.LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
                } catch (Exception e) {
                    String str = "Unable to register " + tuple.getKey() + " with name service";
                    NetworkConnectionServiceImpl.LOG.log(Level.WARNING, str, (Throwable) e);
                    throw new RuntimeException(str, e);
                }
            }
        }, 5);
        this.nameServiceUnregisteringStage = new SingleThreadStage("NameServiceRegisterer", new EventHandler<Identifier>() { // from class: org.apache.reef.io.network.impl.NetworkConnectionServiceImpl.2
            @Override // org.apache.reef.wake.EventHandler
            public void onNext(Identifier identifier) {
                try {
                    nameResolver.unregister(identifier);
                    NetworkConnectionServiceImpl.LOG.log(Level.FINEST, "Unregistered {0} with nameservice", identifier);
                } catch (Exception e) {
                    String str = "Unable to unregister " + identifier + " with name service";
                    NetworkConnectionServiceImpl.LOG.log(Level.WARNING, str, (Throwable) e);
                    throw new RuntimeException(str, e);
                }
            }
        }, 5);
        this.isClosed = new AtomicBoolean();
    }

    private void checkBeforeRegistration(String str) {
        if (this.isClosed.get()) {
            throw new NetworkRuntimeException("Unable to register new ConnectionFactory to closed NetworkConnectionService");
        }
        if (this.connFactoryMap.get(str) != null) {
            throw new NetworkRuntimeException("ConnectionFactory " + str + " was already registered.");
        }
        if (str.contains("/")) {
            throw new NetworkRuntimeException("The ConnectionFactoryId " + str + " should not contain /");
        }
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public <T> ConnectionFactory<T> registerConnectionFactory(Identifier identifier, Codec<T> codec, EventHandler<Message<T>> eventHandler, LinkListener<Message<T>> linkListener, Identifier identifier2) {
        String identifier3 = identifier.toString();
        checkBeforeRegistration(identifier3);
        NetworkConnectionFactory networkConnectionFactory = new NetworkConnectionFactory(this, identifier, codec, eventHandler, linkListener, identifier2);
        this.nameServiceRegisteringStage.onNext(new Tuple<>(getEndPointIdWithConnectionFactoryId(identifier, identifier2), (InetSocketAddress) this.transport.getLocalAddress()));
        if (this.connFactoryMap.putIfAbsent(identifier3, networkConnectionFactory) != null) {
            throw new NetworkRuntimeException("ConnectionFactory " + identifier + " was already registered.");
        }
        LOG.log(Level.INFO, "ConnectionFactory {0} was registered", identifier3);
        return networkConnectionFactory;
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public void unregisterConnectionFactory(Identifier identifier) {
        String identifier2 = identifier.toString();
        NetworkConnectionFactory remove = this.connFactoryMap.remove(identifier2);
        if (remove == null) {
            LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", identifier2);
            return;
        }
        LOG.log(Level.INFO, "ConnectionFactory {0} was unregistered", identifier2);
        this.nameServiceUnregisteringStage.onNext(getEndPointIdWithConnectionFactoryId(identifier, remove.getLocalEndPointId()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Link<NetworkConnectionServiceMessage<T>> openLink(Identifier identifier, Identifier identifier2) throws NetworkException {
        Identifier endPointIdWithConnectionFactoryId = getEndPointIdWithConnectionFactoryId(identifier, identifier2);
        try {
            InetSocketAddress lookup = this.nameResolver.lookup(endPointIdWithConnectionFactoryId);
            if (lookup == null) {
                throw new NetworkException("Lookup " + endPointIdWithConnectionFactoryId + " is null");
            }
            return this.transport.open(lookup, this.nsCodec, this.nsLinkListener);
        } catch (Exception e) {
            throw new NetworkException(e);
        }
    }

    private Identifier getEndPointIdWithConnectionFactoryId(Identifier identifier, Identifier identifier2) {
        return this.idFactory.getNewInstance(identifier.toString() + "/" + identifier2.toString());
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService
    public <T> ConnectionFactory<T> getConnectionFactory(Identifier identifier) {
        NetworkConnectionFactory networkConnectionFactory = this.connFactoryMap.get(identifier.toString());
        if (networkConnectionFactory == null) {
            throw new RuntimeException("Cannot find ConnectionFactory of " + identifier + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER);
        }
        return networkConnectionFactory;
    }

    @Override // org.apache.reef.io.network.NetworkConnectionService, java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.isClosed.compareAndSet(false, true)) {
            LOG.log(Level.FINE, "Shutting down");
            this.nameServiceRegisteringStage.close();
            this.nameServiceUnregisteringStage.close();
            this.nameResolver.close();
            this.transport.close();
        }
    }
}
