package org.apache.reef.io.network.impl;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.Connection;
import org.apache.reef.io.network.naming.exception.NamingException;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/reef/io/network/impl/NSConnection.class */
public class NSConnection<T> implements Connection<T> {
    private static final Logger LOG = Logger.getLogger(NSConnection.class.getName());
    private final Identifier srcId;
    private final Identifier destId;
    private final LinkListener<NSMessage<T>> listener;
    private final NetworkService<T> service;
    private final Codec<NSMessage<T>> codec;
    private Link<NSMessage<T>> link;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NSConnection(Identifier identifier, Identifier identifier2, LinkListener<T> linkListener, NetworkService<T> networkService) {
        this.srcId = identifier;
        this.destId = identifier2;
        this.listener = new NSMessageLinkListener(linkListener);
        this.service = networkService;
        this.codec = new NSMessageCodec(networkService.getCodec(), networkService.getIdentifierFactory());
    }

    @Override // org.apache.reef.io.network.Connection
    public void open() throws NetworkException {
        LOG.log(Level.FINE, "looking up {0}", this.destId);
        try {
            InetSocketAddress lookup = this.service.getNameClient().lookup(this.destId);
            if (lookup == null) {
                NetworkException namingException = new NamingException("Cannot resolve " + this.destId);
                LOG.log(Level.WARNING, "Cannot resolve " + this.destId, (Throwable) namingException);
                throw namingException;
            }
            LOG.log(Level.FINE, "Resolved {0} to {1}", new Object[]{this.destId, lookup});
            this.link = this.service.getTransport().open(lookup, this.codec, this.listener);
            LOG.log(Level.FINE, "Transport returned a link {0}", this.link);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Could not open " + this.destId, (Throwable) e);
            throw new NetworkException(e);
        }
    }

    @Override // org.apache.reef.io.network.Connection
    public void write(T t) {
        this.link.write(new NSMessage(this.srcId, this.destId, t));
    }

    @Override // org.apache.reef.io.network.Connection
    public void write(List<T> list) {
        this.link.write(new NSMessage(this.srcId, this.destId, (List) list));
    }

    @Override // org.apache.reef.io.network.Connection, java.lang.AutoCloseable
    public void close() throws NetworkException {
        this.service.remove(this.destId);
    }
}
