package org.apache.giraph.comm.netty;

import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
import org.apache.giraph.comm.netty.handler.ClientRequestId;
import org.apache.giraph.comm.netty.handler.RequestEncoder;
import org.apache.giraph.comm.netty.handler.RequestInfo;
import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
import org.apache.giraph.comm.netty.handler.SaslClientHandler;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyClient.class */
public class NettyClient {
    public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS = "giraph.waitForRequestsConfirmation";
    public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;
    public static final String MAX_NUMBER_OF_OPEN_REQUESTS = "giraph.maxNumberOfOpenRequests";
    public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
    public static final int MAX_REQUESTS_TO_LIST = 10;
    public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
    public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30000;
    public static final AttributeKey<SaslNettyClient> SASL = AttributeKey.valueOf("saslNettyClient");
    private static final Logger LOG = Logger.getLogger(NettyClient.class);
    private final Mapper<?, ?, ?, ?>.Context context;
    private final Bootstrap bootstrap;
    private final ConcurrentMap<ClientRequestId, RequestInfo> clientRequestIdRequestInfoMap;
    private final int channelsPerServer;
    private final int sendBufferSize;
    private final int receiveBufferSize;
    private final boolean limitNumberOfOpenRequests;
    private final int maxNumberOfOpenRequests;
    private final int maxConnectionFailures;
    private final int maxRequestMilliseconds;
    private final int waitingRequestMsecs;
    private final EventLoopGroup workerGroup;
    private final TaskInfo myTaskInfo;
    private final int maxPoolSize;
    private final int maxResolveAddressAttempts;
    private final boolean useExecutionGroup;
    private final EventExecutorGroup executionGroup;
    private final String handlerToUseExecutionGroup;
    private final ConcurrentMap<InetSocketAddress, ChannelRotater> addressChannelMap = new MapMaker().makeMap();
    private final Map<Integer, InetSocketAddress> taskIdAddressMap = new MapMaker().makeMap();
    private final InboundByteCounter inboundByteCounter = new InboundByteCounter();
    private final OutboundByteCounter outboundByteCounter = new OutboundByteCounter();
    private final TimedLogger requestLogger = new TimedLogger(15000, LOG);
    private final AddressRequestIdGenerator addressRequestIdGenerator = new AddressRequestIdGenerator();
    private final AtomicLong lastTimeCheckedRequestsForProblems = new AtomicLong(0);
    private final LogOnErrorChannelFutureListener logErrorListener = new LogOnErrorChannelFutureListener();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/comm/netty/NettyClient$ChannelFutureAddress.class */
    public static class ChannelFutureAddress {
        private final ChannelFuture future;
        private final InetSocketAddress address;
        private final Integer taskId;

        ChannelFutureAddress(ChannelFuture channelFuture, InetSocketAddress inetSocketAddress, Integer num) {
            this.future = channelFuture;
            this.address = inetSocketAddress;
            this.taskId = num;
        }

        public String toString() {
            return "(future=" + this.future + ",address=" + this.address + ",taskId=" + this.taskId + DefaultExpressionEngine.DEFAULT_INDEX_END;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/comm/netty/NettyClient$LogOnErrorChannelFutureListener.class */
    public static class LogOnErrorChannelFutureListener implements ChannelFutureListener {
        private LogOnErrorChannelFutureListener() {
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isDone() || channelFuture.isSuccess()) {
                return;
            }
            NettyClient.LOG.error("Request failed", channelFuture.cause());
        }
    }

