/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka010.shaded.org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.Measurable;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.MetricConfig;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.Metrics;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.Sensor;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.stats.Avg;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.stats.Count;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.stats.Max;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.metrics.stats.Rate;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.network.ChannelBuilder;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.network.KafkaChannel;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.network.NetworkReceive;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.network.Selectable;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.network.Send;
import org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Selector
implements Selectable {
    public static final long NO_IDLE_TIMEOUT_MS = -1L;
    private static final Logger log = LoggerFactory.getLogger(Selector.class);
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    private final Set<SelectionKey> immediatelyConnectedKeys;
    private final Map<String, KafkaChannel> closingChannels;
    private final List<String> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final String metricGrpPrefix;
    private final Map<String, String> metricTags;
    private final ChannelBuilder channelBuilder;
    private final int maxReceiveSize;
    private final boolean metricsPerConnection;
    private final IdleExpiryManager idleExpiryManager;

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        }
        catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.channels = new HashMap<String, KafkaChannel>();
        this.completedSends = new ArrayList<Send>();
        this.completedReceives = new ArrayList<NetworkReceive>();
        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
        this.immediatelyConnectedKeys = new HashSet<SelectionKey>();
        this.closingChannels = new HashMap<String, KafkaChannel>();
        this.connected = new ArrayList<String>();
        this.disconnected = new ArrayList<String>();
        this.failedSends = new ArrayList<String>();
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        this.metricsPerConnection = metricsPerConnection;
        this.idleExpiryManager = connectionMaxIdleMs < 0L ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
    }

    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
        this(-1, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
    }

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        boolean connected;
        if (this.channels.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != -1) {
            socket.setSendBufferSize(sendBufferSize);
        }
        if (receiveBufferSize != -1) {
            socket.setReceiveBufferSize(receiveBufferSize);
        }
        socket.setTcpNoDelay(true);
        try {
            connected = socketChannel.connect(address);
        }
        catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        }
        catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(this.nioSelector, 8);
        KafkaChannel channel = this.channelBuilder.buildChannel(id, key, this.maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
        if (connected) {
            log.debug("Immediately connected to node {}", (Object)channel.id());
            this.immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        SelectionKey key = socketChannel.register(this.nioSelector, 1);
        KafkaChannel channel = this.channelBuilder.buildChannel(id, key, this.maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
    }

    @Override
    public void wakeup() {
        this.nioSelector.wakeup();
    }

    @Override
    public void close() {
        ArrayList<String> connections = new ArrayList<String>(this.channels.keySet());
        for (String id : connections) {
            this.close(id);
        }
        try {
            this.nioSelector.close();
        }
        catch (IOException | SecurityException e) {
            log.error("Exception closing nioSelector:", (Throwable)e);
        }
        this.sensors.close();
        this.channelBuilder.close();
    }

    @Override
    public void send(Send send) {
        String connectionId = send.destination();
        if (this.closingChannels.containsKey(connectionId)) {
            this.failedSends.add(connectionId);
        } else {
            KafkaChannel channel = this.channelOrFail(connectionId, false);
            try {
                channel.setSend(send);
            }
            catch (CancelledKeyException e) {
                this.failedSends.add(connectionId);
                this.close(channel, false);
            }
        }
    }

    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        this.clear();
        if (this.hasStagedReceives() || !this.immediatelyConnectedKeys.isEmpty()) {
            timeout = 0L;
        }
        long startSelect = this.time.nanoseconds();
        int readyKeys = this.select(timeout);
        long endSelect = this.time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, this.time.milliseconds());
        if (readyKeys > 0 || !this.immediatelyConnectedKeys.isEmpty()) {
            this.pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            this.pollSelectionKeys(this.immediatelyConnectedKeys, true, endSelect);
        }
        this.addToCompletedReceives();
        long endIo = this.time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, this.time.milliseconds());
        this.maybeCloseOldestConnection(endSelect);
    }

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = this.channel(key);
            this.sensors.maybeRegisterConnectionMetrics(channel.id());
            if (this.idleExpiryManager != null) {
                this.idleExpiryManager.update(channel.id(), currentTimeNanos);
            }
            try {
                Send send;
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (!channel.finishConnect()) continue;
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel)key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", new Object[]{socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()});
                }
                if (channel.isConnected() && !channel.ready()) {
                    channel.prepare();
                }
                if (channel.ready() && key.isReadable() && !this.hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null) {
                        this.addToStagedReceives(channel, networkReceive);
                    }
                }
                if (channel.ready() && key.isWritable() && (send = channel.write()) != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
                if (key.isValid()) continue;
                this.close(channel, true);
            }
            catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException) {
                    log.debug("Connection with {} disconnected", (Object)desc, (Object)e);
                } else {
                    log.warn("Unexpected error from {}; closing connection", (Object)desc, (Object)e);
                }
                this.close(channel, true);
            }
        }
    }

    @Override
    public List<Send> completedSends() {
        return this.completedSends;
    }

    @Override
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override
    public List<String> disconnected() {
        return this.disconnected;
    }

    @Override
    public List<String> connected() {
        return this.connected;
    }

    @Override
    public void mute(String id) {
        KafkaChannel channel = this.channelOrFail(id, true);
        this.mute(channel);
    }

    private void mute(KafkaChannel channel) {
        channel.mute();
    }

    @Override
    public void unmute(String id) {
        KafkaChannel channel = this.channelOrFail(id, true);
        this.unmute(channel);
    }

    private void unmute(KafkaChannel channel) {
        channel.unmute();
    }

    @Override
    public void muteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.mute(channel);
        }
    }

    @Override
    public void unmuteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.unmute(channel);
        }
    }

    private void maybeCloseOldestConnection(long currentTimeNanos) {
        String connectionId;
        KafkaChannel channel;
        if (this.idleExpiryManager == null) {
            return;
        }
        Map.Entry<String, Long> expiredConnection = this.idleExpiryManager.pollExpiredConnection(currentTimeNanos);
        if (expiredConnection != null && (channel = this.channels.get(connectionId = expiredConnection.getKey())) != null) {
            if (log.isTraceEnabled()) {
                log.trace("About to close the idle connection from {} due to being idle for {} millis", (Object)connectionId, (Object)((currentTimeNanos - expiredConnection.getValue()) / 1000L / 1000L));
            }
            this.close(channel, true);
        }
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        Iterator<Map.Entry<String, KafkaChannel>> it = this.closingChannels.entrySet().iterator();
        while (it.hasNext()) {
            KafkaChannel channel = it.next().getValue();
            Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
            boolean sendFailed = this.failedSends.remove(channel.id());
            if (deque != null && !deque.isEmpty() && !sendFailed) continue;
            this.doClose(channel, true);
            it.remove();
        }
        this.disconnected.addAll(this.failedSends);
        this.failedSends.clear();
    }

    private int select(long ms) throws IOException {
        if (ms < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        if (ms == 0L) {
            return this.nioSelector.selectNow();
        }
        return this.nioSelector.select(ms);
    }

    @Override
    public void close(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel != null) {
            this.close(channel, false);
        }
    }

    private void close(KafkaChannel channel, boolean processOutstanding) {
        channel.disconnect();
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        if (processOutstanding && deque != null && !deque.isEmpty()) {
            if (!channel.isMute()) {
                this.addToCompletedReceives(channel, deque);
                if (deque.isEmpty()) {
                    this.stagedReceives.remove(channel);
                }
            }
            this.closingChannels.put(channel.id(), channel);
        } else {
            this.doClose(channel, processOutstanding);
        }
        this.channels.remove(channel.id());
        if (this.idleExpiryManager != null) {
            this.idleExpiryManager.remove(channel.id());
        }
    }

    private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
        try {
            channel.close();
        }
        catch (IOException e) {
            log.error("Exception closing connection to node {}:", (Object)channel.id(), (Object)e);
        }
        this.sensors.connectionClosed.record();
        this.stagedReceives.remove(channel);
        if (notifyDisconnect) {
            this.disconnected.add(channel.id());
        }
    }

    @Override
    public boolean isChannelReady(String id) {
        KafkaChannel channel = this.channels.get(id);
        return channel != null && channel.ready();
    }

    private KafkaChannel channelOrFail(String id, boolean maybeClosing) {
        KafkaChannel channel = this.channels.get(id);
        if (channel == null && maybeClosing) {
            channel = this.closingChannels.get(id);
        }
        if (channel == null) {
            throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + id + " existing connections " + this.channels.keySet());
        }
        return channel;
    }

    public List<KafkaChannel> channels() {
        return new ArrayList<KafkaChannel>(this.channels.values());
    }

    public KafkaChannel channel(String id) {
        return this.channels.get(id);
    }

    public KafkaChannel closingChannel(String id) {
        return this.closingChannels.get(id);
    }

    private KafkaChannel channel(SelectionKey key) {
        return (KafkaChannel)key.attachment();
    }

    private boolean hasStagedReceive(KafkaChannel channel) {
        return this.stagedReceives.containsKey(channel);
    }

    private boolean hasStagedReceives() {
        for (KafkaChannel channel : this.stagedReceives.keySet()) {
            if (channel.isMute()) continue;
            return true;
        }
        return false;
    }

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!this.stagedReceives.containsKey(channel)) {
            this.stagedReceives.put(channel, new ArrayDeque());
        }
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        deque.add(receive);
    }

    private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (channel.isMute()) continue;
                Deque<NetworkReceive> deque = entry.getValue();
                this.addToCompletedReceives(channel, deque);
                if (!deque.isEmpty()) continue;
                iter.remove();
            }
        }
    }

    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
    }

    private static class IdleExpiryManager {
        private final Map<String, Long> lruConnections;
        private final long connectionsMaxIdleNanos;
        private long nextIdleCloseCheckTime;

        public IdleExpiryManager(Time time, long connectionsMaxIdleMs) {
            this.connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000L * 1000L;
            this.lruConnections = new LinkedHashMap<String, Long>(16, 0.75f, true);
            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
        }

        public void update(String connectionId, long currentTimeNanos) {
            this.lruConnections.put(connectionId, currentTimeNanos);
        }

        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
            if (currentTimeNanos <= this.nextIdleCloseCheckTime) {
                return null;
            }
            if (this.lruConnections.isEmpty()) {
                this.nextIdleCloseCheckTime = currentTimeNanos + this.connectionsMaxIdleNanos;
                return null;
            }
            Map.Entry<String, Long> oldestConnectionEntry = this.lruConnections.entrySet().iterator().next();
            Long connectionLastActiveTime = oldestConnectionEntry.getValue();
            this.nextIdleCloseCheckTime = connectionLastActiveTime + this.connectionsMaxIdleNanos;
            if (currentTimeNanos > this.nextIdleCloseCheckTime) {
                return oldestConnectionEntry;
            }
            return null;
        }

        public void remove(String connectionId) {
            this.lruConnections.remove(connectionId);
        }
    }

    private class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;
        private final List<MetricName> topLevelMetricNames = new ArrayList<MetricName>();
        private final List<Sensor> sensors = new ArrayList<Sensor>();

        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            String metricGrpName = Selector.this.metricGrpPrefix + "-metrics";
            StringBuilder tagsSuffix = new StringBuilder();
            for (Map.Entry tag : Selector.this.metricTags.entrySet()) {
                tagsSuffix.append((String)tag.getKey());
                tagsSuffix.append("-");
                tagsSuffix.append((String)tag.getValue());
            }
            this.connectionClosed = this.sensor("connections-closed:" + tagsSuffix.toString(), new Sensor[0]);
            MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", Selector.this.metricTags);
            this.connectionClosed.add(metricName, new Rate());
            this.connectionCreated = this.sensor("connections-created:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", Selector.this.metricTags);
            this.connectionCreated.add(metricName, new Rate());
            this.bytesTransferred = this.sensor("bytes-sent-received:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", Selector.this.metricTags);
            this.bytesTransferred.add(metricName, new Rate(new Count()));
            this.bytesSent = this.sensor("bytes-sent:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate());
            metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate(new Count()));
            metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Avg());
            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Max());
            this.bytesReceived = this.sensor("bytes-received:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate());
            metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate(new Count()));
            this.selectTime = this.sensor("select-time:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(new Count()));
            metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Avg());
            metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            this.ioTime = this.sensor("io-time:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", Selector.this.metricTags);
            this.ioTime.add(metricName, new Avg());
            metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", Selector.this.metricTags);
            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", Selector.this.metricTags);
            this.topLevelMetricNames.add(metricName);
            this.metrics.addMetric(metricName, new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return Selector.this.channels.size();
                }
            });
        }

        private Sensor sensor(String name, Sensor ... parents) {
            Sensor sensor = this.metrics.sensor(name, parents);
            this.sensors.add(sensor);
            return sensor;
        }

        public void maybeRegisterConnectionMetrics(String connectionId) {
            String nodeRequestName;
            Sensor nodeRequest;
            if (!connectionId.isEmpty() && Selector.this.metricsPerConnection && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) == null) {
                String metricGrpName = Selector.this.metricGrpPrefix + "-node-metrics";
                LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(Selector.this.metricTags);
                tags.put("node-id", "node-" + connectionId);
                nodeRequest = this.sensor(nodeRequestName, new Sensor[0]);
                MetricName metricName = this.metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
                nodeRequest.add(metricName, new Rate());
                metricName = this.metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
                nodeRequest.add(metricName, new Rate(new Count()));
                metricName = this.metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
                nodeRequest.add(metricName, new Avg());
                metricName = this.metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                nodeRequest.add(metricName, new Max());
                String nodeResponseName = "node-" + connectionId + ".bytes-received";
                Sensor nodeResponse = this.sensor(nodeResponseName, new Sensor[0]);
                metricName = this.metrics.metricName("incoming-byte-rate", metricGrpName, tags);
                nodeResponse.add(metricName, new Rate());
                metricName = this.metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                nodeResponse.add(metricName, new Rate(new Count()));
                String nodeTimeName = "node-" + connectionId + ".latency";
                Sensor nodeRequestTime = this.sensor(nodeTimeName, new Sensor[0]);
                metricName = this.metrics.metricName("request-latency-avg", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Avg());
                metricName = this.metrics.metricName("request-latency-max", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Max());
            }
        }

        public void recordBytesSent(String connectionId, long bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesSent.record(bytes, now);
            if (!connectionId.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void recordBytesReceived(String connection, int bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesReceived.record(bytes, now);
            if (!connection.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connection + ".bytes-received")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void close() {
            for (MetricName metricName : this.topLevelMetricNames) {
                this.metrics.removeMetric(metricName);
            }
            for (Sensor sensor : this.sensors) {
                this.metrics.removeSensor(sensor.name());
            }
        }
    }
}

