package org.apache.nemo.runtime.executor.bytetransfer;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.Iterator;
import javax.inject.Inject;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.NettyChannelImplementationSelector;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/ByteTransport.class */
/**
 * Bootstraps the Netty server and client used for byte transfer between executors.
 *
 * <p>On construction, a server channel is bound on the local address and the resulting
 * listening address is registered with the {@link NameResolver} under a {@code byte://<executorId>}
 * identifier. {@link #connectTo(String)} looks a remote executor up through the same resolver and
 * opens a client connection to it. {@link #close()} closes the listening channel and the channel
 * group, and shuts down all event loop groups.
 */
public final class ByteTransport implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ByteTransport.class);

    // Thread pool names handed to DefaultThreadFactory for the three event loop groups.
    private static final String SERVER_LISTENING = "byte:server:listening";
    private static final String SERVER_WORKING = "byte:server:working";
    private static final String CLIENT = "byte:client";

    // Closed as a whole in close(); exposed through getChannelGroup().
    // NOTE(review): presumably populated by the channel initializer, which is not visible here.
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private final NameResolver nameResolver;
    private final EventLoopGroup serverListeningGroup;
    private final EventLoopGroup serverWorkingGroup;
    private final EventLoopGroup clientGroup;
    private final Bootstrap clientBootstrap;
    private final Channel serverListeningChannel;

    /**
     * Identifier under which a {@link ByteTransport} listening address is registered in, and
     * looked up from, the {@link NameResolver}. Two identifiers are equal when their executor
     * ids are equal.
     */
    public static final class ByteTransportIdentifier implements Identifier {
        private final String executorId;

        /**
         * @param executorId id of the executor this identifier refers to
         */
        private ByteTransportIdentifier(final String executorId) {
            this.executorId = executorId;
        }

        /**
         * @return the executor id prefixed with {@code byte://}
         */
        @Override
        public String toString() {
            return "byte://" + this.executorId;
        }

        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return this.executorId.equals(((ByteTransportIdentifier) obj).executorId);
        }

        @Override
        public int hashCode() {
            return this.executorId.hashCode();
        }
    }

    /**
     * Creates the event loop groups and bootstraps, binds the server channel, and registers the
     * listening address with the name resolver.
     *
     * <p>When {@code port} is {@code 0}, candidate ports from {@code tcpPortProvider} are tried in
     * iteration order until one binds. Otherwise exactly {@code port} is used. On any failure the
     * event loop groups created here (and the listening channel, if already bound) are released
     * before the exception is thrown.
     *
     * @param nameResolver          resolver used to register this transport and to look up remote ones
     * @param executorId            id of the local executor, used to build the registered identifier
     * @param channelImplSelector   provides the event loop groups and the channel classes
     * @param channelInitializer    handler installed on both client and accepted server channels
     * @param tcpPortProvider       source of candidate ports when {@code port} is {@code 0}
     * @param localAddressProvider  provides the local address to bind to
     * @param port                  port to bind, or {@code 0} to pick one from {@code tcpPortProvider}
     * @param serverBacklog         value for the server's {@code SO_BACKLOG} option
     * @param numListeningThreads   thread count for the server listening group
     * @param numWorkingThreads     thread count for the server working group
     * @param numClientThreads      thread count for the client group
     * @throws IllegalArgumentException if {@code port} is negative
     * @throws RuntimeException         if binding or name registration fails
     */
    @Inject
    private ByteTransport(
            final NameResolver nameResolver,
            @Parameter(JobConf.ExecutorId.class) final String executorId,
            final NettyChannelImplementationSelector channelImplSelector,
            final ByteTransportChannelInitializer channelInitializer,
            final TcpPortProvider tcpPortProvider,
            final LocalAddressProvider localAddressProvider,
            @Parameter(JobConf.PartitionTransportServerPort.class) final int port,
            @Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBacklog,
            @Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int numListeningThreads,
            @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int numWorkingThreads,
            @Parameter(JobConf.PartitionTransportClientNumThreads.class) final int numClientThreads) {
        this.nameResolver = nameResolver;
        // Validate before any event loop group is created so nothing needs cleaning up here.
        if (port < 0) {
            throw new IllegalArgumentException(String.format("Invalid ByteTransportPort: %d", port));
        }
        final String host = localAddressProvider.getLocalAddress();

        this.serverListeningGroup = channelImplSelector
                .newEventLoopGroup(numListeningThreads, new DefaultThreadFactory(SERVER_LISTENING));
        this.serverWorkingGroup = channelImplSelector
                .newEventLoopGroup(numWorkingThreads, new DefaultThreadFactory(SERVER_WORKING));
        this.clientGroup = channelImplSelector
                .newEventLoopGroup(numClientThreads, new DefaultThreadFactory(CLIENT));

        this.clientBootstrap = new Bootstrap()
                .group(this.clientGroup)
                .channel(channelImplSelector.getChannelClass())
                .handler(channelInitializer)
                .option(ChannelOption.SO_REUSEADDR, true);

        final ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(this.serverListeningGroup, this.serverWorkingGroup)
                .channel(channelImplSelector.getServerChannelClass())
                .childHandler(channelInitializer)
                .option(ChannelOption.SO_BACKLOG, serverBacklog)
                .option(ChannelOption.SO_REUSEADDR, true);

        Channel listeningChannel = null;
        if (port == 0) {
            // No fixed port requested: try the provider's candidates until one binds.
            for (final int candidatePort : tcpPortProvider) {
                try {
                    final ChannelFuture bindFuture = serverBootstrap.bind(host, candidatePort).await();
                    if (bindFuture.cause() != null) {
                        LOG.debug(String.format("Cannot bind to %s:%d", host, candidatePort), bindFuture.cause());
                    } else if (!bindFuture.isSuccess()) {
                        LOG.debug("Cannot bind to {}:{}", host, candidatePort);
                    } else {
                        listeningChannel = bindFuture.channel();
                        break;
                    }
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.debug(String.format("Interrupted while binding to %s:%d", host, candidatePort), e);
                    // With the interrupt flag set every further await() would throw immediately,
                    // so continuing would only start pointless binds on the remaining ports.
                    break;
                }
            }
            if (listeningChannel == null) {
                shutdownEventLoopGroups();
                LOG.error("Cannot bind to {} with tcpPortProvider", host);
                throw new RuntimeException(String.format("Cannot bind to %s with tcpPortProvider", host));
            }
        } else {
            try {
                final ChannelFuture bindFuture = serverBootstrap.bind(host, port).await();
                if (bindFuture.cause() != null) {
                    throw bindFuture.cause();
                }
                if (!bindFuture.isSuccess()) {
                    throw new RuntimeException("Cannot bind");
                }
                listeningChannel = bindFuture.channel();
            } catch (final Throwable cause) {
                if (cause instanceof InterruptedException) {
                    // Keep the interruption visible to the caller even though it is wrapped below.
                    Thread.currentThread().interrupt();
                }
                shutdownEventLoopGroups();
                LOG.error(String.format("Cannot bind to %s:%d", host, port), cause);
                throw new RuntimeException(String.format("Cannot bind to %s:%d", host, port), cause);
            }
        }
        this.serverListeningChannel = listeningChannel;

        try {
            nameResolver.register(
                    new ByteTransportIdentifier(executorId), (InetSocketAddress) listeningChannel.localAddress());
            LOG.info("ByteTransport server in {} is listening at {}", executorId, listeningChannel.localAddress());
        } catch (final Exception e) {
            LOG.error("Cannot register ByteTransport listening address to the naming registry", e);
            // The caller never receives this instance and so cannot close() it:
            // release the bound channel and the event loops here.
            listeningChannel.close();
            shutdownEventLoopGroups();
            throw new RuntimeException(e);
        }
    }

    /**
     * Initiates graceful shutdown of all three event loop groups without waiting for completion.
     * Used on constructor failure paths.
     */
    private void shutdownEventLoopGroups() {
        this.serverListeningGroup.shutdownGracefully();
        this.serverWorkingGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Closes the listening channel and the channel group, shuts down all event loop groups,
     * and blocks (uninterruptibly) until each of those operations has completed.
     */
    @Override
    public void close() {
        LOG.info("Stopping listening at {} and closing", this.serverListeningChannel.localAddress());

        // Start every close/shutdown first so they proceed concurrently, then wait for all of them.
        final ChannelFuture listeningChannelClosed = this.serverListeningChannel.close();
        final ChannelGroupFuture channelGroupClosed = this.channelGroup.close();
        final Future<?> listeningGroupShutdown = this.serverListeningGroup.shutdownGracefully();
        final Future<?> workingGroupShutdown = this.serverWorkingGroup.shutdownGracefully();
        final Future<?> clientGroupShutdown = this.clientGroup.shutdownGracefully();

        listeningChannelClosed.awaitUninterruptibly();
        channelGroupClosed.awaitUninterruptibly();
        listeningGroupShutdown.awaitUninterruptibly();
        workingGroupShutdown.awaitUninterruptibly();
        clientGroupShutdown.awaitUninterruptibly();
    }

    /**
     * Looks up the listening address of a remote executor's transport and starts connecting to it.
     *
     * <p>The returned future is not awaited here; a listener logs the outcome once it completes.
     *
     * @param remoteExecutorId id of the executor to connect to
     * @return the future of the connection attempt
     * @throws RuntimeException if the lookup (or starting the connection) throws
     */
    public ChannelFuture connectTo(final String remoteExecutorId) {
        try {
            final ChannelFuture connectFuture = this.clientBootstrap
                    .connect(this.nameResolver.lookup(new ByteTransportIdentifier(remoteExecutorId)));
            connectFuture.addListener(future -> {
                if (future.isSuccess()) {
                    LOG.debug("Connected to {}", remoteExecutorId);
                } else {
                    LOG.error("Failed to connect to {}", remoteExecutorId);
                }
            });
            return connectFuture;
        } catch (final Exception e) {
            LOG.error(String.format("Cannot lookup ByteTransport listening address of %s", remoteExecutorId), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the channel group that {@link #close()} closes
     */
    public ChannelGroup getChannelGroup() {
        return this.channelGroup;
    }
}