    public NettyClient(Mapper<?, ?, ?, ?>.Context context, final ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, TaskInfo taskInfo, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.context = context;
        this.myTaskInfo = taskInfo;
        this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(immutableClassesGiraphConfiguration);
        this.sendBufferSize = GiraphConstants.CLIENT_SEND_BUFFER_SIZE.get(immutableClassesGiraphConfiguration);
        this.receiveBufferSize = GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE.get(immutableClassesGiraphConfiguration);
        this.limitNumberOfOpenRequests = immutableClassesGiraphConfiguration.getBoolean(LIMIT_NUMBER_OF_OPEN_REQUESTS, false);
        if (this.limitNumberOfOpenRequests) {
            this.maxNumberOfOpenRequests = immutableClassesGiraphConfiguration.getInt(MAX_NUMBER_OF_OPEN_REQUESTS, 10000);
            if (LOG.isInfoEnabled()) {
                LOG.info("NettyClient: Limit number of open requests to " + this.maxNumberOfOpenRequests);
            }
        } else {
            this.maxNumberOfOpenRequests = -1;
        }
        this.maxRequestMilliseconds = GiraphConstants.MAX_REQUEST_MILLISECONDS.get(immutableClassesGiraphConfiguration);
        this.maxConnectionFailures = GiraphConstants.NETTY_MAX_CONNECTION_FAILURES.get(immutableClassesGiraphConfiguration);
        this.waitingRequestMsecs = GiraphConstants.WAITING_REQUEST_MSECS.get(immutableClassesGiraphConfiguration);
        this.maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(immutableClassesGiraphConfiguration);
        this.maxResolveAddressAttempts = GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS.get(immutableClassesGiraphConfiguration);
        this.clientRequestIdRequestInfoMap = new MapMaker().concurrencyLevel2(this.maxPoolSize).makeMap();
        this.handlerToUseExecutionGroup = GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(immutableClassesGiraphConfiguration);
        this.useExecutionGroup = GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.get(immutableClassesGiraphConfiguration);
        if (this.useExecutionGroup) {
            int i = GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS.get(immutableClassesGiraphConfiguration);
            this.executionGroup = new DefaultEventExecutorGroup(i, ThreadUtils.createThreadFactory("netty-client-exec-%d", uncaughtExceptionHandler));
            if (LOG.isInfoEnabled()) {
                LOG.info("NettyClient: Using execution handler with " + i + " threads after " + this.handlerToUseExecutionGroup + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER);
            }
        } else {
            this.executionGroup = null;
        }
        this.workerGroup = new NioEventLoopGroup(this.maxPoolSize, ThreadUtils.createThreadFactory("netty-client-worker-%d", uncaughtExceptionHandler));
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_SNDBUF, Integer.valueOf(this.sendBufferSize)).option(ChannelOption.SO_RCVBUF, Integer.valueOf(this.receiveBufferSize)).option(ChannelOption.ALLOCATOR, immutableClassesGiraphConfiguration.getNettyAllocator()).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.giraph.comm.netty.NettyClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                if (!immutableClassesGiraphConfiguration.authenticate()) {
                    NettyClient.LOG.info("Using Netty without authentication.");
                    PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter", NettyClient.this.inboundByteCounter, NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    if (immutableClassesGiraphConfiguration.doCompression()) {
                        PipelineUtils.addLastWithExecutorCheck("compressionDecoder", immutableClassesGiraphConfiguration.getNettyCompressionDecoder(), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    }
                    PipelineUtils.addLastWithExecutorCheck("clientOutboundByteCounter", NettyClient.this.outboundByteCounter, NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    if (immutableClassesGiraphConfiguration.doCompression()) {
                        PipelineUtils.addLastWithExecutorCheck("compressionEncoder", immutableClassesGiraphConfiguration.getNettyCompressionEncoder(), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    }
                    PipelineUtils.addLastWithExecutorCheck("fixed-length-frame-decoder", new FixedLengthFrameDecoder(13), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    PipelineUtils.addLastWithExecutorCheck("request-encoder", new RequestEncoder(immutableClassesGiraphConfiguration), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    PipelineUtils.addLastWithExecutorCheck("response-handler", new ResponseClientHandler(NettyClient.this.clientRequestIdRequestInfoMap, immutableClassesGiraphConfiguration), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                    return;
                }
                NettyClient.LOG.info("Using Netty with authentication.");
                PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter", NettyClient.this.inboundByteCounter, NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                if (immutableClassesGiraphConfiguration.doCompression()) {
                    PipelineUtils.addLastWithExecutorCheck("compressionDecoder", immutableClassesGiraphConfiguration.getNettyCompressionDecoder(), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                }
                PipelineUtils.addLastWithExecutorCheck("clientOutboundByteCounter", NettyClient.this.outboundByteCounter, NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                if (immutableClassesGiraphConfiguration.doCompression()) {
                    PipelineUtils.addLastWithExecutorCheck("compressionEncoder", immutableClassesGiraphConfiguration.getNettyCompressionEncoder(), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                }
                PipelineUtils.addLastWithExecutorCheck("length-field-based-frame-decoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                PipelineUtils.addLastWithExecutorCheck("request-encoder", new RequestEncoder(immutableClassesGiraphConfiguration), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                PipelineUtils.addLastWithExecutorCheck("sasl-client-handler", new SaslClientHandler(immutableClassesGiraphConfiguration), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
                PipelineUtils.addLastWithExecutorCheck("response-handler", new ResponseClientHandler(NettyClient.this.clientRequestIdRequestInfoMap, immutableClassesGiraphConfiguration), NettyClient.this.handlerToUseExecutionGroup, NettyClient.this.executionGroup, socketChannel);
            }
        });
    }

    public void connectAllAddresses(Collection<? extends TaskInfo> collection) {
        ArrayList<ChannelFutureAddress> newArrayListWithCapacity = Lists.newArrayListWithCapacity(collection.size() * this.channelsPerServer);
        for (TaskInfo taskInfo : collection) {
            this.context.progress();
            InetSocketAddress inetSocketAddress = this.taskIdAddressMap.get(Integer.valueOf(taskInfo.getTaskId()));
            if (inetSocketAddress == null || !inetSocketAddress.getHostName().equals(taskInfo.getHostname()) || inetSocketAddress.getPort() != taskInfo.getPort()) {
                inetSocketAddress = resolveAddress(this.maxResolveAddressAttempts, taskInfo.getInetSocketAddress());
                this.taskIdAddressMap.put(Integer.valueOf(taskInfo.getTaskId()), inetSocketAddress);
            }
            if (inetSocketAddress == null || inetSocketAddress.getHostName() == null || inetSocketAddress.getHostName().isEmpty()) {
                throw new IllegalStateException("connectAllAddresses: Null address in addresses " + collection);
            }
            if (inetSocketAddress.isUnresolved()) {
                throw new IllegalStateException("connectAllAddresses: Unresolved address " + inetSocketAddress);
            }
            if (!this.addressChannelMap.containsKey(inetSocketAddress)) {
                for (int i = 0; i < this.channelsPerServer; i++) {
                    newArrayListWithCapacity.add(new ChannelFutureAddress(this.bootstrap.connect(inetSocketAddress), inetSocketAddress, Integer.valueOf(taskInfo.getTaskId())));
                }
            }
        }
        int i2 = 0;
        int i3 = 0;
        while (i2 < this.maxConnectionFailures) {
            ArrayList newArrayList = Lists.newArrayList();
            for (ChannelFutureAddress channelFutureAddress : newArrayListWithCapacity) {
                this.context.progress();
                ChannelFuture channelFuture = channelFutureAddress.future;
                ProgressableUtils.awaitChannelFuture(channelFuture, this.context);
                if (channelFuture.isSuccess()) {
                    Channel channel = channelFuture.channel();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("connectAllAddresses: Connected to " + channel.remoteAddress() + ", open = " + channel.isOpen());
                    }
                    if (channel.remoteAddress() == null) {
                        throw new IllegalStateException("connectAllAddresses: Null remote address!");
                    }
                    ChannelRotater channelRotater = this.addressChannelMap.get(channelFutureAddress.address);
                    if (channelRotater == null) {
                        ChannelRotater channelRotater2 = new ChannelRotater(channelFutureAddress.taskId);
                        channelRotater = this.addressChannelMap.putIfAbsent(channelFutureAddress.address, channelRotater2);
                        if (channelRotater == null) {
                            channelRotater = channelRotater2;
                        }
                    }
                    channelRotater.addChannel(channelFuture.channel());
                    i3++;
                } else {
                    LOG.warn("connectAllAddresses: Future failed to connect with " + channelFutureAddress.address + " with " + i2 + " failures because of " + channelFuture.cause());
                    newArrayList.add(new ChannelFutureAddress(this.bootstrap.connect(channelFutureAddress.address), channelFutureAddress.address, channelFutureAddress.taskId));
                    i2++;
                }
            }
            LOG.info("connectAllAddresses: Successfully added " + (newArrayListWithCapacity.size() - newArrayList.size()) + " connections, (" + i3 + " total connected) " + newArrayList.size() + " failed, " + i2 + " failures total.");
            if (newArrayList.isEmpty()) {
                break;
            } else {
                newArrayListWithCapacity = newArrayList;
            }
        }
        if (i2 >= this.maxConnectionFailures) {
            throw new IllegalStateException("connectAllAddresses: Too many failures (" + i2 + ").");
        }
    }

    public void authenticate() {
        LOG.info("authenticate: NettyClient starting authentication with servers.");
        for (InetSocketAddress inetSocketAddress : this.addressChannelMap.keySet()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("authenticate: Authenticating with address:" + inetSocketAddress);
            }
            ChannelRotater channelRotater = this.addressChannelMap.get(inetSocketAddress);
            for (Channel channel : channelRotater.getChannels()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("authenticate: Authenticating with server on channel: " + channel);
                }
                authenticateOnChannel(channelRotater.getTaskId(), channel);
            }
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("authenticate: NettyClient successfully authenticated with " + this.addressChannelMap.size() + " server" + (this.addressChannelMap.size() != 1 ? "s" : "") + " - continuing with normal work.");
        }
    }

    private void authenticateOnChannel(Integer num, Channel channel) {
        try {
            SaslNettyClient saslNettyClient = (SaslNettyClient) channel.attr(SASL).get();
            if (channel.attr(SASL).get() == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("authenticateOnChannel: Creating saslNettyClient now for channel: " + channel);
                }
                saslNettyClient = new SaslNettyClient();
                channel.attr(SASL).set(saslNettyClient);
            }
            if (!saslNettyClient.isComplete()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("authenticateOnChannel: Waiting for authentication to complete..");
                }
                sendWritableRequest(num, saslNettyClient.firstToken());
                try {
                    synchronized (saslNettyClient.getAuthenticated()) {
                        while (!saslNettyClient.isComplete()) {
                            saslNettyClient.getAuthenticated().wait();
                        }
                    }
                } catch (InterruptedException e) {
                    LOG.error("authenticateOnChannel: Interrupted while waiting for authentication.");
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("authenticateOnChannel: Authentication on channel: " + channel + " has completed successfully.");
            }
        } catch (IOException e2) {
            LOG.error("authenticateOnChannel: Failed to authenticate with server due to error: " + e2);
        }
    }

