package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.shaded.org.apache.commons.cli.HelpFormatter;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.internals.IntGaugeSuite;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/common/network/Selector.class */
public class Selector implements Selectable, AutoCloseable {
    public static final long NO_IDLE_TIMEOUT_MS = -1;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;
    private final Logger log;
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    private boolean outOfMemory;
    private final List<NetworkSend> completedSends;
    private final LinkedHashMap<String, NetworkReceive> completedReceives;
    private final Set<SelectionKey> immediatelyConnectedKeys;
    private final Map<String, KafkaChannel> closingChannels;
    private Set<SelectionKey> keysWithBufferedRead;
    private final Map<String, ChannelState> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final ChannelBuilder channelBuilder;
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;
    private boolean madeReadProgressLastPoll;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$CloseMode.class */
    public enum CloseMode {
        GRACEFUL(true),
        NOTIFY_ONLY(true),
        DISCARD_NO_NOTIFY(false);

        boolean notifyDisconnect;

        CloseMode(boolean z) {
            this.notifyDisconnect = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$DelayedAuthenticationFailureClose.class */
    public class DelayedAuthenticationFailureClose {
        private final KafkaChannel channel;
        private final long endTimeNanos;
        private boolean closed = false;

        public DelayedAuthenticationFailureClose(KafkaChannel kafkaChannel, int i) {
            this.channel = kafkaChannel;
            this.endTimeNanos = Selector.this.time.nanoseconds() + (i * 1000 * 1000);
        }

        public final boolean tryClose(long j) {
            if (this.endTimeNanos <= j) {
                closeNow();
            }
            return this.closed;
        }

        public final void closeNow() {
            if (this.closed) {
                throw new IllegalStateException("Attempt to close a channel that has already been closed");
            }
            Selector.this.handleCloseOnAuthenticationFailure(this.channel);
            this.closed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$IdleExpiryManager.class */
    public static class IdleExpiryManager {
        private final Map<String, Long> lruConnections = new LinkedHashMap(16, 0.75f, true);
        private final long connectionsMaxIdleNanos;
        private long nextIdleCloseCheckTime;

        public IdleExpiryManager(Time time, long j) {
            this.connectionsMaxIdleNanos = j * 1000 * 1000;
            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
        }

        public void update(String str, long j) {
            this.lruConnections.put(str, Long.valueOf(j));
        }

        public Map.Entry<String, Long> pollExpiredConnection(long j) {
            if (j <= this.nextIdleCloseCheckTime) {
                return null;
            }
            if (this.lruConnections.isEmpty()) {
                this.nextIdleCloseCheckTime = j + this.connectionsMaxIdleNanos;
                return null;
            }
            Map.Entry<String, Long> next = this.lruConnections.entrySet().iterator().next();
            this.nextIdleCloseCheckTime = next.getValue().longValue() + this.connectionsMaxIdleNanos;
            if (j > this.nextIdleCloseCheckTime) {
                return next;
            }
            return null;
        }

        public void remove(String str) {
            this.lruConnections.remove(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$SelectorChannelMetadataRegistry.class */
    public class SelectorChannelMetadataRegistry implements ChannelMetadataRegistry {
        private CipherInformation cipherInformation;
        private ClientInformation clientInformation;

        SelectorChannelMetadataRegistry() {
        }

        @Override // org.apache.kafka.common.network.ChannelMetadataRegistry
        public void registerCipherInformation(CipherInformation cipherInformation) {
            if (this.cipherInformation != null) {
                if (this.cipherInformation.equals(cipherInformation)) {
                    return;
                } else {
                    Selector.this.sensors.connectionsByCipher.decrement(this.cipherInformation);
                }
            }
            this.cipherInformation = cipherInformation;
            Selector.this.sensors.connectionsByCipher.increment(cipherInformation);
        }

        @Override // org.apache.kafka.common.network.ChannelMetadataRegistry
        public CipherInformation cipherInformation() {
            return this.cipherInformation;
        }

        @Override // org.apache.kafka.common.network.ChannelMetadataRegistry
        public void registerClientInformation(ClientInformation clientInformation) {
            if (this.clientInformation != null) {
                if (this.clientInformation.equals(clientInformation)) {
                    return;
                } else {
                    Selector.this.sensors.connectionsByClient.decrement(this.clientInformation);
                }
            }
            this.clientInformation = clientInformation;
            Selector.this.sensors.connectionsByClient.increment(clientInformation);
        }

        @Override // org.apache.kafka.common.network.ChannelMetadataRegistry
        public ClientInformation clientInformation() {
            return this.clientInformation;
        }

        @Override // org.apache.kafka.common.network.ChannelMetadataRegistry, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.cipherInformation != null) {
                Selector.this.sensors.connectionsByCipher.decrement(this.cipherInformation);
                this.cipherInformation = null;
            }
            if (this.clientInformation != null) {
                Selector.this.sensors.connectionsByClient.decrement(this.clientInformation);
                this.clientInformation = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$SelectorMetrics.class */
    public class SelectorMetrics implements AutoCloseable {
        private final Metrics metrics;
        private final Map<String, String> metricTags;
        private final boolean metricsPerConnection;
        private final String metricGrpName;
        private final String perConnectionMetricGrpName;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor successfulAuthentication;
        public final Sensor successfulReauthentication;
        public final Sensor successfulAuthenticationNoReauth;
        public final Sensor reauthenticationLatency;
        public final Sensor failedAuthentication;
        public final Sensor failedReauthentication;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor requestsSent;
        public final Sensor bytesReceived;
        public final Sensor responsesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;
        public final IntGaugeSuite<CipherInformation> connectionsByCipher;
        public final IntGaugeSuite<ClientInformation> connectionsByClient;
        private final List<MetricName> topLevelMetricNames = new ArrayList();
        private final List<Sensor> sensors = new ArrayList();

        public SelectorMetrics(Metrics metrics, String str, Map<String, String> map, boolean z) {
            this.metrics = metrics;
            this.metricTags = map;
            this.metricsPerConnection = z;
            this.metricGrpName = str + "-metrics";
            this.perConnectionMetricGrpName = str + "-node-metrics";
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                sb.append(entry.getKey());
                sb.append(HelpFormatter.DEFAULT_OPT_PREFIX);
                sb.append(entry.getValue());
            }
            this.connectionClosed = sensor("connections-closed:" + ((Object) sb), new Sensor[0]);
            this.connectionClosed.add(createMeter(metrics, this.metricGrpName, map, "connection-close", "connections closed"));
            this.connectionCreated = sensor("connections-created:" + ((Object) sb), new Sensor[0]);
            this.connectionCreated.add(createMeter(metrics, this.metricGrpName, map, "connection-creation", "new connections established"));
            this.successfulAuthentication = sensor("successful-authentication:" + ((Object) sb), new Sensor[0]);
            this.successfulAuthentication.add(createMeter(metrics, this.metricGrpName, map, "successful-authentication", "connections with successful authentication"));
            this.successfulReauthentication = sensor("successful-reauthentication:" + ((Object) sb), new Sensor[0]);
            this.successfulReauthentication.add(createMeter(metrics, this.metricGrpName, map, "successful-reauthentication", "successful re-authentication of connections"));
            this.successfulAuthenticationNoReauth = sensor("successful-authentication-no-reauth:" + ((Object) sb), new Sensor[0]);
            this.successfulAuthenticationNoReauth.add(metrics.metricName("successful-authentication-no-reauth-total", this.metricGrpName, "The total number of connections with successful authentication where the client does not support re-authentication", map), new CumulativeSum());
            this.failedAuthentication = sensor("failed-authentication:" + ((Object) sb), new Sensor[0]);
            this.failedAuthentication.add(createMeter(metrics, this.metricGrpName, map, "failed-authentication", "connections with failed authentication"));
            this.failedReauthentication = sensor("failed-reauthentication:" + ((Object) sb), new Sensor[0]);
            this.failedReauthentication.add(createMeter(metrics, this.metricGrpName, map, "failed-reauthentication", "failed re-authentication of connections"));
            this.reauthenticationLatency = sensor("reauthentication-latency:" + ((Object) sb), new Sensor[0]);
            this.reauthenticationLatency.add(metrics.metricName("reauthentication-latency-max", this.metricGrpName, "The max latency observed due to re-authentication", map), new Max());
            this.reauthenticationLatency.add(metrics.metricName("reauthentication-latency-avg", this.metricGrpName, "The average latency observed due to re-authentication", map), new Avg());
            this.bytesTransferred = sensor("bytes-sent-received:" + ((Object) sb), new Sensor[0]);
            this.bytesTransferred.add(createMeter(metrics, this.metricGrpName, map, new WindowedCount(), "network-io", "network operations (reads or writes) on all connections"));
            this.bytesSent = sensor("bytes-sent:" + ((Object) sb), this.bytesTransferred);
            this.bytesSent.add(createMeter(metrics, this.metricGrpName, map, "outgoing-byte", "outgoing bytes sent to all servers"));
            this.requestsSent = sensor("requests-sent:" + ((Object) sb), new Sensor[0]);
            this.requestsSent.add(createMeter(metrics, this.metricGrpName, map, new WindowedCount(), "request", "requests sent"));
            this.requestsSent.add(metrics.metricName("request-size-avg", this.metricGrpName, "The average size of requests sent.", map), new Avg());
            this.requestsSent.add(metrics.metricName("request-size-max", this.metricGrpName, "The maximum size of any request sent.", map), new Max());
            this.bytesReceived = sensor("bytes-received:" + ((Object) sb), this.bytesTransferred);
            this.bytesReceived.add(createMeter(metrics, this.metricGrpName, map, "incoming-byte", "bytes read off all sockets"));
            this.responsesReceived = sensor("responses-received:" + ((Object) sb), new Sensor[0]);
            this.responsesReceived.add(createMeter(metrics, this.metricGrpName, map, new WindowedCount(), "response", "responses received"));
            this.selectTime = sensor("select-time:" + ((Object) sb), new Sensor[0]);
            this.selectTime.add(createMeter(metrics, this.metricGrpName, map, new WindowedCount(), "select", "times the I/O layer checked for new I/O to perform"));
            this.selectTime.add(metrics.metricName("io-wait-time-ns-avg", this.metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", map), new Avg());
            this.selectTime.add(createIOThreadRatioMeter(metrics, this.metricGrpName, map, "io-wait", "waiting"));
            this.ioTime = sensor("io-time:" + ((Object) sb), new Sensor[0]);
            this.ioTime.add(metrics.metricName("io-time-ns-avg", this.metricGrpName, "The average length of time for I/O per select call in nanoseconds.", map), new Avg());
            this.ioTime.add(createIOThreadRatioMeter(metrics, this.metricGrpName, map, "io", "doing I/O"));
            this.connectionsByCipher = new IntGaugeSuite<>(Selector.this.log, "sslCiphers", metrics, cipherInformation -> {
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                linkedHashMap.put("cipher", cipherInformation.cipher());
                linkedHashMap.put("protocol", cipherInformation.protocol());
                linkedHashMap.putAll(map);
                return metrics.metricName("connections", this.metricGrpName, "The number of connections with this SSL cipher and protocol.", linkedHashMap);
            }, 100);
            this.connectionsByClient = new IntGaugeSuite<>(Selector.this.log, "clients", metrics, clientInformation -> {
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                linkedHashMap.put("clientSoftwareName", clientInformation.softwareName());
                linkedHashMap.put("clientSoftwareVersion", clientInformation.softwareVersion());
                linkedHashMap.putAll(map);
                return metrics.metricName("connections", this.metricGrpName, "The number of connections with this client and version.", linkedHashMap);
            }, 100);
            MetricName metricName = metrics.metricName("connection-count", this.metricGrpName, "The current number of active connections.", map);
            this.topLevelMetricNames.add(metricName);
            this.metrics.addMetric(metricName, (metricConfig, j) -> {
                return Selector.this.channels.size();
            });
        }

        private Meter createMeter(Metrics metrics, String str, Map<String, String> map, SampledStat sampledStat, String str2, String str3) {
            MetricName metricName = metrics.metricName(str2 + "-rate", str, String.format("The number of %s per second", str3), map);
            MetricName metricName2 = metrics.metricName(str2 + "-total", str, String.format("The total number of %s", str3), map);
            return sampledStat == null ? new Meter(metricName, metricName2) : new Meter(sampledStat, metricName, metricName2);
        }

        private Meter createMeter(Metrics metrics, String str, Map<String, String> map, String str2, String str3) {
            return createMeter(metrics, str, map, null, str2, str3);
        }

        private Meter createIOThreadRatioMeter(Metrics metrics, String str, Map<String, String> map, String str2, String str3) {
            return new Meter(TimeUnit.NANOSECONDS, metrics.metricName(str2 + "-ratio", str, String.format("The fraction of time the I/O thread spent %s", str3), map), metrics.metricName(str2 + "time-total", str, String.format("The total time the I/O thread spent %s", str3), map));
        }

        private Sensor sensor(String str, Sensor... sensorArr) {
            Sensor sensor = this.metrics.sensor(str, sensorArr);
            this.sensors.add(sensor);
            return sensor;
        }

        public void maybeRegisterConnectionMetrics(String str) {
            if (str.isEmpty() || !this.metricsPerConnection) {
                return;
            }
            String str2 = "node-" + str + ".requests-sent";
            if (this.metrics.getSensor(str2) == null) {
                LinkedHashMap linkedHashMap = new LinkedHashMap(this.metricTags);
                linkedHashMap.put("node-id", "node-" + str);
                Sensor sensor = sensor(str2, new Sensor[0]);
                sensor.add(createMeter(this.metrics, this.perConnectionMetricGrpName, linkedHashMap, new WindowedCount(), "request", "requests sent"));
                sensor.add(this.metrics.metricName("request-size-avg", this.perConnectionMetricGrpName, "The average size of requests sent.", linkedHashMap), new Avg());
                sensor.add(this.metrics.metricName("request-size-max", this.perConnectionMetricGrpName, "The maximum size of any request sent.", linkedHashMap), new Max());
                sensor("node-" + str + ".bytes-sent", new Sensor[0]).add(createMeter(this.metrics, this.perConnectionMetricGrpName, linkedHashMap, "outgoing-byte", "outgoing bytes"));
                sensor("node-" + str + ".responses-received", new Sensor[0]).add(createMeter(this.metrics, this.perConnectionMetricGrpName, linkedHashMap, new WindowedCount(), "response", "responses received"));
                sensor("node-" + str + ".bytes-received", new Sensor[0]).add(createMeter(this.metrics, this.perConnectionMetricGrpName, linkedHashMap, "incoming-byte", "incoming bytes"));
                Sensor sensor2 = sensor("node-" + str + ".latency", new Sensor[0]);
                sensor2.add(this.metrics.metricName("request-latency-avg", this.perConnectionMetricGrpName, linkedHashMap), new Avg());
                sensor2.add(this.metrics.metricName("request-latency-max", this.perConnectionMetricGrpName, linkedHashMap), new Max());
            }
        }

        public void recordBytesSent(String str, long j, long j2) {
            this.bytesSent.record(j, j2);
            if (str.isEmpty()) {
                return;
            }
            Sensor sensor = this.metrics.getSensor("node-" + str + ".bytes-sent");
            if (sensor != null) {
                sensor.record(j, j2);
            }
        }

        public void recordCompletedSend(String str, long j, long j2) {
            this.requestsSent.record(j, j2);
            if (str.isEmpty()) {
                return;
            }
            Sensor sensor = this.metrics.getSensor("node-" + str + ".requests-sent");
            if (sensor != null) {
                sensor.record(j, j2);
            }
        }

        public void recordBytesReceived(String str, long j, long j2) {
            this.bytesReceived.record(j, j2);
            if (str.isEmpty()) {
                return;
            }
            Sensor sensor = this.metrics.getSensor("node-" + str + ".bytes-received");
            if (sensor != null) {
                sensor.record(j, j2);
            }
        }

        public void recordCompletedReceive(String str, long j, long j2) {
            this.responsesReceived.record(j, j2);
            if (str.isEmpty()) {
                return;
            }
            Sensor sensor = this.metrics.getSensor("node-" + str + ".responses-received");
            if (sensor != null) {
                sensor.record(j, j2);
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            Iterator<MetricName> it = this.topLevelMetricNames.iterator();
            while (it.hasNext()) {
                this.metrics.removeMetric(it.next());
            }
            Iterator<Sensor> it2 = this.sensors.iterator();
            while (it2.hasNext()) {
                this.metrics.removeSensor(it2.next().name());
            }
            this.connectionsByCipher.close();
            this.connectionsByClient.close();
        }
    }

    public Selector(int i, long j, int i2, Metrics metrics, Time time, String str, Map<String, String> map, boolean z, boolean z2, ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
        this.madeReadProgressLastPoll = true;
        try {
            this.nioSelector = java.nio.channels.Selector.open();
            this.maxReceiveSize = i;
            this.time = time;
            this.channels = new HashMap();
            this.explicitlyMutedChannels = new HashSet();
            this.outOfMemory = false;
            this.completedSends = new ArrayList();
            this.completedReceives = new LinkedHashMap<>();
            this.immediatelyConnectedKeys = new HashSet();
            this.closingChannels = new HashMap();
            this.keysWithBufferedRead = new HashSet();
            this.connected = new ArrayList();
            this.disconnected = new HashMap();
            this.failedSends = new ArrayList();
            this.log = logContext.logger(Selector.class);
            this.sensors = new SelectorMetrics(metrics, str, map, z);
            this.channelBuilder = channelBuilder;
            this.recordTimePerConnection = z2;
            this.idleExpiryManager = j < 0 ? null : new IdleExpiryManager(time, j);
            this.memoryPool = memoryPool;
            this.lowMemThreshold = (long) (0.1d * this.memoryPool.size());
            this.failedAuthenticationDelayMs = i2;
            this.delayedClosingChannels = i2 > 0 ? new LinkedHashMap<>() : null;
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

    public Selector(int i, long j, Metrics metrics, Time time, String str, Map<String, String> map, boolean z, boolean z2, ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
        this(i, j, 0, metrics, time, str, map, z, z2, channelBuilder, memoryPool, logContext);
    }

    public Selector(int i, long j, int i2, Metrics metrics, Time time, String str, Map<String, String> map, boolean z, ChannelBuilder channelBuilder, LogContext logContext) {
        this(i, j, i2, metrics, time, str, map, z, false, channelBuilder, MemoryPool.NONE, logContext);
    }

    public Selector(int i, long j, Metrics metrics, Time time, String str, Map<String, String> map, boolean z, ChannelBuilder channelBuilder, LogContext logContext) {
        this(i, j, 0, metrics, time, str, map, z, channelBuilder, logContext);
    }

    public Selector(long j, Metrics metrics, Time time, String str, ChannelBuilder channelBuilder, LogContext logContext) {
        this(-1, j, metrics, time, str, Collections.emptyMap(), true, channelBuilder, logContext);
    }

    public Selector(long j, int i, Metrics metrics, Time time, String str, ChannelBuilder channelBuilder, LogContext logContext) {
        this(-1, j, i, metrics, time, str, Collections.emptyMap(), true, channelBuilder, logContext);
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void connect(String str, InetSocketAddress inetSocketAddress, int i, int i2) throws IOException {
        ensureNotRegistered(str);
        SocketChannel open = SocketChannel.open();
        SelectionKey selectionKey = null;
        try {
            configureSocketChannel(open, i, i2);
            boolean doConnect = doConnect(open, inetSocketAddress);
            selectionKey = registerChannel(str, open, 8);
            if (doConnect) {
                this.log.debug("Immediately connected to node {}", str);
                this.immediatelyConnectedKeys.add(selectionKey);
                selectionKey.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (selectionKey != null) {
                this.immediatelyConnectedKeys.remove(selectionKey);
            }
            this.channels.remove(str);
            open.close();
            throw e;
        }
    }

    protected boolean doConnect(SocketChannel socketChannel, InetSocketAddress inetSocketAddress) throws IOException {
        try {
            return socketChannel.connect(inetSocketAddress);
        } catch (UnresolvedAddressException e) {
            throw new IOException("Can't resolve address: " + inetSocketAddress, e);
        }
    }

    private void configureSocketChannel(SocketChannel socketChannel, int i, int i2) throws IOException {
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (i != -1) {
            socket.setSendBufferSize(i);
        }
        if (i2 != -1) {
            socket.setReceiveBufferSize(i2);
        }
        socket.setTcpNoDelay(true);
    }

    public void register(String str, SocketChannel socketChannel) throws IOException {
        ensureNotRegistered(str);
        registerChannel(str, socketChannel, 1);
        this.sensors.connectionCreated.record();
        ChannelMetadataRegistry channelMetadataRegistry = channel(str).channelMetadataRegistry();
        if (channelMetadataRegistry.clientInformation() == null) {
            channelMetadataRegistry.registerClientInformation(ClientInformation.EMPTY);
        }
    }

    private void ensureNotRegistered(String str) {
        if (this.channels.containsKey(str)) {
            throw new IllegalStateException("There is already a connection for id " + str);
        }
        if (this.closingChannels.containsKey(str)) {
            throw new IllegalStateException("There is already a connection for id " + str + " that is still being closed");
        }
    }

    protected SelectionKey registerChannel(String str, SocketChannel socketChannel, int i) throws IOException {
        SelectionKey register = socketChannel.register(this.nioSelector, i);
        KafkaChannel buildAndAttachKafkaChannel = buildAndAttachKafkaChannel(socketChannel, str, register);
        this.channels.put(str, buildAndAttachKafkaChannel);
        if (this.idleExpiryManager != null) {
            this.idleExpiryManager.update(buildAndAttachKafkaChannel.id(), this.time.nanoseconds());
        }
        return register;
    }

    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String str, SelectionKey selectionKey) throws IOException {
        try {
            KafkaChannel buildChannel = this.channelBuilder.buildChannel(str, selectionKey, this.maxReceiveSize, this.memoryPool, new SelectorChannelMetadataRegistry());
            selectionKey.attach(buildChannel);
            return buildChannel;
        } catch (Exception e) {
            try {
                socketChannel.close();
                selectionKey.cancel();
                throw new IOException("Channel could not be created for socket " + socketChannel, e);
            } catch (Throwable th) {
                selectionKey.cancel();
                throw th;
            }
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void wakeup() {
        this.nioSelector.wakeup();
    }

    @Override // org.apache.kafka.common.network.Selectable, java.lang.AutoCloseable
    public void close() {
        ArrayList arrayList = new ArrayList(this.channels.keySet());
        AtomicReference atomicReference = new AtomicReference();
        Utils.closeAllQuietly(atomicReference, "release connections", (AutoCloseable[]) arrayList.stream().map(str -> {
            return () -> {
                close(str);
            };
        }).toArray(i -> {
            return new AutoCloseable[i];
        }));
        Utils.closeQuietly(this.nioSelector, "nioSelector", atomicReference);
        Utils.closeQuietly(this.sensors, "sensors", atomicReference);
        Utils.closeQuietly(this.channelBuilder, "channelBuilder", atomicReference);
        Throwable th = (Throwable) atomicReference.get();
        if ((th instanceof RuntimeException) && !(th instanceof SecurityException)) {
            throw ((RuntimeException) th);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void send(NetworkSend networkSend) {
        String destinationId = networkSend.destinationId();
        KafkaChannel openOrClosingChannelOrFail = openOrClosingChannelOrFail(destinationId);
        if (this.closingChannels.containsKey(destinationId)) {
            this.failedSends.add(destinationId);
            return;
        }
        try {
            openOrClosingChannelOrFail.setSend(networkSend);
        } catch (Exception e) {
            openOrClosingChannelOrFail.state(ChannelState.FAILED_SEND);
            this.failedSends.add(destinationId);
            close(openOrClosingChannelOrFail, CloseMode.DISCARD_NO_NOTIFY);
            if (e instanceof CancelledKeyException) {
                return;
            }
            this.log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", destinationId, e);
            throw e;
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void poll(long j) throws IOException {
        if (j < 0) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        boolean z = this.madeReadProgressLastPoll;
        clear();
        boolean z2 = !this.keysWithBufferedRead.isEmpty();
        if (!this.immediatelyConnectedKeys.isEmpty() || (z && z2)) {
            j = 0;
        }
        if (!this.memoryPool.isOutOfMemory() && this.outOfMemory) {
            this.log.trace("Broker no longer low on memory - unmuting incoming sockets");
            for (KafkaChannel kafkaChannel : this.channels.values()) {
                if (kafkaChannel.isInMutableState() && !this.explicitlyMutedChannels.contains(kafkaChannel)) {
                    kafkaChannel.maybeUnmute();
                }
            }
            this.outOfMemory = false;
        }
        long nanoseconds = this.time.nanoseconds();
        int select = select(j);
        long nanoseconds2 = this.time.nanoseconds();
        this.sensors.selectTime.record(nanoseconds2 - nanoseconds, this.time.milliseconds());
        if (select > 0 || !this.immediatelyConnectedKeys.isEmpty() || z2) {
            Set<SelectionKey> selectedKeys = this.nioSelector.selectedKeys();
            if (z2) {
                this.keysWithBufferedRead.removeAll(selectedKeys);
                Set<SelectionKey> set = this.keysWithBufferedRead;
                this.keysWithBufferedRead = new HashSet();
                pollSelectionKeys(set, false, nanoseconds2);
            }
            pollSelectionKeys(selectedKeys, false, nanoseconds2);
            selectedKeys.clear();
            pollSelectionKeys(this.immediatelyConnectedKeys, true, nanoseconds2);
            this.immediatelyConnectedKeys.clear();
        } else {
            this.madeReadProgressLastPoll = true;
        }
        long nanoseconds3 = this.time.nanoseconds();
        this.sensors.ioTime.record(nanoseconds3 - nanoseconds2, this.time.milliseconds());
        completeDelayedChannelClose(nanoseconds3);
        maybeCloseOldestConnection(nanoseconds2);
    }

    /* JADX WARN: Removed duplicated region for block: B:122:0x0222  */
    /* JADX WARN: Removed duplicated region for block: B:27:0x0102 A[Catch: Exception -> 0x0256, all -> 0x0334, TryCatch #1 {Exception -> 0x0256, blocks: (B:124:0x0068, B:21:0x00e5, B:23:0x00ed, B:25:0x00f5, B:27:0x0102, B:32:0x0122, B:34:0x0137, B:35:0x017f, B:38:0x0194, B:40:0x0146, B:41:0x015d, B:43:0x0172, B:45:0x019e, B:47:0x01a6, B:49:0x01b1, B:50:0x01b9, B:52:0x01d5, B:54:0x01dd, B:56:0x01e5, B:58:0x01ee, B:60:0x01fc, B:61:0x0202, B:63:0x020a, B:69:0x0225, B:72:0x023a, B:74:0x0242, B:80:0x0239, B:12:0x0070, B:20:0x0078), top: B:123:0x0068, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:63:0x020a A[Catch: Exception -> 0x0256, all -> 0x0334, TryCatch #1 {Exception -> 0x0256, blocks: (B:124:0x0068, B:21:0x00e5, B:23:0x00ed, B:25:0x00f5, B:27:0x0102, B:32:0x0122, B:34:0x0137, B:35:0x017f, B:38:0x0194, B:40:0x0146, B:41:0x015d, B:43:0x0172, B:45:0x019e, B:47:0x01a6, B:49:0x01b1, B:50:0x01b9, B:52:0x01d5, B:54:0x01dd, B:56:0x01e5, B:58:0x01ee, B:60:0x01fc, B:61:0x0202, B:63:0x020a, B:69:0x0225, B:72:0x023a, B:74:0x0242, B:80:0x0239, B:12:0x0070, B:20:0x0078), top: B:123:0x0068, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:66:0x021d  */
    /* JADX WARN: Removed duplicated region for block: B:74:0x0242 A[Catch: Exception -> 0x0256, all -> 0x0334, TryCatch #1 {Exception -> 0x0256, blocks: (B:124:0x0068, B:21:0x00e5, B:23:0x00ed, B:25:0x00f5, B:27:0x0102, B:32:0x0122, B:34:0x0137, B:35:0x017f, B:38:0x0194, B:40:0x0146, B:41:0x015d, B:43:0x0172, B:45:0x019e, B:47:0x01a6, B:49:0x01b1, B:50:0x01b9, B:52:0x01d5, B:54:0x01dd, B:56:0x01e5, B:58:0x01ee, B:60:0x01fc, B:61:0x0202, B:63:0x020a, B:69:0x0225, B:72:0x023a, B:74:0x0242, B:80:0x0239, B:12:0x0070, B:20:0x0078), top: B:123:0x0068, outer: #0 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    void pollSelectionKeys(java.util.Set<java.nio.channels.SelectionKey> r8, boolean r9, long r10) {
        /*
            Method dump skipped, instructions count: 837
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long):void");
    }

    private void attemptWrite(SelectionKey selectionKey, KafkaChannel kafkaChannel, long j) throws IOException {
        if (kafkaChannel.hasSend() && kafkaChannel.ready() && selectionKey.isWritable() && !kafkaChannel.maybeBeginClientReauthentication(() -> {
            return Long.valueOf(j);
        })) {
            write(kafkaChannel);
        }
    }

    void write(KafkaChannel kafkaChannel) throws IOException {
        String id = kafkaChannel.id();
        long write = kafkaChannel.write();
        NetworkSend maybeCompleteSend = kafkaChannel.maybeCompleteSend();
        if (write > 0 || maybeCompleteSend != null) {
            long milliseconds = this.time.milliseconds();
            if (write > 0) {
                this.sensors.recordBytesSent(id, write, milliseconds);
            }
            if (maybeCompleteSend != null) {
                this.completedSends.add(maybeCompleteSend);
                this.sensors.recordCompletedSend(id, maybeCompleteSend.size(), milliseconds);
            }
        }
    }

    private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> set) {
        if (this.outOfMemory || this.memoryPool.availableMemory() >= this.lowMemThreshold) {
            return set;
        }
        ArrayList arrayList = new ArrayList(set);
        Collections.shuffle(arrayList);
        return arrayList;
    }

    private void attemptRead(KafkaChannel kafkaChannel) throws IOException {
        String id = kafkaChannel.id();
        long read = kafkaChannel.read();
        if (read != 0) {
            long milliseconds = this.time.milliseconds();
            this.sensors.recordBytesReceived(id, read, milliseconds);
            this.madeReadProgressLastPoll = true;
            NetworkReceive maybeCompleteReceive = kafkaChannel.maybeCompleteReceive();
            if (maybeCompleteReceive != null) {
                addToCompletedReceives(kafkaChannel, maybeCompleteReceive, milliseconds);
            }
        }
        if (kafkaChannel.isMuted()) {
            this.outOfMemory = true;
        } else {
            this.madeReadProgressLastPoll = true;
        }
    }

    private boolean maybeReadFromClosingChannel(KafkaChannel kafkaChannel) {
        boolean z;
        if (kafkaChannel.state().state() != ChannelState.State.READY) {
            z = false;
        } else if (this.explicitlyMutedChannels.contains(kafkaChannel) || hasCompletedReceive(kafkaChannel)) {
            z = true;
        } else {
            try {
                attemptRead(kafkaChannel);
                z = hasCompletedReceive(kafkaChannel);
            } catch (Exception e) {
                this.log.trace("Read from closing channel failed, ignoring exception", e);
                z = false;
            }
        }
        return z;
    }

    private void maybeRecordTimePerConnection(KafkaChannel kafkaChannel, long j) {
        if (this.recordTimePerConnection) {
            kafkaChannel.addNetworkThreadTimeNanos(this.time.nanoseconds() - j);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<NetworkSend> completedSends() {
        return this.completedSends;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public Collection<NetworkReceive> completedReceives() {
        return this.completedReceives.values();
    }

    @Override // org.apache.kafka.common.network.Selectable
    public Map<String, ChannelState> disconnected() {
        return this.disconnected;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<String> connected() {
        return this.connected;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void mute(String str) {
        mute(openOrClosingChannelOrFail(str));
    }

    private void mute(KafkaChannel kafkaChannel) {
        kafkaChannel.mute();
        this.explicitlyMutedChannels.add(kafkaChannel);
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void unmute(String str) {
        unmute(openOrClosingChannelOrFail(str));
    }

    private void unmute(KafkaChannel kafkaChannel) {
        if (kafkaChannel.maybeUnmute()) {
            this.explicitlyMutedChannels.remove(kafkaChannel);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void muteAll() {
        Iterator<KafkaChannel> it = this.channels.values().iterator();
        while (it.hasNext()) {
            mute(it.next());
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void unmuteAll() {
        Iterator<KafkaChannel> it = this.channels.values().iterator();
        while (it.hasNext()) {
            unmute(it.next());
        }
    }

    void completeDelayedChannelClose(long j) {
        if (this.delayedClosingChannels == null) {
            return;
        }
        while (!this.delayedClosingChannels.isEmpty() && this.delayedClosingChannels.values().iterator().next().tryClose(j)) {
        }
    }

    private void maybeCloseOldestConnection(long j) {
        Map.Entry<String, Long> pollExpiredConnection;
        String key;
        KafkaChannel kafkaChannel;
        if (this.idleExpiryManager == null || (pollExpiredConnection = this.idleExpiryManager.pollExpiredConnection(j)) == null || (kafkaChannel = this.channels.get((key = pollExpiredConnection.getKey()))) == null) {
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("About to close the idle connection from {} due to being idle for {} millis", key, Long.valueOf(((j - pollExpiredConnection.getValue().longValue()) / 1000) / 1000));
        }
        kafkaChannel.state(ChannelState.EXPIRED);
        close(kafkaChannel, CloseMode.GRACEFUL);
    }

    public void clearCompletedReceives() {
        this.completedReceives.clear();
    }

    public void clearCompletedSends() {
        this.completedSends.clear();
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        Iterator<Map.Entry<String, KafkaChannel>> it = this.closingChannels.entrySet().iterator();
        while (it.hasNext()) {
            KafkaChannel value = it.next().getValue();
            boolean remove = this.failedSends.remove(value.id());
            boolean z = false;
            if (!remove) {
                z = maybeReadFromClosingChannel(value);
            }
            if (!z || remove) {
                doClose(value, true);
                it.remove();
            }
        }
        Iterator<String> it2 = this.failedSends.iterator();
        while (it2.hasNext()) {
            this.disconnected.put(it2.next(), ChannelState.FAILED_SEND);
        }
        this.failedSends.clear();
        this.madeReadProgressLastPoll = false;
    }

    private int select(long j) throws IOException {
        if (j < 0) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        return j == 0 ? this.nioSelector.selectNow() : this.nioSelector.select(j);
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void close(String str) {
        KafkaChannel kafkaChannel = this.channels.get(str);
        if (kafkaChannel != null) {
            kafkaChannel.state(ChannelState.LOCAL_CLOSE);
            close(kafkaChannel, CloseMode.DISCARD_NO_NOTIFY);
        } else {
            KafkaChannel remove = this.closingChannels.remove(str);
            if (remove != null) {
                doClose(remove, false);
            }
        }
    }

    private void maybeDelayCloseOnAuthenticationFailure(KafkaChannel kafkaChannel) {
        DelayedAuthenticationFailureClose delayedAuthenticationFailureClose = new DelayedAuthenticationFailureClose(kafkaChannel, this.failedAuthenticationDelayMs);
        if (this.delayedClosingChannels != null) {
            this.delayedClosingChannels.put(kafkaChannel.id(), delayedAuthenticationFailureClose);
        } else {
            delayedAuthenticationFailureClose.closeNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCloseOnAuthenticationFailure(KafkaChannel kafkaChannel) {
        try {
            kafkaChannel.completeCloseOnAuthenticationFailure();
        } catch (Exception e) {
            this.log.error("Exception handling close on authentication failure node {}", kafkaChannel.id(), e);
        } finally {
            close(kafkaChannel, CloseMode.GRACEFUL);
        }
    }

    private void close(KafkaChannel kafkaChannel, CloseMode closeMode) {
        kafkaChannel.disconnect();
        this.connected.remove(kafkaChannel.id());
        if (closeMode == CloseMode.GRACEFUL && maybeReadFromClosingChannel(kafkaChannel)) {
            this.closingChannels.put(kafkaChannel.id(), kafkaChannel);
            this.log.debug("Tracking closing connection {} to process outstanding requests", kafkaChannel.id());
        } else {
            doClose(kafkaChannel, closeMode.notifyDisconnect);
        }
        this.channels.remove(kafkaChannel.id());
        if (this.delayedClosingChannels != null) {
            this.delayedClosingChannels.remove(kafkaChannel.id());
        }
        if (this.idleExpiryManager != null) {
            this.idleExpiryManager.remove(kafkaChannel.id());
        }
    }

    private void doClose(KafkaChannel kafkaChannel, boolean z) {
        SelectionKey selectionKey = kafkaChannel.selectionKey();
        try {
            try {
                this.immediatelyConnectedKeys.remove(selectionKey);
                this.keysWithBufferedRead.remove(selectionKey);
                kafkaChannel.close();
                selectionKey.cancel();
                selectionKey.attach(null);
            } catch (IOException e) {
                this.log.error("Exception closing connection to node {}:", kafkaChannel.id(), e);
                selectionKey.cancel();
                selectionKey.attach(null);
            }
            this.sensors.connectionClosed.record();
            this.explicitlyMutedChannels.remove(kafkaChannel);
            if (z) {
                this.disconnected.put(kafkaChannel.id(), kafkaChannel.state());
            }
        } catch (Throwable th) {
            selectionKey.cancel();
            selectionKey.attach(null);
            throw th;
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public boolean isChannelReady(String str) {
        KafkaChannel kafkaChannel = this.channels.get(str);
        return kafkaChannel != null && kafkaChannel.ready();
    }

    private KafkaChannel openOrClosingChannelOrFail(String str) {
        KafkaChannel kafkaChannel = this.channels.get(str);
        if (kafkaChannel == null) {
            kafkaChannel = this.closingChannels.get(str);
        }
        if (kafkaChannel == null) {
            throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + str + " existing connections " + this.channels.keySet());
        }
        return kafkaChannel;
    }

    public List<KafkaChannel> channels() {
        return new ArrayList(this.channels.values());
    }

    public KafkaChannel channel(String str) {
        return this.channels.get(str);
    }

    public KafkaChannel closingChannel(String str) {
        return this.closingChannels.get(str);
    }

    public KafkaChannel lowestPriorityChannel() {
        KafkaChannel kafkaChannel = null;
        if (!this.closingChannels.isEmpty()) {
            kafkaChannel = this.closingChannels.values().iterator().next();
        } else if (this.idleExpiryManager != null && !this.idleExpiryManager.lruConnections.isEmpty()) {
            kafkaChannel = channel((String) this.idleExpiryManager.lruConnections.keySet().iterator().next());
        } else if (!this.channels.isEmpty()) {
            kafkaChannel = this.channels.values().iterator().next();
        }
        return kafkaChannel;
    }

    private KafkaChannel channel(SelectionKey selectionKey) {
        return (KafkaChannel) selectionKey.attachment();
    }

    private boolean hasCompletedReceive(KafkaChannel kafkaChannel) {
        return this.completedReceives.containsKey(kafkaChannel.id());
    }

    private void addToCompletedReceives(KafkaChannel kafkaChannel, NetworkReceive networkReceive, long j) {
        if (hasCompletedReceive(kafkaChannel)) {
            throw new IllegalStateException("Attempting to add second completed receive to channel " + kafkaChannel.id());
        }
        this.completedReceives.put(kafkaChannel.id(), networkReceive);
        this.sensors.recordCompletedReceive(kafkaChannel.id(), networkReceive.size(), j);
    }

    public Set<SelectionKey> keys() {
        return new HashSet(this.nioSelector.keys());
    }

    boolean isOutOfMemory() {
        return this.outOfMemory;
    }

    boolean isMadeReadProgressLastPoll() {
        return this.madeReadProgressLastPoll;
    }

    Map<?, ?> delayedClosingChannels() {
        return this.delayedClosingChannels;
    }
}