    public void stop() {
        if (LOG.isInfoEnabled()) {
            LOG.info("stop: Halting netty client");
        }
        int i = 0;
        Iterator<ChannelRotater> it2 = this.addressChannelMap.values().iterator();
        while (it2.hasNext()) {
            i += it2.next().size();
        }
        final int i2 = i;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Iterator<ChannelRotater> it3 = this.addressChannelMap.values().iterator();
        while (it3.hasNext()) {
            it3.next().closeChannels(new ChannelFutureListener() { // from class: org.apache.giraph.comm.netty.NettyClient.2
                @Override // io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(ChannelFuture channelFuture) {
                    NettyClient.this.context.progress();
                    if (atomicInteger.incrementAndGet() == i2) {
                        if (NettyClient.LOG.isInfoEnabled()) {
                            NettyClient.LOG.info("stop: reached wait threshold, " + i2 + " connections closed, releasing resources now.");
                        }
                        NettyClient.this.workerGroup.shutdownGracefully();
                        if (NettyClient.this.executionGroup != null) {
                            NettyClient.this.executionGroup.shutdownGracefully();
                        }
                    }
                }
            });
        }
        ProgressableUtils.awaitTerminationFuture(this.workerGroup, this.context);
        if (this.executionGroup != null) {
            ProgressableUtils.awaitTerminationFuture(this.executionGroup, this.context);
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("stop: Netty client halted");
        }
    }

    private Channel getNextChannel(InetSocketAddress inetSocketAddress) {
        Channel nextChannel = this.addressChannelMap.get(inetSocketAddress).nextChannel();
        if (nextChannel == null) {
            throw new IllegalStateException("getNextChannel: No channel exists for " + inetSocketAddress);
        }
        if (nextChannel.isActive()) {
            return nextChannel;
        }
        if (this.addressChannelMap.get(inetSocketAddress).removeChannel(nextChannel)) {
            LOG.warn("getNextChannel: Unlikely event that the channel " + nextChannel + " was already removed!");
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("getNextChannel: Fixing disconnected channel to " + inetSocketAddress + ", open = " + nextChannel.isOpen() + ", bound = " + nextChannel.isRegistered());
        }
        int i = 0;
        while (i < this.maxConnectionFailures) {
            ChannelFuture connect = this.bootstrap.connect(inetSocketAddress);
            ProgressableUtils.awaitChannelFuture(connect, this.context);
            if (connect.isSuccess()) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("getNextChannel: Connected to " + inetSocketAddress + "!");
                }
                this.addressChannelMap.get(inetSocketAddress).addChannel(connect.channel());
                return connect.channel();
            }
            i++;
            LOG.warn("getNextChannel: Failed to reconnect to " + inetSocketAddress + " on attempt " + i + " out of " + this.maxConnectionFailures + " max attempts, sleeping for 5 secs", connect.cause());
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                LOG.warn("getNextChannel: Unexpected interrupted exception", e);
            }
        }
        throw new IllegalStateException("getNextChannel: Failed to connect to " + inetSocketAddress + " in " + i + " connect attempts");
    }

    public void sendWritableRequest(Integer num, WritableRequest writableRequest) {
        InetSocketAddress inetSocketAddress = this.taskIdAddressMap.get(num);
        if (this.clientRequestIdRequestInfoMap.isEmpty()) {
            this.inboundByteCounter.resetAll();
            this.outboundByteCounter.resetAll();
        }
        boolean z = true;
        if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
            z = false;
        }
        Channel nextChannel = getNextChannel(inetSocketAddress);
        RequestInfo requestInfo = new RequestInfo(inetSocketAddress, writableRequest);
        if (z) {
            writableRequest.setClientId(this.myTaskInfo.getTaskId());
            writableRequest.setRequestId(this.addressRequestIdGenerator.getNextRequestId(inetSocketAddress).longValue());
            RequestInfo putIfAbsent = this.clientRequestIdRequestInfoMap.putIfAbsent(new ClientRequestId(num.intValue(), writableRequest.getRequestId()), requestInfo);
            if (putIfAbsent != null) {
                throw new IllegalStateException("sendWritableRequest: Impossible to have a previous request id = " + writableRequest.getRequestId() + ", request info of " + putIfAbsent);
            }
        }
        ChannelFuture write = nextChannel.write(writableRequest);
        requestInfo.setWriteFuture(write);
        write.addListener2((GenericFutureListener<? extends Future<? super Void>>) this.logErrorListener);
        if (!this.limitNumberOfOpenRequests || this.clientRequestIdRequestInfoMap.size() <= this.maxNumberOfOpenRequests) {
            return;
        }
        waitSomeRequests(this.maxNumberOfOpenRequests);
    }

    public void waitAllRequests() {
        waitSomeRequests(0);
        if (LOG.isInfoEnabled()) {
            LOG.info("waitAllRequests: Finished all requests. " + this.inboundByteCounter.getMetrics() + "\n" + this.outboundByteCounter.getMetrics());
        }
    }

    private void waitSomeRequests(int i) {
        while (this.clientRequestIdRequestInfoMap.size() > i) {
            logInfoAboutOpenRequests(i);
            synchronized (this.clientRequestIdRequestInfoMap) {
                if (this.clientRequestIdRequestInfoMap.size() <= i) {
                    return;
                }
                try {
                    this.clientRequestIdRequestInfoMap.wait(this.waitingRequestMsecs);
                } catch (InterruptedException e) {
                    LOG.error("waitSomeRequests: Got unexpected InterruptedException", e);
                }
            }
            this.context.progress();
            checkRequestsForProblems();
        }
    }

    private void logInfoAboutOpenRequests(int i) {
        if (LOG.isInfoEnabled() && this.requestLogger.isPrintable()) {
            LOG.info("logInfoAboutOpenRequests: Waiting interval of " + this.waitingRequestMsecs + " msecs, " + this.clientRequestIdRequestInfoMap.size() + " open requests, waiting for it to be <= " + i + ", " + this.inboundByteCounter.getMetrics() + "\n" + this.outboundByteCounter.getMetrics());
            if (this.clientRequestIdRequestInfoMap.size() < 10) {
                for (Map.Entry<ClientRequestId, RequestInfo> entry : this.clientRequestIdRequestInfoMap.entrySet()) {
                    LOG.info("logInfoAboutOpenRequests: Waiting for request " + entry.getKey() + " - " + entry.getValue());
                }
            }
            HashMap newHashMap = Maps.newHashMap();
            Iterator<ClientRequestId> it2 = this.clientRequestIdRequestInfoMap.keySet().iterator();
            while (it2.hasNext()) {
                int destinationTaskId = it2.next().getDestinationTaskId();
                Integer num = (Integer) newHashMap.get(Integer.valueOf(destinationTaskId));
                newHashMap.put(Integer.valueOf(destinationTaskId), Integer.valueOf((num == null ? 0 : num.intValue()) + 1));
            }
            ArrayList newArrayList = Lists.newArrayList(newHashMap.entrySet());
            Collections.sort(newArrayList, new Comparator<Map.Entry<Integer, Integer>>() { // from class: org.apache.giraph.comm.netty.NettyClient.3
                @Override // java.util.Comparator
                public int compare(Map.Entry<Integer, Integer> entry2, Map.Entry<Integer, Integer> entry3) {
                    int intValue = entry2.getValue().intValue();
                    int intValue2 = entry3.getValue().intValue();
                    if (intValue < intValue2) {
                        return 1;
                    }
                    return intValue == intValue2 ? 0 : -1;
                }
            });
            StringBuilder sb = new StringBuilder();
            sb.append("logInfoAboutOpenRequests: ");
            int min = Math.min(10, newArrayList.size());
            for (int i2 = 0; i2 < min; i2++) {
                sb.append(((Map.Entry) newArrayList.get(i2)).getValue()).append(" requests for taskId=").append(((Map.Entry) newArrayList.get(i2)).getKey()).append(", ");
            }
            LOG.info(sb);
        }
    }

    private void checkRequestsForProblems() {
        long j = this.lastTimeCheckedRequestsForProblems.get();
        if (System.currentTimeMillis() >= j + this.waitingRequestMsecs && this.lastTimeCheckedRequestsForProblems.compareAndSet(j, System.currentTimeMillis())) {
            ArrayList newArrayList = Lists.newArrayList();
            ArrayList newArrayList2 = Lists.newArrayList();
            for (Map.Entry<ClientRequestId, RequestInfo> entry : this.clientRequestIdRequestInfoMap.entrySet()) {
                RequestInfo value = entry.getValue();
                ChannelFuture writeFuture = value.getWriteFuture();
                if (writeFuture != null && (!writeFuture.channel().isActive() || ((writeFuture.isDone() && !writeFuture.isSuccess()) || value.getElapsedMsecs() > this.maxRequestMilliseconds))) {
                    LOG.warn("checkRequestsForProblems: Problem with request id " + entry.getKey() + " connected = " + writeFuture.channel().isActive() + ", future done = " + writeFuture.isDone() + ", success = " + writeFuture.isSuccess() + ", cause = " + writeFuture.cause() + ", elapsed time = " + value.getElapsedMsecs() + ", destination = " + writeFuture.channel().remoteAddress() + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + value);
                    newArrayList.add(entry.getKey());
                    newArrayList2.add(new RequestInfo(value.getDestinationAddress(), value.getRequest()));
                }
            }
            for (int i = 0; i < newArrayList.size(); i++) {
                ClientRequestId clientRequestId = (ClientRequestId) newArrayList.get(i);
                RequestInfo requestInfo = (RequestInfo) newArrayList2.get(i);
                if (this.clientRequestIdRequestInfoMap.put(clientRequestId, requestInfo) == null) {
                    LOG.warn("checkRequestsForProblems: Request " + clientRequestId + " completed prior to sending the next request");
                    this.clientRequestIdRequestInfoMap.remove(clientRequestId);
                }
                Channel nextChannel = getNextChannel(requestInfo.getDestinationAddress());
                if (LOG.isInfoEnabled()) {
                    LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
                }
                ChannelFuture write = nextChannel.write(requestInfo.getRequest());
                requestInfo.setWriteFuture(write);
                write.addListener2((GenericFutureListener<? extends Future<? super Void>>) this.logErrorListener);
            }
            newArrayList.clear();
            newArrayList2.clear();
        }
    }

    private static InetSocketAddress resolveAddress(int i, InetSocketAddress inetSocketAddress) {
        int i2 = 0;
        while (inetSocketAddress.isUnresolved() && i2 < i) {
            i2++;
            LOG.warn("resolveAddress: Failed to resolve " + inetSocketAddress + " on attempt " + i2 + " of " + i + " attempts, sleeping for 5 seconds");
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                LOG.warn("resolveAddress: Interrupted.", e);
            }
            inetSocketAddress = new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        }
        if (i2 >= i) {
            throw new IllegalStateException("resolveAddress: Couldn't resolve " + inetSocketAddress + " in " + i2 + " tries.");
        }
        return inetSocketAddress;
    }
}
