/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ApiVersions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.KafkaClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.StaleMetadataException;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.Config;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.MemberDescription;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewPartitions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicDescription;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicListing;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Cluster;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.ConsumerGroupState;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaFuture;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartitionReplica;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.acl.AclBinding;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ApiException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.AuthenticationException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.RetriableException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownServerException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.MetricConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Sensor;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilder;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selectable;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.Errors;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AbstractRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AbstractResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ApiError;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.MetadataRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.MetadataResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.KafkaThread;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

@InterfaceStability.Evolving
public class KafkaAdminClient
extends AdminClient {
    /** Counter used by {@code generateClientId} to number clients that have no configured "client.id". */
    private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    /** Prefix passed to the JMX reporter and to the app-info (un)registration calls. */
    private static final String JMX_PREFIX = "kafka.admin.client";
    /**
     * Same value that {@code hardShutdownTimeMs} starts with; {@code close} treats it as
     * "no close has been requested yet".
     */
    private static final long INVALID_SHUTDOWN_TIME = -1L;
    /** Leading part of the I/O thread's name (the constructor appends {@code " | " + clientId}). */
    static final String NETWORK_THREAD_PREFIX = "kafka-admin-client-thread";
    /** Logger obtained from the per-client {@code LogContext}. */
    private final Logger log;
    /** Value of "request.timeout.ms"; used by {@code calcDeadlineMs} when a call supplies no timeout. */
    private final int defaultTimeoutMs;
    /** Client id, either configured or generated by {@code generateClientId}. */
    private final String clientId;
    /** Clock used for all deadline computations in this class. */
    private final Time time;
    /** Cluster metadata holder; seeded with the bootstrap cluster in the constructor. */
    private final AdminMetadataManager metadataManager;
    /** Metrics registry handed to the constructor and used for app-info (un)registration. */
    private final Metrics metrics;
    /** Network client; {@code close} calls {@code wakeup()} on it. */
    private final KafkaClient client;
    /** The runnable executed by {@link #thread}; API methods submit their calls to it. */
    private final AdminClientRunnable runnable;
    /** I/O thread, started at the end of the constructor and joined by {@code close}. */
    private final Thread thread;
    /** Hard shutdown time in ms requested via {@code close}, or -1 while none has been requested. */
    private final AtomicLong hardShutdownTimeMs = new AtomicLong(-1L);
    /** Factory supplied by the caller, or a default instance when the caller passed {@code null}. */
    private final TimeoutProcessorFactory timeoutProcessorFactory;
    /** Value of the "retries" configuration. */
    private final int maxRetries;
    /** Value of the "retry.backoff.ms" configuration. */
    private final long retryBackoffMs;

    /**
     * Returns the list stored under {@code key}, first inserting a new empty
     * {@link LinkedList} when the map has no (or a {@code null}) value for that key.
     *
     * @param map the map to read from and, if needed, insert into
     * @param key the key to look up
     * @return the list now associated with {@code key}; never {@code null}
     */
    static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
        List<V> values = map.get(key);
        if (values == null) {
            // Nothing stored yet: register a fresh list so later callers share it.
            values = new LinkedList<V>();
            map.put(key, values);
        }
        return values;
    }

    /**
     * Fails every future in {@code futures} with the same exception.
     *
     * @param futures the futures to complete exceptionally
     * @param exc     the exception passed to each future
     */
    private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) {
        Iterator<KafkaFutureImpl<T>> remaining = futures.iterator();
        while (remaining.hasNext()) {
            remaining.next().completeExceptionally(exc);
        }
    }

    /**
     * Computes {@code deadlineMs - now} and clamps the result into the {@code int} range.
     *
     * @param now        the current time in milliseconds
     * @param deadlineMs the deadline in milliseconds
     * @return the remaining time, limited to [{@link Integer#MIN_VALUE}, {@link Integer#MAX_VALUE}]
     */
    static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
        long remainingMs = deadlineMs - now;
        long clampedMs = Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, remainingMs));
        return (int) clampedMs;
    }

    /**
     * Picks the client id for a new admin client.
     *
     * @param config the admin client configuration
     * @return the configured "client.id" when it is non-empty, otherwise
     *         {@code "adminclient-"} followed by the next value of the class-wide sequence
     */
    static String generateClientId(AdminClientConfig config) {
        String configuredId = config.getString("client.id");
        if (configuredId.isEmpty()) {
            // No id configured: synthesize one from the shared counter.
            return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
        }
        return configuredId;
    }

    /**
     * Computes the absolute deadline for a call.
     *
     * @param now             the current time in milliseconds
     * @param optionTimeoutMs the per-call timeout, or {@code null} to use the default request timeout;
     *                        negative values are treated as zero
     * @return {@code now} plus the effective timeout
     */
    private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
        int effectiveTimeoutMs = optionTimeoutMs == null
            ? this.defaultTimeoutMs
            : Math.max(0, optionTimeoutMs);
        return now + (long) effectiveTimeoutMs;
    }

    /**
     * Renders a throwable as a short, single-line description.
     *
     * @param throwable the throwable to describe; may be {@code null}
     * @return {@code "Null exception."} for {@code null}; otherwise the simple class name,
     *         followed by {@code ": "} and the message when the message is non-null
     */
    static String prettyPrintException(Throwable throwable) {
        if (throwable == null) {
            return "Null exception.";
        }
        String message = throwable.getMessage();
        String simpleName = throwable.getClass().getSimpleName();
        return message == null ? simpleName : simpleName + ": " + message;
    }

    /**
     * Builds a fully wired admin client (metrics, channel builder, selector and network client)
     * from the given configuration, using the system clock.
     *
     * @param config                  the admin client configuration
     * @param timeoutProcessorFactory factory forwarded to the client constructor; may be {@code null}
     * @return the new admin client
     * @throws KafkaException wrapping any {@link Throwable} raised during construction, after the
     *                        resources created so far have been closed quietly
     */
    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
        final Time time = Time.SYSTEM;
        final String clientId = KafkaAdminClient.generateClientId(config);
        final ApiVersions apiVersions = new ApiVersions();
        final LogContext logContext = KafkaAdminClient.createLogContext(clientId);

        // Resources that must be released if construction fails part-way through.
        Metrics metrics = null;
        ChannelBuilder channelBuilder = null;
        Selector selector = null;
        NetworkClient networkClient = null;
        try {
            AdminMetadataManager metadataManager = new AdminMetadataManager(
                logContext,
                config.getLong("retry.backoff.ms"),
                config.getLong("metadata.max.age.ms"));

            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig()
                .samples(config.getInt("metrics.num.samples"))
                .timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level")))
                .tags(metricTags);
            // The JMX reporter is always installed in addition to the configured ones.
            reporters.add(new JmxReporter(JMX_PREFIX));
            metrics = new Metrics(metricConfig, reporters, time);

            channelBuilder = ClientUtils.createChannelBuilder(config);
            selector = new Selector(
                config.getLong("connections.max.idle.ms"), metrics, time, "admin-client", channelBuilder, logContext);

            long reconnectBackoffMs = config.getLong("reconnect.backoff.ms");
            long reconnectBackoffMaxMs = config.getLong("reconnect.backoff.max.ms");
            int sendBufferBytes = config.getInt("send.buffer.bytes");
            int receiveBufferBytes = config.getInt("receive.buffer.bytes");
            int oneHourMs = (int) TimeUnit.HOURS.toMillis(1L);
            networkClient = new NetworkClient(
                selector,
                metadataManager.updater(),
                clientId,
                1,
                reconnectBackoffMs,
                reconnectBackoffMaxMs,
                sendBufferBytes,
                receiveBufferBytes,
                oneHourMs,
                time,
                true,
                apiVersions,
                logContext);

            return new KafkaAdminClient(
                config, clientId, time, metadataManager, metrics, networkClient, timeoutProcessorFactory, logContext);
        }
        catch (Throwable exc) {
            Utils.closeQuietly(metrics, "Metrics");
            Utils.closeQuietly(networkClient, "NetworkClient");
            Utils.closeQuietly(selector, "Selector");
            Utils.closeQuietly(channelBuilder, "ChannelBuilder");
            throw new KafkaException("Failed create new KafkaAdminClient", exc);
        }
    }

    /**
     * Builds an admin client around a caller-supplied network client and clock, with an
     * empty reporter list for metrics and the default timeout processor factory.
     *
     * @param config the admin client configuration
     * @param client the network client to use
     * @param time   the clock to use
     * @return the new admin client
     * @throws KafkaException wrapping any {@link Throwable} raised during construction, after the
     *                        metrics registry (if created) has been closed quietly
     */
    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Time time) {
        final String clientId = KafkaAdminClient.generateClientId(config);
        Metrics metrics = null;
        try {
            metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
            LogContext logContext = KafkaAdminClient.createLogContext(clientId);
            long retryBackoffMs = config.getLong("retry.backoff.ms");
            long metadataMaxAgeMs = config.getLong("metadata.max.age.ms");
            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, retryBackoffMs, metadataMaxAgeMs);
            return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, client, null, logContext);
        }
        catch (Throwable exc) {
            Utils.closeQuietly(metrics, "Metrics");
            throw new KafkaException("Failed create new KafkaAdminClient", exc);
        }
    }

    /**
     * Creates the log context for an admin client.
     *
     * @param clientId the client id to embed in the log prefix
     * @return a log context whose prefix is {@code "[AdminClient clientId=<clientId>] "}
     */
    static LogContext createLogContext(String clientId) {
        String logPrefix = "[AdminClient clientId=" + clientId + "] ";
        return new LogContext(logPrefix);
    }

    /**
     * Initializes the client state, seeds the metadata manager with the bootstrap cluster,
     * registers app info and starts the I/O thread.
     *
     * @param config                  the admin client configuration
     * @param clientId                the client id used for thread naming and app-info registration
     * @param time                    the clock to use
     * @param metadataManager         the metadata manager to seed with the bootstrap servers
     * @param metrics                 the metrics registry
     * @param client                  the network client
     * @param timeoutProcessorFactory the timeout processor factory, or {@code null} for a default instance
     * @param logContext              the log context from which the logger is created
     */
    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, AdminMetadataManager metadataManager, Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) {
        this.defaultTimeoutMs = config.getInt("request.timeout.ms");
        this.clientId = clientId;
        this.log = logContext.logger(KafkaAdminClient.class);
        this.time = time;
        this.metadataManager = metadataManager;
        // Seed the metadata with a bootstrap cluster built from the configured servers.
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
        this.metrics = metrics;
        this.client = client;
        this.runnable = new AdminClientRunnable();
        String threadName = "kafka-admin-client-thread | " + clientId;
        // NOTE(review): the trailing 'true' is presumably the daemon flag of KafkaThread - confirm.
        this.thread = new KafkaThread(threadName, (Runnable)this.runnable, true);
        // Fall back to a default factory when the caller did not supply one.
        this.timeoutProcessorFactory = timeoutProcessorFactory == null ? new TimeoutProcessorFactory() : timeoutProcessorFactory;
        this.maxRetries = config.getInt("retries");
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
        this.log.debug("Kafka admin client initialized");
        // Start the I/O thread last, once every field above has been assigned.
        this.thread.start();
    }

    /**
     * Returns the clock this client was constructed with.
     *
     * @return the clock used by this admin client
     */
    Time time() {
        return time;
    }

    /**
     * Requests shutdown of the client and blocks until the I/O thread has exited.
     *
     * <p>The requested hard shutdown time is {@code now + duration} (with the duration capped at
     * 365 days). It is only installed when no close has been requested yet or when it is not
     * later than the time already recorded; otherwise the already recorded, earlier time stays
     * in effect.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and
     * the method returns without unregistering app info.
     *
     * @param duration how long to wait before the hard shutdown time
     * @param unit     the unit of {@code duration}
     */
    @Override
    public void close(long duration, TimeUnit unit) {
        long requestedWaitMs = unit.toMillis(duration);
        long waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365L), requestedWaitMs);
        long now = this.time.milliseconds();
        long newHardShutdownTimeMs = now + waitTimeMs;
        long prev = INVALID_SHUTDOWN_TIME;
        while (true) {
            if (this.hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
                if (prev == INVALID_SHUTDOWN_TIME) {
                    this.log.debug("Initiating close operation.");
                } else {
                    this.log.debug("Moving hard shutdown time forward.");
                }
                this.client.wakeup();
                break;
            }
            // The compare-and-set lost: re-read the recorded time and decide whether to retry.
            prev = this.hardShutdownTimeMs.get();
            if (prev < newHardShutdownTimeMs) {
                this.log.debug("Hard shutdown time is already earlier than requested.");
                newHardShutdownTimeMs = prev;
                break;
            }
        }
        if (this.log.isDebugEnabled()) {
            long deltaMs = Math.max(0L, newHardShutdownTimeMs - this.time.milliseconds());
            this.log.debug("Waiting for the I/O thread to exit. Hard shutdown in {} ms.", deltaMs);
        }
        try {
            this.thread.join();
            AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
            this.log.debug("Kafka admin client closed.");
        }
        catch (InterruptedException e) {
            this.log.debug("Interrupted while joining I/O thread", e);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tells whether a topic name cannot be put into a request.
     *
     * @param topicName the topic name to check
     * @return {@code true} when the name is {@code null} or empty
     */
    private static boolean topicNameIsUnrepresentable(String topicName) {
        if (topicName == null) {
            return true;
        }
        return topicName.isEmpty();
    }

    /**
     * Tells whether a group id cannot be put into a request.
     *
     * @param groupId the group id to check
     * @return {@code true} only when the id is {@code null}; an empty id is accepted
     */
    private static boolean groupIdIsUnrepresentable(String groupId) {
        boolean representable = groupId != null;
        return !representable;
    }

    /**
     * Creates a batch of topics by sending a single request to the controller node.
     *
     * <p>Topics whose name is {@code null} or empty are failed immediately with an
     * {@link InvalidTopicException} and are not sent. When a name occurs more than once, only the
     * first occurrence is sent. No request is issued when there is nothing valid to send.
     *
     * <p>If the response reports {@code NOT_CONTROLLER} for any topic, the cached controller is
     * cleared, a metadata update is requested and the error is thrown from the response handler.
     *
     * @param newTopics the topics to create
     * @param options   the options for the call (timeout, validate-only flag)
     * @return a result holding one future per distinct topic name
     */
    @Override
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, final CreateTopicsOptions options) {
        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<String, KafkaFutureImpl<Void>>(newTopics.size());
        final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<String, CreateTopicsRequest.TopicDetails>(newTopics.size());
        for (NewTopic newTopic : newTopics) {
            String topicName = newTopic.name();
            if (KafkaAdminClient.topicNameIsUnrepresentable(topicName)) {
                KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
                future.completeExceptionally(new InvalidTopicException("The given topic name '" + topicName + "' cannot be represented in a request."));
                topicFutures.put(topicName, future);
                continue;
            }
            if (topicFutures.containsKey(topicName)) {
                // Duplicate name: the first occurrence wins.
                continue;
            }
            topicFutures.put(topicName, new KafkaFutureImpl<Void>());
            topicsMap.put(topicName, newTopic.convertToTopicDetails());
        }
        long now = this.time.milliseconds();
        Call call = new Call("createTopics", this.calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()){

            @Override
            public AbstractRequest.Builder createRequest(int timeoutMs) {
                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly());
            }

            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                CreateTopicsResponse response = (CreateTopicsResponse)abstractResponse;
                // A NOT_CONTROLLER error invalidates the cached controller; throw the error
                // before any future is completed.
                for (ApiError apiError : response.errors().values()) {
                    if (apiError.error() == Errors.NOT_CONTROLLER) {
                        KafkaAdminClient.this.metadataManager.clearController();
                        KafkaAdminClient.this.metadataManager.requestUpdate();
                        throw apiError.exception();
                    }
                }
                // Complete the future of every topic mentioned in the response.
                for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
                    if (future == null) {
                        KafkaAdminClient.this.log.warn("Server response mentioned unknown topic {}", entry.getKey());
                        continue;
                    }
                    ApiException exception = entry.getValue().exception();
                    if (exception != null) {
                        future.completeExceptionally(exception);
                    } else {
                        future.complete(null);
                    }
                }
                // Anything still pending was not mentioned by the server at all.
                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
                    KafkaFutureImpl<Void> future = entry.getValue();
                    if (!future.isDone()) {
                        future.completeExceptionally(new ApiException("The server response did not contain a reference to topic " + entry.getKey()));
                    }
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(topicFutures.values(), throwable);
            }
        };
        if (!topicsMap.isEmpty()) {
            this.runnable.call(call, now);
        }
        return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
    }

    /**
     * Deletes a batch of topics by sending a single request to the controller node.
     *
     * <p>Names that are {@code null} or empty are failed immediately with an
     * {@link InvalidTopicException} and are not sent. Duplicate names are sent once. No request
     * is issued when there is nothing valid to send.
     *
     * <p>If the response reports {@code NOT_CONTROLLER} for any topic, the cached controller is
     * cleared, a metadata update is requested and the error is thrown from the response handler.
     *
     * @param topicNames the names of the topics to delete
     * @param options    the options for the call (timeout)
     * @return a result holding one future per distinct topic name
     */
    @Override
    public DeleteTopicsResult deleteTopics(Collection<String> topicNames, DeleteTopicsOptions options) {
        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<String, KafkaFutureImpl<Void>>(topicNames.size());
        final List<String> validTopicNames = new ArrayList<String>(topicNames.size());
        for (String topicName : topicNames) {
            if (KafkaAdminClient.topicNameIsUnrepresentable(topicName)) {
                KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
                future.completeExceptionally(new InvalidTopicException("The given topic name '" + topicName + "' cannot be represented in a request."));
                topicFutures.put(topicName, future);
                continue;
            }
            if (topicFutures.containsKey(topicName)) {
                // Duplicate name: already registered.
                continue;
            }
            topicFutures.put(topicName, new KafkaFutureImpl<Void>());
            validTopicNames.add(topicName);
        }
        long now = this.time.milliseconds();
        Call call = new Call("deleteTopics", this.calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()){

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new DeleteTopicsRequest.Builder(new HashSet<String>(validTopicNames), timeoutMs);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                DeleteTopicsResponse response = (DeleteTopicsResponse)abstractResponse;
                // A NOT_CONTROLLER error invalidates the cached controller; throw the error
                // before any future is completed.
                for (Errors error : response.errors().values()) {
                    if (error == Errors.NOT_CONTROLLER) {
                        KafkaAdminClient.this.metadataManager.clearController();
                        KafkaAdminClient.this.metadataManager.requestUpdate();
                        throw error.exception();
                    }
                }
                // Complete the future of every topic mentioned in the response.
                for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
                    if (future == null) {
                        KafkaAdminClient.this.log.warn("Server response mentioned unknown topic {}", entry.getKey());
                        continue;
                    }
                    ApiException exception = entry.getValue().exception();
                    if (exception != null) {
                        future.completeExceptionally(exception);
                    } else {
                        future.complete(null);
                    }
                }
                // Anything still pending was not mentioned by the server at all.
                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
                    KafkaFutureImpl<Void> future = entry.getValue();
                    if (!future.isDone()) {
                        future.completeExceptionally(new ApiException("The server response did not contain a reference to topic " + entry.getKey()));
                    }
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(topicFutures.values(), throwable);
            }
        };
        if (!validTopicNames.isEmpty()) {
            this.runnable.call(call, now);
        }
        return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
    }

    /**
     * Lists the topics of the cluster by sending an all-topics metadata request to the
     * least loaded node.
     *
     * <p>Internal topics are included only when {@code options.shouldListInternal()} is true.
     *
     * @param options the options for the call (timeout, whether to list internal topics)
     * @return a result whose future yields the listings keyed by topic name, or fails with the
     *         error that made the call fail
     */
    @Override
    public ListTopicsResult listTopics(final ListTopicsOptions options) {
        final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<Map<String, TopicListing>>();
        final long now = this.time.milliseconds();
        final long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call listCall = new Call("listTopics", deadlineMs, new LeastLoadedNodeProvider()){

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return MetadataRequest.Builder.allTopics();
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                Cluster cluster = ((MetadataResponse)abstractResponse).cluster();
                HashMap<String, TopicListing> listings = new HashMap<String, TopicListing>();
                for (String name : cluster.topics()) {
                    boolean isInternal = cluster.internalTopics().contains(name);
                    // Keep regular topics always; keep internal ones only on request.
                    if (!isInternal || options.shouldListInternal()) {
                        listings.put(name, new TopicListing(name, isInternal));
                    }
                }
                topicListingFuture.complete(listings);
            }

            @Override
            void handleFailure(Throwable throwable) {
                topicListingFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(listCall, now);
        return new ListTopicsResult(topicListingFuture);
    }

    /**
     * Describes the given topics using a metadata request routed through a
     * {@code ControllerNodeProvider}.
     *
     * <p>Names for which {@code topicNameIsUnrepresentable} returns {@code true} are failed
     * immediately with an {@link InvalidTopicException} and are never sent. Duplicate names share
     * one future. No request is issued when no representable name remains.
     *
     * <p>The call first asks only for the named topics, passing {@code false} as the builder's
     * second argument (presumably "do not auto-create topics", judging by the flag name). If the
     * broker rejects that with an {@link UnsupportedVersionException}, the call retries once with
     * an all-topics request.
     *
     * @param topicNames the topics to describe
     * @param options supplies the request timeout
     * @return a result holding one future per distinct topic name
     */
    @Override
    public DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<String, KafkaFutureImpl<TopicDescription>>(topicNames.size());
        final List<String> topicNamesList = new ArrayList<String>();
        for (String topicName : topicNames) {
            if (KafkaAdminClient.topicNameIsUnrepresentable(topicName)) {
                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
                future.completeExceptionally(new InvalidTopicException("The given topic name '" + topicName + "' cannot be represented in a request."));
                topicFutures.put(topicName, future);
                continue;
            }
            if (topicFutures.containsKey(topicName)) {
                continue;
            }
            topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
            topicNamesList.add(topicName);
        }
        long now = this.time.milliseconds();
        Call call = new Call("describeTopics", this.calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) {
            // Flipped to false after the first UnsupportedVersionException so the retry uses
            // the all-topics request instead.
            private boolean supportsDisablingTopicCreation = true;

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                if (this.supportsDisablingTopicCreation) {
                    return new MetadataRequest.Builder(topicNamesList, false);
                }
                return MetadataRequest.Builder.allTopics();
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                MetadataResponse response = (MetadataResponse)abstractResponse;
                Cluster cluster = response.cluster();
                Map<String, Errors> errors = response.errors();
                for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
                    String topicName = entry.getKey();
                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
                    Errors topicError = errors.get(topicName);
                    if (topicError != null) {
                        future.completeExceptionally(topicError.exception());
                        continue;
                    }
                    if (!cluster.topics().contains(topicName)) {
                        future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found."));
                        continue;
                    }
                    boolean isInternal = cluster.internalTopics().contains(topicName);
                    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
                    List<TopicPartitionInfo> partitions = new ArrayList<TopicPartitionInfo>(partitionInfos.size());
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        partitions.add(new TopicPartitionInfo(partitionInfo.partition(), this.leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), Arrays.asList(partitionInfo.inSyncReplicas())));
                    }
                    // Present partitions in ascending partition-id order.
                    Collections.sort(partitions, new Comparator<TopicPartitionInfo>() {

                        @Override
                        public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) {
                            return Integer.compare(tp1.partition(), tp2.partition());
                        }
                    });
                    future.complete(new TopicDescription(topicName, isInternal, partitions));
                }
            }

            /** Returns the partition leader, or null when it is absent or is the "no node" placeholder. */
            private Node leader(PartitionInfo partitionInfo) {
                if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id()) {
                    return null;
                }
                return partitionInfo.leader();
            }

            @Override
            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
                if (this.supportsDisablingTopicCreation) {
                    this.supportsDisablingTopicCreation = false;
                    return true;
                }
                return false;
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(topicFutures.values(), throwable);
            }
        };
        if (!topicNamesList.isEmpty()) {
            this.runnable.call(call, now);
        }
        return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
    }

    /**
     * Retrieves the broker nodes, the controller node and the cluster id from a single metadata
     * request (empty topic list) issued through a {@code LeastLoadedNodeProvider}.
     *
     * <p>The controller future completes with {@code null} when the response carries no
     * controller or a controller whose id is {@code -1}.
     *
     * @param options supplies the request timeout
     * @return a result bundling the nodes, controller and cluster-id futures; on failure all
     *         three are completed with the same throwable
     */
    @Override
    public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
        final KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<Collection<Node>>();
        final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<Node>();
        final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<String>();
        long now = this.time.milliseconds();
        Call listNodesCall = new Call("listNodes", this.calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                MetadataResponse metadata = (MetadataResponse)abstractResponse;
                nodesFuture.complete(metadata.brokers());
                Node controller = metadata.controller();
                boolean controllerKnown = controller != null && controller.id() != -1;
                controllerFuture.complete(controllerKnown ? controller : null);
                clusterIdFuture.complete(metadata.clusterId());
            }

            @Override
            void handleFailure(Throwable throwable) {
                nodesFuture.completeExceptionally(throwable);
                controllerFuture.completeExceptionally(throwable);
                clusterIdFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(listNodesCall, now);
        return new DescribeClusterResult(nodesFuture, controllerFuture, clusterIdFuture);
    }

    /**
     * Looks up the ACL bindings matching {@code filter} with a describe-ACLs request issued
     * through a {@code LeastLoadedNodeProvider}.
     *
     * @param filter the filter forwarded to the request builder
     * @param options supplies the request timeout
     * @return a result wrapping a future that yields the matching bindings, or fails with the
     *         response's error or the throwable passed to the failure handler
     */
    @Override
    public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
        long now = this.time.milliseconds();
        final KafkaFutureImpl<Collection<AclBinding>> bindingsFuture = new KafkaFutureImpl<Collection<AclBinding>>();
        Call describeCall = new Call("describeAcls", this.calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new DescribeAclsRequest.Builder(filter);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                DescribeAclsResponse aclsResponse = (DescribeAclsResponse)abstractResponse;
                if (!aclsResponse.error().isFailure()) {
                    bindingsFuture.complete(aclsResponse.acls());
                    return;
                }
                bindingsFuture.completeExceptionally(aclsResponse.error().exception());
            }

            @Override
            void handleFailure(Throwable throwable) {
                bindingsFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(describeCall, now);
        return new DescribeAclsResult(bindingsFuture);
    }

    /**
     * Creates the given ACL bindings with one create-ACLs request issued through a
     * {@code LeastLoadedNodeProvider}.
     *
     * <p>A binding whose filter form reports an indefinite field is failed locally with an
     * {@link InvalidRequestException} and left out of the request. Duplicate bindings share one
     * future. The request is submitted even when no valid binding remains.
     *
     * <p>Creation results are matched to the requested bindings by position. Bindings for which
     * the response contains no result fail with an {@link UnknownServerException}.
     *
     * @param acls the bindings to create
     * @param options supplies the request timeout
     * @return a result holding one future per distinct binding
     */
    @Override
    public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
        long now = this.time.milliseconds();
        final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<AclBinding, KafkaFutureImpl<Void>>();
        final List<CreateAclsRequest.AclCreation> aclCreations = new ArrayList<CreateAclsRequest.AclCreation>();
        for (AclBinding acl : acls) {
            if (futures.get(acl) != null) {
                continue;
            }
            KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
            futures.put(acl, future);
            String indefinite = acl.toFilter().findIndefiniteField();
            if (indefinite == null) {
                aclCreations.add(new CreateAclsRequest.AclCreation(acl));
                continue;
            }
            future.completeExceptionally(new InvalidRequestException("Invalid ACL creation: " + indefinite));
        }
        this.runnable.call(new Call("createAcls", this.calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new CreateAclsRequest.Builder(aclCreations);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                CreateAclsResponse response = (CreateAclsResponse)abstractResponse;
                List<CreateAclsResponse.AclCreationResponse> responses = response.aclCreationResponses();
                Iterator<CreateAclsResponse.AclCreationResponse> iter = responses.iterator();
                // Walk requests and responses in lockstep; results are positional.
                for (CreateAclsRequest.AclCreation aclCreation : aclCreations) {
                    KafkaFutureImpl<Void> future = futures.get(aclCreation.acl());
                    if (!iter.hasNext()) {
                        future.completeExceptionally(new UnknownServerException("The broker reported no creation result for the given ACL."));
                        continue;
                    }
                    CreateAclsResponse.AclCreationResponse creation = iter.next();
                    if (creation.error().isFailure()) {
                        future.completeExceptionally(creation.error().exception());
                        continue;
                    }
                    future.complete(null);
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(futures.values(), throwable);
            }
        }, now);
        return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
    }

    /**
     * Deletes the ACL bindings matching the given filters with one delete-ACLs request issued
     * through a {@code LeastLoadedNodeProvider}.
     *
     * <p>Duplicate filters share one future. Filter results are matched to the requested filters
     * by position. Filters for which the response contains no result fail with an
     * {@link UnknownServerException}.
     *
     * @param filters the filters selecting the bindings to delete
     * @param options supplies the request timeout
     * @return a result holding one future per distinct filter; each yields the per-binding
     *         deletion outcomes reported for that filter
     */
    @Override
    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
        long now = this.time.milliseconds();
        final Map<AclBindingFilter, KafkaFutureImpl<DeleteAclsResult.FilterResults>> futures = new HashMap<AclBindingFilter, KafkaFutureImpl<DeleteAclsResult.FilterResults>>();
        final List<AclBindingFilter> filterList = new ArrayList<AclBindingFilter>();
        for (AclBindingFilter filter : filters) {
            if (futures.get(filter) != null) {
                continue;
            }
            filterList.add(filter);
            futures.put(filter, new KafkaFutureImpl<DeleteAclsResult.FilterResults>());
        }
        this.runnable.call(new Call("deleteAcls", this.calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new DeleteAclsRequest.Builder(filterList);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                DeleteAclsResponse response = (DeleteAclsResponse)abstractResponse;
                List<DeleteAclsResponse.AclFilterResponse> responses = response.responses();
                Iterator<DeleteAclsResponse.AclFilterResponse> iter = responses.iterator();
                // Walk requests and responses in lockstep; results are positional.
                for (AclBindingFilter filter : filterList) {
                    KafkaFutureImpl<DeleteAclsResult.FilterResults> future = futures.get(filter);
                    if (!iter.hasNext()) {
                        future.completeExceptionally(new UnknownServerException("The broker reported no deletion result for the given filter."));
                        continue;
                    }
                    DeleteAclsResponse.AclFilterResponse deletion = iter.next();
                    if (deletion.error().isFailure()) {
                        future.completeExceptionally(deletion.error().exception());
                        continue;
                    }
                    List<DeleteAclsResult.FilterResult> filterResults = new ArrayList<DeleteAclsResult.FilterResult>();
                    for (DeleteAclsResponse.AclDeletionResult deletionResult : deletion.deletions()) {
                        filterResults.add(new DeleteAclsResult.FilterResult(deletionResult.acl(), deletionResult.error().exception()));
                    }
                    future.complete(new DeleteAclsResult.FilterResults(filterResults));
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(futures.values(), throwable);
            }
        }, now);
        return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>>(futures));
    }

    /**
     * Describes the configuration of the given resources.
     *
     * <p>Resources of type {@code BROKER} that are not the default resource are each queried with
     * their own "describeBrokerConfigs" call, sent to the node whose id is parsed from the
     * resource name. All remaining resources are batched into one "describeConfigs" call issued
     * through a {@code LeastLoadedNodeProvider}.
     *
     * <p>A resource missing from a response fails with an {@link UnknownServerException}.
     *
     * @param configResources the resources to describe
     * @param options supplies the request timeout and whether synonyms are requested
     * @return a result holding one future per distinct resource
     * @throws NumberFormatException if a non-default broker resource has a non-numeric name; the
     *         batched call for the other resources may already have been submitted by then
     */
    @Override
    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<ConfigResource, KafkaFutureImpl<Config>>();
        Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<ConfigResource, KafkaFutureImpl<Config>>(configResources.size());
        final List<ConfigResource> unifiedRequestResources = new ArrayList<ConfigResource>(configResources.size());
        for (ConfigResource resource : configResources) {
            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
                brokerFutures.put(resource, new KafkaFutureImpl<Config>());
                continue;
            }
            unifiedRequestFutures.put(resource, new KafkaFutureImpl<Config>());
            unifiedRequestResources.add(resource);
        }
        long now = this.time.milliseconds();
        if (!unifiedRequestResources.isEmpty()) {
            this.runnable.call(new Call("describeConfigs", this.calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {

                @Override
                AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new DescribeConfigsRequest.Builder(unifiedRequestResources).includeSynonyms(options.includeSynonyms());
                }

                @Override
                void handleResponse(AbstractResponse abstractResponse) {
                    DescribeConfigsResponse response = (DescribeConfigsResponse)abstractResponse;
                    for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
                        ConfigResource configResource = entry.getKey();
                        KafkaFutureImpl<Config> future = entry.getValue();
                        DescribeConfigsResponse.Config config = response.config(configResource);
                        if (config == null) {
                            future.completeExceptionally(new UnknownServerException("Malformed broker response: missing config for " + configResource));
                            continue;
                        }
                        if (config.error().isFailure()) {
                            future.completeExceptionally(config.error().exception());
                            continue;
                        }
                        List<ConfigEntry> configEntries = new ArrayList<ConfigEntry>();
                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), KafkaAdminClient.this.configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(), KafkaAdminClient.this.configSynonyms(configEntry)));
                        }
                        future.complete(new Config(configEntries));
                    }
                }

                @Override
                void handleFailure(Throwable throwable) {
                    KafkaAdminClient.completeAllExceptionally(unifiedRequestFutures.values(), throwable);
                }
            }, now);
        }
        for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
            final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
            final ConfigResource resource = entry.getKey();
            // The broker resource name is the id of the node that must answer the request.
            int nodeId = Integer.parseInt(resource.name());
            this.runnable.call(new Call("describeBrokerConfigs", this.calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(nodeId)) {

                @Override
                AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource)).includeSynonyms(options.includeSynonyms());
                }

                @Override
                void handleResponse(AbstractResponse abstractResponse) {
                    DescribeConfigsResponse response = (DescribeConfigsResponse)abstractResponse;
                    DescribeConfigsResponse.Config config = response.configs().get(resource);
                    if (config == null) {
                        brokerFuture.completeExceptionally(new UnknownServerException("Malformed broker response: missing config for " + resource));
                        return;
                    }
                    if (config.error().isFailure()) {
                        brokerFuture.completeExceptionally(config.error().exception());
                    } else {
                        List<ConfigEntry> configEntries = new ArrayList<ConfigEntry>();
                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), KafkaAdminClient.this.configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(), KafkaAdminClient.this.configSynonyms(configEntry)));
                        }
                        brokerFuture.complete(new Config(configEntries));
                    }
                }

                @Override
                void handleFailure(Throwable throwable) {
                    brokerFuture.completeExceptionally(throwable);
                }
            }, now);
        }
        Map<ConfigResource, KafkaFuture<Config>> allFutures = new HashMap<ConfigResource, KafkaFuture<Config>>();
        allFutures.putAll(brokerFutures);
        allFutures.putAll(unifiedRequestFutures);
        return new DescribeConfigsResult(allFutures);
    }

    /**
     * Converts the synonyms of a describe-configs response entry into their admin-API form.
     *
     * @param configEntry the response entry whose synonyms are converted
     * @return a new list with one {@code ConfigEntry.ConfigSynonym} per response synonym, in
     *         iteration order
     */
    private List<ConfigEntry.ConfigSynonym> configSynonyms(DescribeConfigsResponse.ConfigEntry configEntry) {
        List<ConfigEntry.ConfigSynonym> converted = new ArrayList<ConfigEntry.ConfigSynonym>(configEntry.synonyms().size());
        for (DescribeConfigsResponse.ConfigSynonym responseSynonym : configEntry.synonyms()) {
            ConfigEntry.ConfigSource mappedSource = this.configSource(responseSynonym.source());
            converted.add(new ConfigEntry.ConfigSynonym(responseSynonym.name(), responseSynonym.value(), mappedSource));
        }
        return converted;
    }

    /**
     * Maps a config source from a describe-configs response onto the admin-API enum.
     *
     * @param source the source reported in the response
     * @return the corresponding {@code ConfigEntry.ConfigSource} constant
     * @throws IllegalArgumentException if {@code source} is not one of the handled constants
     */
    private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSource source) {
        switch (source) {
            case TOPIC_CONFIG:
                return ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
            case DYNAMIC_BROKER_CONFIG:
                return ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
            case DYNAMIC_DEFAULT_BROKER_CONFIG:
                return ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
            case STATIC_BROKER_CONFIG:
                return ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
            case DEFAULT_CONFIG:
                return ConfigEntry.ConfigSource.DEFAULT_CONFIG;
            default:
                throw new IllegalArgumentException("Unexpected config source " + source);
        }
    }

    /**
     * Alters the configuration of the given resources.
     *
     * <p>Each non-default {@code BROKER} resource gets its own request, sent to the node whose
     * id is parsed from the resource name. Every other resource is batched into one request
     * issued through a {@code LeastLoadedNodeProvider}.
     *
     * @param configs the new configuration per resource
     * @param options forwarded to the request-building overload
     * @return a result holding one future per resource
     * @throws NumberFormatException if a non-default broker resource has a non-numeric name
     */
    @Override
    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
        Map<ConfigResource, KafkaFutureImpl<Void>> collectedFutures = new HashMap<ConfigResource, KafkaFutureImpl<Void>>();
        List<ConfigResource> batchedResources = new ArrayList<ConfigResource>();
        for (ConfigResource resource : configs.keySet()) {
            boolean targetsSingleBroker = resource.type() == ConfigResource.Type.BROKER && !resource.isDefault();
            if (!targetsSingleBroker) {
                batchedResources.add(resource);
                continue;
            }
            int brokerNodeId = Integer.parseInt(resource.name());
            ConstantNodeIdProvider brokerProvider = new ConstantNodeIdProvider(brokerNodeId);
            collectedFutures.putAll(this.alterConfigs(configs, options, Collections.singleton(resource), brokerProvider));
        }
        if (!batchedResources.isEmpty()) {
            collectedFutures.putAll(this.alterConfigs(configs, options, batchedResources, new LeastLoadedNodeProvider()));
        }
        return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(collectedFutures));
    }

    /**
     * Builds and submits one alter-configs request covering {@code resources}, routed through
     * {@code nodeProvider}.
     *
     * <p>A resource that the response does not mention fails with an
     * {@link UnknownServerException}. It no longer triggers a {@link NullPointerException}
     * inside the response handler.
     *
     * @param configs the new configuration per resource; must contain every element of
     *        {@code resources}
     * @param options supplies the request timeout and the validate-only flag
     * @param resources the subset of {@code configs} keys covered by this request
     * @param nodeProvider selects the node the request is sent to
     * @return one future per resource in {@code resources}
     */
    private Map<ConfigResource, KafkaFutureImpl<Void>> alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options, Collection<ConfigResource> resources, NodeProvider nodeProvider) {
        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<ConfigResource, KafkaFutureImpl<Void>>();
        final Map<ConfigResource, AlterConfigsRequest.Config> requestMap = new HashMap<ConfigResource, AlterConfigsRequest.Config>(resources.size());
        for (ConfigResource resource : resources) {
            List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<AlterConfigsRequest.ConfigEntry>();
            for (ConfigEntry configEntry : configs.get(resource).entries()) {
                configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
            }
            requestMap.put(resource, new AlterConfigsRequest.Config(configEntries));
            futures.put(resource, new KafkaFutureImpl<Void>());
        }
        long now = this.time.milliseconds();
        this.runnable.call(new Call("alterConfigs", this.calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {

            @Override
            public AbstractRequest.Builder createRequest(int timeoutMs) {
                return new AlterConfigsRequest.Builder(requestMap, options.shouldValidateOnly());
            }

            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                AlterConfigsResponse response = (AlterConfigsResponse)abstractResponse;
                for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
                    ConfigResource resource = entry.getKey();
                    KafkaFutureImpl<Void> future = entry.getValue();
                    // Guard against a response that omits a requested resource.
                    if (!response.errors().containsKey(resource)) {
                        future.completeExceptionally(new UnknownServerException("Malformed broker response: missing result for " + resource));
                        continue;
                    }
                    ApiException exception = response.errors().get(resource).exception();
                    if (exception != null) {
                        future.completeExceptionally(exception);
                        continue;
                    }
                    future.complete(null);
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(futures.values(), throwable);
            }
        }, now);
        return futures;
    }

    /**
     * Moves replicas to the given log directories.
     *
     * <p>The assignment is grouped by broker id and one "alterReplicaLogDirs" call is sent to
     * each broker. A failure of one broker's call (including a response that names a partition
     * that was not requested from that broker) fails only the futures of replicas hosted on that
     * broker. Replicas on other brokers keep waiting for their own call.
     *
     * @param replicaAssignment target log directory per replica
     * @param options supplies the request timeout
     * @return a result holding one future per replica in {@code replicaAssignment}
     */
    @Override
    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) {
        final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<TopicPartitionReplica, KafkaFutureImpl<Void>>(replicaAssignment.size());
        for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
            futures.put(replica, new KafkaFutureImpl<Void>());
        }
        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<Integer, Map<TopicPartition, String>>();
        for (Map.Entry<TopicPartitionReplica, String> entry : replicaAssignment.entrySet()) {
            TopicPartitionReplica replica = entry.getKey();
            Integer hostBrokerId = replica.brokerId();
            Map<TopicPartition, String> brokerAssignment = replicaAssignmentByBroker.get(hostBrokerId);
            if (brokerAssignment == null) {
                brokerAssignment = new HashMap<TopicPartition, String>();
                replicaAssignmentByBroker.put(hostBrokerId, brokerAssignment);
            }
            brokerAssignment.put(new TopicPartition(replica.topic(), replica.partition()), entry.getValue());
        }
        long now = this.time.milliseconds();
        for (Map.Entry<Integer, Map<TopicPartition, String>> entry : replicaAssignmentByBroker.entrySet()) {
            // Declared final inside the loop so the anonymous Call can capture it.
            final int brokerId = entry.getKey();
            final Map<TopicPartition, String> assignment = entry.getValue();
            this.runnable.call(new Call("alterReplicaLogDirs", this.calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) {

                @Override
                public AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new AlterReplicaLogDirsRequest.Builder(assignment);
                }

                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse)abstractResponse;
                    for (Map.Entry<TopicPartition, Errors> responseEntry : response.responses().entrySet()) {
                        TopicPartition tp = responseEntry.getKey();
                        Errors error = responseEntry.getValue();
                        TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
                        KafkaFutureImpl<Void> future = futures.get(replica);
                        if (future == null) {
                            this.handleFailure(new IllegalStateException("The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
                            continue;
                        }
                        if (error == Errors.NONE) {
                            future.complete(null);
                            continue;
                        }
                        future.completeExceptionally(error.exception());
                    }
                }

                @Override
                void handleFailure(Throwable throwable) {
                    // Only fail the replicas this call is responsible for; other brokers'
                    // calls complete their own futures.
                    for (Map.Entry<TopicPartitionReplica, KafkaFutureImpl<Void>> futureEntry : futures.entrySet()) {
                        if (futureEntry.getKey().brokerId() == brokerId) {
                            futureEntry.getValue().completeExceptionally(throwable);
                        }
                    }
                }
            }, now);
        }
        return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
    }

    /**
     * Queries the log directories of the given brokers, sending one "describeLogDirs" call to
     * each broker id.
     *
     * <p>A broker whose response contains no log directory information fails with the
     * {@code CLUSTER_AUTHORIZATION_FAILED} exception. A failure of one broker's call fails only
     * that broker's future. Futures of other brokers are left to their own calls.
     *
     * @param brokers ids of the brokers to query
     * @param options supplies the request timeout
     * @return a result holding one future per distinct broker id
     */
    @Override
    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(brokers.size());
        for (Integer brokerId : brokers) {
            futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
        }
        long now = this.time.milliseconds();
        for (final Integer brokerId : brokers) {
            final KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = futures.get(brokerId);
            this.runnable.call(new Call("describeLogDirs", this.calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) {

                @Override
                public AbstractRequest.Builder createRequest(int timeoutMs) {
                    // NOTE(review): a null partition set presumably requests every partition - confirm.
                    return new DescribeLogDirsRequest.Builder(null);
                }

                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    DescribeLogDirsResponse response = (DescribeLogDirsResponse)abstractResponse;
                    if (response.logDirInfos().size() > 0) {
                        future.complete(response.logDirInfos());
                    } else {
                        future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
                    }
                }

                @Override
                void handleFailure(Throwable throwable) {
                    // Only this broker's future belongs to this call.
                    future.completeExceptionally(throwable);
                }
            }, now);
        }
        return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(futures));
    }

    /**
     * Reports, for each given replica, its current log directory and (if present) its future log
     * directory together with the offset lags from the broker's response.
     *
     * <p>Replicas are grouped by broker id and one "describeReplicaLogDirs" call is sent to each
     * broker. Log directories reported with {@code KAFKA_STORAGE_ERROR} are skipped. Any other
     * log directory error, or a partition in the response that was not requested, fails the
     * futures of that broker's replicas with an {@link IllegalStateException}. A failure of one
     * broker's call never fails futures of replicas hosted on other brokers.
     *
     * @param replicas the replicas to describe
     * @param options supplies the request timeout
     * @return a result holding one future per distinct replica
     */
    @Override
    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>>(replicas.size());
        for (TopicPartitionReplica replica : replicas) {
            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>());
        }
        Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<Integer, Set<TopicPartition>>();
        for (TopicPartitionReplica replica : replicas) {
            Integer hostBrokerId = replica.brokerId();
            Set<TopicPartition> brokerPartitions = partitionsByBroker.get(hostBrokerId);
            if (brokerPartitions == null) {
                brokerPartitions = new HashSet<TopicPartition>();
                partitionsByBroker.put(hostBrokerId, brokerPartitions);
            }
            brokerPartitions.add(new TopicPartition(replica.topic(), replica.partition()));
        }
        long now = this.time.milliseconds();
        for (Map.Entry<Integer, Set<TopicPartition>> brokerEntry : partitionsByBroker.entrySet()) {
            final int brokerId = brokerEntry.getKey();
            final Set<TopicPartition> topicPartitions = brokerEntry.getValue();
            // Start every requested partition with an empty info so it is reported even when
            // the response does not mention it.
            final Map<TopicPartition, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<TopicPartition, DescribeReplicaLogDirsResult.ReplicaLogDirInfo>();
            for (TopicPartition topicPartition : topicPartitions) {
                replicaDirInfoByPartition.put(topicPartition, new DescribeReplicaLogDirsResult.ReplicaLogDirInfo());
            }
            this.runnable.call(new Call("describeReplicaLogDirs", this.calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) {

                @Override
                public AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new DescribeLogDirsRequest.Builder(topicPartitions);
                }

                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    DescribeLogDirsResponse response = (DescribeLogDirsResponse)abstractResponse;
                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> logDirEntry : response.logDirInfos().entrySet()) {
                        String logDir = logDirEntry.getKey();
                        DescribeLogDirsResponse.LogDirInfo logDirInfo = logDirEntry.getValue();
                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR) {
                            continue;
                        }
                        if (logDirInfo.error != Errors.NONE) {
                            this.handleFailure(new IllegalStateException("The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
                        }
                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos.entrySet()) {
                            TopicPartition tp = replicaInfoEntry.getKey();
                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
                            DescribeReplicaLogDirsResult.ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                            if (replicaLogDirInfo == null) {
                                this.handleFailure(new IllegalStateException("The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
                                continue;
                            }
                            if (replicaInfo.isFuture) {
                                // Keep the current-replica half, fill in the future-replica half.
                                replicaDirInfoByPartition.put(tp, new DescribeReplicaLogDirsResult.ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), replicaLogDirInfo.getCurrentReplicaOffsetLag(), logDir, replicaInfo.offsetLag));
                                continue;
                            }
                            // Keep the future-replica half, fill in the current-replica half.
                            replicaDirInfoByPartition.put(tp, new DescribeReplicaLogDirsResult.ReplicaLogDirInfo(logDir, replicaInfo.offsetLag, replicaLogDirInfo.getFutureReplicaLogDir(), replicaLogDirInfo.getFutureReplicaOffsetLag()));
                        }
                    }
                    for (Map.Entry<TopicPartition, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> infoEntry : replicaDirInfoByPartition.entrySet()) {
                        TopicPartition tp = infoEntry.getKey();
                        KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
                        future.complete(infoEntry.getValue());
                    }
                }

                @Override
                void handleFailure(Throwable throwable) {
                    // Only fail the replicas this call is responsible for; other brokers'
                    // calls complete their own futures.
                    for (Map.Entry<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futureEntry : futures.entrySet()) {
                        if (futureEntry.getKey().brokerId() == brokerId) {
                            futureEntry.getValue().completeExceptionally(throwable);
                        }
                    }
                }
            }, now);
        }
        return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>>(futures));
    }

    /**
     * Requests additional partitions for the given topics through a single "createPartitions"
     * call whose target node is picked by a {@code ControllerNodeProvider}.
     *
     * @param newPartitions the requested partitioning keyed by topic name; a copy of this map is
     *                      what the request builder receives
     * @param options       supplies the timeout used to compute the deadline and the
     *                      validate-only flag passed to the request
     * @return a result wrapping one future per key of {@code newPartitions}
     */
    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options) {
        final HashMap<String, KafkaFutureImpl<Void>> futures = new HashMap<String, KafkaFutureImpl<Void>>(newPartitions.size());
        for (String topicName : newPartitions.keySet()) {
            futures.put(topicName, new KafkaFutureImpl<Void>());
        }
        final HashMap<String, NewPartitions> requestMap = new HashMap<String, NewPartitions>(newPartitions);
        long now = this.time.milliseconds();
        long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call createPartitionsCall = new Call("createPartitions", deadlineMs, new ControllerNodeProvider()) {

            @Override
            public AbstractRequest.Builder createRequest(int timeoutMs) {
                return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly());
            }

            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
                Map<String, ApiError> errorsByTopic = response.errors();
                // Any NOT_CONTROLLER entry invalidates the cached controller: clear it, ask for a
                // metadata update and throw the error out of the handler before touching futures.
                for (ApiError topicError : errorsByTopic.values()) {
                    if (topicError.error() == Errors.NOT_CONTROLLER) {
                        KafkaAdminClient.this.metadataManager.clearController();
                        KafkaAdminClient.this.metadataManager.requestUpdate();
                        throw topicError.exception();
                    }
                }
                // Otherwise settle each topic's future according to its own error entry.
                for (Map.Entry<String, ApiError> outcome : errorsByTopic.entrySet()) {
                    KafkaFutureImpl<Void> topicFuture = futures.get(outcome.getKey());
                    ApiError topicError = outcome.getValue();
                    if (!topicError.isSuccess()) {
                        topicFuture.completeExceptionally(topicError.exception());
                    } else {
                        topicFuture.complete(null);
                    }
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                KafkaAdminClient.completeAllExceptionally(futures.values(), throwable);
            }
        };
        this.runnable.call(createPartitionsCall, now);
        return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
    }

    /**
     * Deletes records before the given offsets in two phases: a "topicsMetadata" call resolves
     * the leader of every requested partition, then one "deleteRecords" call per leader carries
     * the offsets of the partitions that leader owns. Both phases share one deadline.
     *
     * @param recordsToDelete the offset before which to delete, keyed by partition
     * @param options         supplies the timeout used to compute the shared deadline
     * @return a result wrapping one future per key of {@code recordsToDelete}
     */
    @Override
    public DeleteRecordsResult deleteRecords(final Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) {
        final HashMap<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures = new HashMap<TopicPartition, KafkaFutureImpl<DeletedRecords>>(recordsToDelete.size());
        final HashSet<String> topics = new HashSet<String>();
        for (TopicPartition topicPartition : recordsToDelete.keySet()) {
            futures.put(topicPartition, new KafkaFutureImpl<DeletedRecords>());
            topics.add(topicPartition.topic());
        }
        long nowMetadata = this.time.milliseconds();
        final long deadline = this.calcDeadlineMs(nowMetadata, options.timeoutMs());
        this.runnable.call(new Call("topicsMetadata", deadline, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new MetadataRequest.Builder(new ArrayList<String>(topics), false);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                MetadataResponse response = (MetadataResponse) abstractResponse;
                Map<String, Errors> errors = response.errors();
                Cluster cluster = response.cluster();

                // Fail every partition whose topic came back with a metadata error.
                for (Map.Entry<TopicPartition, KafkaFutureImpl<DeletedRecords>> pending : futures.entrySet()) {
                    Errors topicError = errors.get(pending.getKey().topic());
                    if (topicError != null) {
                        pending.getValue().completeExceptionally(topicError.exception());
                    }
                }

                // Group the remaining partitions by leader; a partition without a leader fails now.
                Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<Node, Map<TopicPartition, Long>>();
                for (Map.Entry<TopicPartition, RecordsToDelete> entry : recordsToDelete.entrySet()) {
                    TopicPartition topicPartition = entry.getKey();
                    if (errors.containsKey(topicPartition.topic())) {
                        continue;
                    }
                    Node leader = cluster.leaderFor(topicPartition);
                    if (leader == null) {
                        KafkaFutureImpl<DeletedRecords> future = futures.get(topicPartition);
                        future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
                        continue;
                    }
                    Map<TopicPartition, Long> offsetsForLeader = leaders.get(leader);
                    if (offsetsForLeader == null) {
                        offsetsForLeader = new HashMap<TopicPartition, Long>();
                        leaders.put(leader, offsetsForLeader);
                    }
                    offsetsForLeader.put(topicPartition, entry.getValue().beforeOffset());
                }

                for (Map.Entry<Node, Map<TopicPartition, Long>> entry : leaders.entrySet()) {
                    final Map<TopicPartition, Long> partitionDeleteOffsets = entry.getValue();
                    long nowDelete = KafkaAdminClient.this.time.milliseconds();
                    int brokerId = entry.getKey().id();
                    KafkaAdminClient.this.runnable.call(new Call("deleteRecords", deadline, new ConstantNodeIdProvider(brokerId)) {

                        @Override
                        AbstractRequest.Builder createRequest(int timeoutMs) {
                            return new DeleteRecordsRequest.Builder(timeoutMs, partitionDeleteOffsets);
                        }

                        @Override
                        void handleResponse(AbstractResponse abstractResponse) {
                            DeleteRecordsResponse response = (DeleteRecordsResponse) abstractResponse;
                            for (Map.Entry<TopicPartition, DeleteRecordsResponse.PartitionResponse> result : response.responses().entrySet()) {
                                KafkaFutureImpl<DeletedRecords> future = futures.get(result.getKey());
                                if (result.getValue().error == Errors.NONE) {
                                    future.complete(new DeletedRecords(result.getValue().lowWatermark));
                                } else {
                                    future.completeExceptionally(result.getValue().error.exception());
                                }
                            }
                        }

                        @Override
                        void handleFailure(Throwable throwable) {
                            // Only the partitions carried by THIS broker's request are affected.
                            // Failing every future of the operation would also fail partitions
                            // whose requests to other brokers are still pending.
                            for (TopicPartition topicPartition : partitionDeleteOffsets.keySet()) {
                                KafkaFutureImpl<DeletedRecords> future = futures.get(topicPartition);
                                if (future != null) {
                                    future.completeExceptionally(throwable);
                                }
                            }
                        }
                    }, nowDelete);
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                // No leader could be resolved, so no partition can be served: fail them all.
                KafkaAdminClient.completeAllExceptionally(futures.values(), throwable);
            }
        }, nowMetadata);
        return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
    }

    /**
     * Issues a "createDelegationToken" call to a node picked by a {@code LeastLoadedNodeProvider}.
     *
     * @param options supplies the timeout, the renewers and the maximum lifetime for the request
     * @return a result wrapping a future that receives the created token, or the error reported
     *         by the response or by the call failure
     */
    @Override
    public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
        final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<DelegationToken>();
        long now = this.time.milliseconds();
        long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call createTokenCall = new Call("createDelegationToken", deadlineMs, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                CreateDelegationTokenResponse created = (CreateDelegationTokenResponse) abstractResponse;
                if (!created.hasError()) {
                    // The renewers come from the caller's options; every other field is read from the response.
                    TokenInformation info = new TokenInformation(created.tokenId(), created.owner(), options.renewers(), created.issueTimestamp(), created.maxTimestamp(), created.expiryTimestamp());
                    delegationTokenFuture.complete(new DelegationToken(info, created.hmacBytes()));
                    return;
                }
                delegationTokenFuture.completeExceptionally(created.error().exception());
            }

            @Override
            void handleFailure(Throwable throwable) {
                delegationTokenFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(createTokenCall, now);
        return new CreateDelegationTokenResult(delegationTokenFuture);
    }

    /**
     * Issues a "renewDelegationToken" call to a node picked by a {@code LeastLoadedNodeProvider}.
     *
     * @param hmac    the HMAC bytes passed to the renew request
     * @param options supplies the timeout and the renew time period
     * @return a result wrapping a future that receives the expiry timestamp from the response,
     *         or the error reported by the response or by the call failure
     */
    @Override
    public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
        final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<Long>();
        long now = this.time.milliseconds();
        long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call renewCall = new Call("renewDelegationToken", deadlineMs, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                RenewDelegationTokenResponse renewed = (RenewDelegationTokenResponse) abstractResponse;
                if (!renewed.hasError()) {
                    expiryTimeFuture.complete(renewed.expiryTimestamp());
                    return;
                }
                expiryTimeFuture.completeExceptionally(renewed.error().exception());
            }

            @Override
            void handleFailure(Throwable throwable) {
                expiryTimeFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(renewCall, now);
        return new RenewDelegationTokenResult(expiryTimeFuture);
    }

    /**
     * Issues an "expireDelegationToken" call to a node picked by a {@code LeastLoadedNodeProvider}.
     *
     * @param hmac    the HMAC bytes passed to the expire request
     * @param options supplies the timeout and the expiry time period
     * @return a result wrapping a future that receives the expiry timestamp from the response,
     *         or the error reported by the response or by the call failure
     */
    @Override
    public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
        final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<Long>();
        long now = this.time.milliseconds();
        long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call expireCall = new Call("expireDelegationToken", deadlineMs, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                ExpireDelegationTokenResponse expired = (ExpireDelegationTokenResponse) abstractResponse;
                if (!expired.hasError()) {
                    expiryTimeFuture.complete(expired.expiryTimestamp());
                    return;
                }
                expiryTimeFuture.completeExceptionally(expired.error().exception());
            }

            @Override
            void handleFailure(Throwable throwable) {
                expiryTimeFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(expireCall, now);
        return new ExpireDelegationTokenResult(expiryTimeFuture);
    }

    /**
     * Issues a "describeDelegationToken" call to a node picked by a {@code LeastLoadedNodeProvider}.
     *
     * @param options supplies the timeout and the owners passed to the request
     * @return a result wrapping a future that receives the tokens listed in the response,
     *         or the error reported by the response or by the call failure
     */
    @Override
    public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
        final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new KafkaFutureImpl<List<DelegationToken>>();
        long now = this.time.milliseconds();
        long deadlineMs = this.calcDeadlineMs(now, options.timeoutMs());
        Call describeCall = new Call("describeDelegationToken", deadlineMs, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new DescribeDelegationTokenRequest.Builder(options.owners());
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                DescribeDelegationTokenResponse described = (DescribeDelegationTokenResponse) abstractResponse;
                if (!described.hasError()) {
                    tokensFuture.complete(described.tokens());
                    return;
                }
                tokensFuture.completeExceptionally(described.error().exception());
            }

            @Override
            void handleFailure(Throwable throwable) {
                tokensFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(describeCall, now);
        return new DescribeDelegationTokenResult(tokensFuture);
    }

    /**
     * Describes each of the given groups. For every distinct, representable group id a
     * "findCoordinator" call is issued; when it succeeds a "describeConsumerGroups" call is sent
     * to the coordinator node reported by the response, sharing the same deadline.
     *
     * <p>Group ids rejected by {@code groupIdIsUnrepresentable} get a future that is already
     * failed with {@link InvalidGroupIdException} and no request is sent for them. A group whose
     * protocol type is neither {@code "consumer"} nor empty fails with
     * {@link IllegalArgumentException}.
     *
     * @param groupIds the ids of the groups to describe; duplicates share one future
     * @param options  supplies the timeout used to compute each group's deadline
     * @return a result wrapping one future per distinct group id
     */
    @Override
    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options) {
        final HashMap<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = new HashMap<String, KafkaFutureImpl<ConsumerGroupDescription>>(groupIds.size());
        for (String candidateId : groupIds) {
            if (KafkaAdminClient.groupIdIsUnrepresentable(candidateId)) {
                KafkaFutureImpl<ConsumerGroupDescription> failed = new KafkaFutureImpl<ConsumerGroupDescription>();
                failed.completeExceptionally(new InvalidGroupIdException("The given group id '" + candidateId + "' cannot be represented in a request."));
                futures.put(candidateId, failed);
            } else if (!futures.containsKey(candidateId)) {
                futures.put(candidateId, new KafkaFutureImpl<ConsumerGroupDescription>());
            }
        }
        for (Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) {
            // Futures failed above never need a request.
            if (entry.getValue().isCompletedExceptionally()) {
                continue;
            }
            final String groupId = entry.getKey();
            long startFindCoordinatorMs = this.time.milliseconds();
            final long deadline = this.calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
            this.runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {

                @Override
                AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
                }

                @Override
                void handleResponse(AbstractResponse abstractResponse) {
                    final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
                    if (KafkaAdminClient.this.handleFindCoordinatorError(fcResponse, futures.get(groupId))) {
                        return;
                    }
                    long nowDescribeConsumerGroups = KafkaAdminClient.this.time.milliseconds();
                    int nodeId = fcResponse.node().id();
                    KafkaAdminClient.this.runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {

                        @Override
                        AbstractRequest.Builder createRequest(int timeoutMs) {
                            return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
                        }

                        @Override
                        void handleResponse(AbstractResponse abstractResponse) {
                            DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
                            DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
                            Errors groupError = groupMetadata.error();
                            if (groupError != Errors.NONE) {
                                future.completeExceptionally(groupError.exception());
                                return;
                            }
                            String protocolType = groupMetadata.protocolType();
                            if (!protocolType.equals("consumer") && !protocolType.isEmpty()) {
                                // Without this the future would never complete and callers
                                // waiting on it would block forever.
                                future.completeExceptionally(new IllegalArgumentException(String.format("GroupId %s is not a consumer group (%s).", groupId, protocolType)));
                                return;
                            }
                            List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
                            ArrayList<MemberDescription> memberDescriptions = new ArrayList<MemberDescription>(members.size());
                            for (DescribeGroupsResponse.GroupMember groupMember : members) {
                                Set<TopicPartition> partitions = Collections.emptySet();
                                if (groupMember.memberAssignment().remaining() > 0) {
                                    // Deserialize from a duplicate of the assignment buffer rather than the buffer itself.
                                    PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(groupMember.memberAssignment().duplicate());
                                    partitions = new HashSet<TopicPartition>(assignment.partitions());
                                }
                                memberDescriptions.add(new MemberDescription(groupMember.memberId(), groupMember.clientId(), groupMember.clientHost(), new MemberAssignment(partitions)));
                            }
                            ConsumerGroupDescription consumerGroupDescription = new ConsumerGroupDescription(groupId, protocolType.isEmpty(), memberDescriptions, groupMetadata.protocol(), ConsumerGroupState.parse(groupMetadata.state()), fcResponse.node());
                            future.complete(consumerGroupDescription);
                        }

                        @Override
                        void handleFailure(Throwable throwable) {
                            futures.get(groupId).completeExceptionally(throwable);
                        }
                    }, nowDescribeConsumerGroups);
                }

                @Override
                void handleFailure(Throwable throwable) {
                    futures.get(groupId).completeExceptionally(throwable);
                }
            }, startFindCoordinatorMs);
        }
        return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
    }

    /**
     * Inspects the error carried by a FindCoordinator response.
     *
     * @param response the FindCoordinator response to inspect
     * @param future   the future to fail when the response carries a non-retriable error
     * @return {@code true} if the response had an error and {@code future} was completed
     *         exceptionally with it; {@code false} if the response had no error
     * @throws RuntimeException the error's own exception, when that exception is a
     *         {@link RetriableException}; the future is left untouched in that case
     */
    private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
        final Errors coordinatorError = response.error();
        // Retriable errors are thrown instead of being recorded on the future.
        if (coordinatorError.exception() instanceof RetriableException) {
            throw coordinatorError.exception();
        }
        if (!response.hasError()) {
            return false;
        }
        future.completeExceptionally(coordinatorError.exception());
        return true;
    }

    /**
     * Lists consumer groups in two phases: a "findAllBrokers" metadata call discovers the
     * cluster's nodes, then one "listConsumerGroups" call is sent to each node. Per-node
     * listings and errors are accumulated in a shared {@code ListConsumerGroupsResults}.
     *
     * @param options supplies the timeout used to compute the deadline shared by all calls
     * @return a result wrapping the future the accumulated listings and errors are delivered to
     */
    @Override
    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
        final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<Collection<Object>>();
        long nowMetadata = this.time.milliseconds();
        final long deadline = this.calcDeadlineMs(nowMetadata, options.timeoutMs());
        Call findBrokersCall = new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new MetadataRequest.Builder(Collections.emptyList(), true);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                Cluster cluster = ((MetadataResponse) abstractResponse).cluster();
                if (cluster.nodes().isEmpty()) {
                    throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
                }
                HashSet<Node> brokers = new HashSet<Node>(cluster.nodes());
                final ListConsumerGroupsResults results = new ListConsumerGroupsResults(brokers, all);
                for (final Node node : brokers) {
                    long nowList = KafkaAdminClient.this.time.milliseconds();
                    KafkaAdminClient.this.runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {

                        @Override
                        AbstractRequest.Builder createRequest(int timeoutMs) {
                            return new ListGroupsRequest.Builder();
                        }

                        /** Records the group only when its protocol type is "consumer" or empty. */
                        private void maybeAddConsumerGroup(ListGroupsResponse.Group group) {
                            String protocolType = group.protocolType();
                            if (!protocolType.equals("consumer") && !protocolType.isEmpty()) {
                                return;
                            }
                            results.addListing(new ConsumerGroupListing(group.groupId(), protocolType.isEmpty()));
                        }

                        @Override
                        void handleResponse(AbstractResponse abstractResponse) {
                            ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                            synchronized (results) {
                                Errors error = response.error();
                                // These two coordinator errors are thrown out of the handler
                                // instead of being recorded against the node.
                                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                                    throw error.exception();
                                }
                                if (error == Errors.NONE) {
                                    for (ListGroupsResponse.Group group : response.groups()) {
                                        this.maybeAddConsumerGroup(group);
                                    }
                                } else {
                                    results.addError(error.exception(), node);
                                }
                                results.tryComplete(node);
                            }
                        }

                        @Override
                        void handleFailure(Throwable throwable) {
                            synchronized (results) {
                                results.addError(throwable, node);
                                results.tryComplete(node);
                            }
                        }
                    }, nowList);
                }
            }

            @Override
            void handleFailure(Throwable throwable) {
                // The failure is delivered as the sole element of a normally completed result.
                KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable);
                all.complete(Collections.singletonList(exception));
            }
        };
        this.runnable.call(findBrokersCall, nowMetadata);
        return new ListConsumerGroupsResult(all);
    }

    /**
     * Fetches the committed offsets of a group in two phases: a "findCoordinator" call locates
     * the group's coordinator, then a "listConsumerGroupOffsets" call is sent to that node with
     * the same deadline.
     *
     * @param groupId the group whose offsets are requested
     * @param options supplies the timeout and the partitions passed to the offset fetch request
     * @return a result wrapping a future that receives the offsets of the partitions that came
     *         back without error; partitions with an error are logged and left out
     */
    @Override
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
        final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>>();
        long startFindCoordinatorMs = this.time.milliseconds();
        final long deadline = this.calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
        Call findCoordinatorCall = new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {

            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
            }

            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                FindCoordinatorResponse coordinatorResponse = (FindCoordinatorResponse) abstractResponse;
                if (KafkaAdminClient.this.handleFindCoordinatorError(coordinatorResponse, groupOffsetListingFuture)) {
                    return;
                }
                long nowListConsumerGroupOffsets = KafkaAdminClient.this.time.milliseconds();
                int coordinatorId = coordinatorResponse.node().id();
                KafkaAdminClient.this.runnable.call(new Call("listConsumerGroupOffsets", deadline, (NodeProvider) new ConstantNodeIdProvider(coordinatorId)) {

                    @Override
                    AbstractRequest.Builder createRequest(int timeoutMs) {
                        return new OffsetFetchRequest.Builder(groupId, options.topicPartitions());
                    }

                    @Override
                    void handleResponse(AbstractResponse abstractResponse) {
                        OffsetFetchResponse offsetResponse = (OffsetFetchResponse) abstractResponse;
                        if (offsetResponse.hasError()) {
                            groupOffsetListingFuture.completeExceptionally(offsetResponse.error().exception());
                            return;
                        }
                        HashMap<TopicPartition, OffsetAndMetadata> offsetsByPartition = new HashMap<TopicPartition, OffsetAndMetadata>();
                        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> fetched : offsetResponse.responseData().entrySet()) {
                            TopicPartition topicPartition = fetched.getKey();
                            OffsetFetchResponse.PartitionData partitionData = fetched.getValue();
                            Errors error = partitionData.error;
                            if (error != Errors.NONE) {
                                KafkaAdminClient.this.log.warn("Skipping return offset for {} due to error {}.", (Object)topicPartition, (Object)error);
                                continue;
                            }
                            offsetsByPartition.put(topicPartition, new OffsetAndMetadata(partitionData.offset, partitionData.metadata));
                        }
                        groupOffsetListingFuture.complete(offsetsByPartition);
                    }

                    @Override
                    void handleFailure(Throwable throwable) {
                        groupOffsetListingFuture.completeExceptionally(throwable);
                    }
                }, nowListConsumerGroupOffsets);
            }

            @Override
            void handleFailure(Throwable throwable) {
                groupOffsetListingFuture.completeExceptionally(throwable);
            }
        };
        this.runnable.call(findCoordinatorCall, startFindCoordinatorMs);
        return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
    }

    /**
     * Deletes each of the given groups. For every distinct, representable group id a
     * "findCoordinator" call is issued; when it succeeds a "deleteConsumerGroups" call is sent to
     * the coordinator node reported by the response, sharing the same deadline.
     *
     * <p>Group ids rejected by {@code groupIdIsUnrepresentable} get a future that is already
     * failed and no request is sent for them. The failure is an {@link InvalidGroupIdException},
     * the same type {@code describeConsumerGroups} uses for this case.
     *
     * @param groupIds the ids of the groups to delete; duplicates share one future and trigger
     *                 only one pair of requests
     * @param options  supplies the timeout used to compute each group's deadline
     * @return a result wrapping one future per distinct group id
     */
    @Override
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
        final HashMap<String, KafkaFutureImpl<Void>> futures = new HashMap<String, KafkaFutureImpl<Void>>(groupIds.size());
        for (String candidateId : groupIds) {
            if (KafkaAdminClient.groupIdIsUnrepresentable(candidateId)) {
                KafkaFutureImpl<Void> failed = new KafkaFutureImpl<Void>();
                failed.completeExceptionally(new InvalidGroupIdException("The given group id '" + candidateId + "' cannot be represented in a request."));
                futures.put(candidateId, failed);
            } else if (!futures.containsKey(candidateId)) {
                futures.put(candidateId, new KafkaFutureImpl<Void>());
            }
        }
        // Walk the de-duplicated futures map, not groupIds: a repeated id must not send a second
        // FindCoordinator/DeleteGroups pair for a group that is already being deleted.
        for (Map.Entry<String, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
            // Futures failed above never need a request.
            if (entry.getValue().isCompletedExceptionally()) {
                continue;
            }
            final String groupId = entry.getKey();
            long startFindCoordinatorMs = this.time.milliseconds();
            final long deadline = this.calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
            this.runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {

                @Override
                AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
                }

                @Override
                void handleResponse(AbstractResponse abstractResponse) {
                    FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
                    if (KafkaAdminClient.this.handleFindCoordinatorError(response, futures.get(groupId))) {
                        return;
                    }
                    long nowDeleteConsumerGroups = KafkaAdminClient.this.time.milliseconds();
                    int nodeId = response.node().id();
                    KafkaAdminClient.this.runnable.call(new Call("deleteConsumerGroups", deadline, (NodeProvider) new ConstantNodeIdProvider(nodeId)) {

                        @Override
                        AbstractRequest.Builder createRequest(int timeoutMs) {
                            return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
                        }

                        @Override
                        void handleResponse(AbstractResponse abstractResponse) {
                            DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
                            KafkaFutureImpl<Void> future = futures.get(groupId);
                            Errors groupError = response.get(groupId);
                            if (groupError != Errors.NONE) {
                                future.completeExceptionally(groupError.exception());
                            } else {
                                future.complete(null);
                            }
                        }

                        @Override
                        void handleFailure(Throwable throwable) {
                            futures.get(groupId).completeExceptionally(throwable);
                        }
                    }, nowDeleteConsumerGroups);
                }

                @Override
                void handleFailure(Throwable throwable) {
                    futures.get(groupId).completeExceptionally(throwable);
                }
            }, startFindCoordinatorMs);
        }
        return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
    }

    /**
     * Gathers the per-node outcome of a "list consumer groups" operation and
     * completes one shared future once every node has been accounted for.
     *
     * <p>The completed collection is heterogeneous: it holds every
     * {@link ConsumerGroupListing} that was added plus every recorded error
     * ({@link Throwable}), which is why the future is typed on {@code Object}.
     * All mutating methods are {@code synchronized} on this instance.
     */
    private static final class ListConsumerGroupsResults {
        /** Failures recorded via {@link #addError(Throwable, Node)}. */
        private final List<Throwable> errors = new ArrayList<Throwable>();
        /** Listings recorded so far, keyed by group id (a repeated id overwrites the earlier entry). */
        private final HashMap<String, ConsumerGroupListing> listings = new HashMap<String, ConsumerGroupListing>();
        /** Nodes that have not yet been reported through {@link #tryComplete(Node)}. */
        private final HashSet<Node> remaining;
        /** Future completed with listings and errors once {@link #remaining} is empty. */
        private final KafkaFutureImpl<Collection<Object>> future;

        /**
         * @param leaders nodes whose answers are awaited; copied into an internal set
         * @param future  future to complete; completed immediately when {@code leaders} is empty
         */
        ListConsumerGroupsResults(Collection<Node> leaders, KafkaFutureImpl<Collection<Object>> future) {
            this.remaining = new HashSet<Node>(leaders);
            this.future = future;
            // With no nodes to wait for there will never be a tryComplete(Node) call,
            // so the future has to be resolved right here.
            this.tryComplete();
        }

        /**
         * Records a failure for {@code node}, re-wrapping it so the message names the node.
         *
         * @param throwable the failure observed for the node
         * @param node      the node the failure belongs to
         */
        synchronized void addError(Throwable throwable, Node node) {
            ApiError error = ApiError.fromThrowable(throwable);
            String detail = error.message();
            String prefix = "Error listing groups on " + node;
            if (detail == null || detail.isEmpty()) {
                this.errors.add(error.error().exception(prefix));
            } else {
                this.errors.add(error.error().exception(prefix + ": " + detail));
            }
        }

        /**
         * Records one consumer group listing.
         *
         * @param listing the listing to store under its group id
         */
        synchronized void addListing(ConsumerGroupListing listing) {
            this.listings.put(listing.groupId(), listing);
        }

        /**
         * Marks {@code leader} as finished and completes the future if it was the last one.
         *
         * @param leader the node that has finished reporting
         */
        synchronized void tryComplete(Node leader) {
            this.remaining.remove(leader);
            this.tryComplete();
        }

        /** Completes the future with all listings followed by all errors once no node remains. */
        private synchronized void tryComplete() {
            if (this.remaining.isEmpty()) {
                // The result mixes listings and Throwables, so the element type must be
                // Object to match KafkaFutureImpl<Collection<Object>>.
                ArrayList<Object> results = new ArrayList<Object>(this.listings.values());
                results.addAll(this.errors);
                this.future.complete(results);
            }
        }
    }

    private final class AdminClientRunnable
    implements Runnable {
        private final ArrayList<Call> pendingCalls = new ArrayList();
        private final Map<Node, List<Call>> callsToSend = new HashMap<Node, List<Call>>();
        private final Map<String, List<Call>> callsInFlight = new HashMap<String, List<Call>>();
        private final Map<Integer, Call> correlationIdToCalls = new HashMap<Integer, Call>();
        private List<Call> newCalls = new LinkedList<Call>();

        /** Creates the runnable; all state is initialised by the field initialisers above. */
        private AdminClientRunnable() {
        }

        /**
         * Fails every pending call (one that has no node assigned yet) whose deadline
         * has passed according to {@code processor}, and logs how many were dropped.
         *
         * @param processor the timeout processor for the current loop iteration
         */
        private void timeoutPendingCalls(TimeoutProcessor processor) {
            int expired = processor.handleTimeouts(this.pendingCalls, "Timed out waiting for a node assignment.");
            if (expired <= 0) {
                return;
            }
            KafkaAdminClient.this.log.debug("Timed out {} pending calls.", expired);
        }

        /**
         * Fails every call that already has a node assigned but has not been sent yet
         * and whose deadline has passed.
         *
         * @param processor the timeout processor for the current loop iteration
         * @return the number of calls that were timed out
         */
        private int timeoutCallsToSend(TimeoutProcessor processor) {
            int expired = 0;
            Iterator<List<Call>> queues = this.callsToSend.values().iterator();
            while (queues.hasNext()) {
                expired += processor.handleTimeouts(queues.next(), "Timed out waiting to send the call.");
            }
            if (expired > 0) {
                KafkaAdminClient.this.log.debug("Timed out {} call(s) with assigned nodes.", expired);
            }
            return expired;
        }

        /**
         * Moves everything queued in {@code newCalls} into {@code pendingCalls}.
         * Synchronized on this runnable, the same monitor {@code enqueue} uses when
         * adding to {@code newCalls}.
         */
        private synchronized void drainNewCalls() {
            if (this.newCalls.isEmpty()) {
                return;
            }
            this.pendingCalls.addAll(this.newCalls);
            this.newCalls.clear();
        }

        /**
         * Tries to assign a node to each pending call that is allowed to run now.
         * Calls still inside their retry back-off window are left in place.
         *
         * @param now the current time in milliseconds
         * @return the shortest remaining back-off among the skipped calls, or
         *         {@code Long.MAX_VALUE} when no call was skipped for back-off
         */
        private long maybeDrainPendingCalls(long now) {
            KafkaAdminClient.this.log.trace("Trying to choose nodes for {} at {}", this.pendingCalls, now);
            long shortestWaitMs = Long.MAX_VALUE;
            for (Iterator<Call> it = this.pendingCalls.iterator(); it.hasNext(); ) {
                Call candidate = it.next();
                if (now < candidate.nextAllowedTryMs) {
                    // Still backing off: remember how long until it becomes eligible.
                    shortestWaitMs = Math.min(shortestWaitMs, candidate.nextAllowedTryMs - now);
                } else if (this.maybeDrainPendingCall(candidate, now)) {
                    it.remove();
                }
            }
            return shortestWaitMs;
        }

        /**
         * Asks the call's node provider for a target node and, if one is returned,
         * queues the call in {@code callsToSend} for that node.
         *
         * @param call the call to place
         * @param now  the current time in milliseconds, used if the call has to be failed
         * @return {@code true} when the call no longer belongs in the pending list
         *         (it was assigned to a node, or the provider threw and the call was
         *         failed); {@code false} when no node is available yet
         */
        private boolean maybeDrainPendingCall(Call call, long now) {
            try {
                Node target = call.nodeProvider.provide();
                if (target == null) {
                    KafkaAdminClient.this.log.trace("Unable to assign {} to a node.", call);
                    return false;
                }
                KafkaAdminClient.this.log.trace("Assigned {} to node {}", call, target);
                call.curNode = target;
                KafkaAdminClient.getOrCreateListValue(this.callsToSend, target).add(call);
                return true;
            }
            catch (Throwable failure) {
                KafkaAdminClient.this.log.debug("Unable to choose node for {}", call, failure);
                call.fail(now, failure);
                return true;
            }
        }

        /**
         * Sends at most one queued call to each node the network client is ready for.
         *
         * <p>Entries whose queue is empty are removed from {@code callsToSend}. For a
         * node that is not ready, nothing is sent and the client's poll delay for that
         * node is folded into the returned timeout. For a ready node, the first queued
         * call is dequeued, its request is built and sent, and the call is tracked in
         * {@code callsInFlight} and {@code correlationIdToCalls}.
         *
         * @param now the current time in milliseconds
         * @return the smallest poll delay among nodes that were not ready, or
         *         {@code Long.MAX_VALUE} if every visited node was ready
         */
        private long sendEligibleCalls(long now) {
            long pollTimeout = Long.MAX_VALUE;
            Iterator<Map.Entry<Node, List<Call>>> iter = this.callsToSend.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Node, List<Call>> entry = iter.next();
                List<Call> calls = entry.getValue();
                if (calls.isEmpty()) {
                    iter.remove();
                    continue;
                }
                Node node = entry.getKey();
                if (!KafkaAdminClient.this.client.ready(node, now)) {
                    long nodeTimeout = KafkaAdminClient.this.client.pollDelayMs(node, now);
                    pollTimeout = Math.min(pollTimeout, nodeTimeout);
                    KafkaAdminClient.this.log.trace("Client is not ready to send to {}. Must delay {} ms", (Object)node, (Object)nodeTimeout);
                    continue;
                }
                Call call = calls.remove(0);
                int timeoutMs = KafkaAdminClient.calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
                AbstractRequest.Builder requestBuilder;
                try {
                    requestBuilder = call.createRequest(timeoutMs);
                }
                catch (Throwable throwable) {
                    // Attach the original failure as the cause; without it the reason the
                    // request could not be built is lost to whoever receives the exception.
                    call.fail(now, new KafkaException(String.format("Internal error sending %s to %s.", call.callName, node), throwable));
                    continue;
                }
                ClientRequest clientRequest = KafkaAdminClient.this.client.newClientRequest(node.idString(), requestBuilder, now, true);
                KafkaAdminClient.this.log.trace("Sending {} to {}. correlationId={}", new Object[]{requestBuilder, node, clientRequest.correlationId()});
                KafkaAdminClient.this.client.send(clientRequest, now);
                // Track the call both per node (for in-flight timeouts) and per
                // correlation id (to match the response back to the call).
                KafkaAdminClient.getOrCreateListValue(this.callsInFlight, node.idString()).add(call);
                this.correlationIdToCalls.put(clientRequest.correlationId(), call);
            }
            return pollTimeout;
        }

        /**
         * Checks the first in-flight call of every node and, if it has expired, marks
         * it aborted and disconnects from that node. Only the call at index 0 of each
         * node's list is examined.
         *
         * @param processor the timeout processor for the current loop iteration
         */
        private void timeoutCallsInFlight(TimeoutProcessor processor) {
            int expiredCount = 0;
            for (Map.Entry<String, List<Call>> inFlight : this.callsInFlight.entrySet()) {
                List<Call> nodeCalls = inFlight.getValue();
                if (nodeCalls.isEmpty()) {
                    continue;
                }
                Call first = nodeCalls.get(0);
                if (!processor.callHasExpired(first)) {
                    continue;
                }
                if (first.aborted) {
                    // Already aborted on an earlier pass but still tracked here.
                    KafkaAdminClient.this.log.warn("Aborted call {} is still in callsInFlight.", first);
                } else {
                    String nodeId = inFlight.getKey();
                    KafkaAdminClient.this.log.debug("Closing connection to {} to time out {}", nodeId, first);
                    first.aborted = true;
                    KafkaAdminClient.this.client.disconnect(nodeId);
                    ++expiredCount;
                }
            }
            if (expiredCount > 0) {
                KafkaAdminClient.this.log.debug("Timed out {} call(s) in flight.", expiredCount);
            }
        }

        /**
         * Matches each response to the call that produced it and dispatches the outcome.
         *
         * <p>A response with an unknown correlation id causes a disconnect from its
         * destination. A call that is known by correlation id but missing from
         * {@code callsInFlight} is logged and ignored. Otherwise the call is failed on
         * a version mismatch or disconnect, or handed the response body; any exception
         * thrown while handling the body fails the call.
         *
         * @param now       the current time in milliseconds, passed to {@code Call.fail}
         * @param responses the responses returned by the last poll
         */
        private void handleResponses(long now, List<ClientResponse> responses) {
            for (ClientResponse clientResponse : responses) {
                int correlationId = clientResponse.requestHeader().correlationId();
                Call pending = this.correlationIdToCalls.get(correlationId);
                if (pending == null) {
                    KafkaAdminClient.this.log.error("Internal server error on {}: server returned information about unknown correlation ID {}, requestHeader = {}", clientResponse.destination(), correlationId, clientResponse.requestHeader());
                    KafkaAdminClient.this.client.disconnect(clientResponse.destination());
                    continue;
                }
                this.correlationIdToCalls.remove(correlationId);
                List<Call> nodeCalls = this.callsInFlight.get(clientResponse.destination());
                boolean wasInFlight = nodeCalls != null && nodeCalls.remove(pending);
                if (!wasInFlight) {
                    KafkaAdminClient.this.log.error("Internal server error on {}: ignoring call {} in correlationIdToCall that did not exist in callsInFlight", clientResponse.destination(), pending);
                    continue;
                }
                if (clientResponse.versionMismatch() != null) {
                    pending.fail(now, clientResponse.versionMismatch());
                } else if (clientResponse.wasDisconnected()) {
                    // Prefer reporting an authentication failure over a plain disconnect.
                    AuthenticationException authFailure = KafkaAdminClient.this.client.authenticationException(pending.curNode());
                    if (authFailure != null) {
                        pending.fail(now, authFailure);
                    } else {
                        pending.fail(now, new DisconnectException(String.format("Cancelled %s request with correlation id %s due to node %s being disconnected", pending.callName, correlationId, clientResponse.destination())));
                    }
                } else {
                    try {
                        pending.handleResponse(clientResponse.responseBody());
                        if (KafkaAdminClient.this.log.isTraceEnabled()) {
                            KafkaAdminClient.this.log.trace("{} got response {}", pending, clientResponse.responseBody().toString(clientResponse.requestHeader().apiVersion()));
                        }
                    }
                    catch (Throwable failure) {
                        if (KafkaAdminClient.this.log.isTraceEnabled()) {
                            KafkaAdminClient.this.log.trace("{} handleResponse failed with {}", pending, KafkaAdminClient.prettyPrintException(failure));
                        }
                        pending.fail(now, failure);
                    }
                }
            }
        }

        /**
         * Returns not-yet-sent calls to the pending list for every node accepted by
         * {@code shouldUnassign}, and drops {@code callsToSend} entries that are empty.
         * The predicate is not consulted for nodes whose queue is already empty.
         *
         * @param shouldUnassign decides, per node, whether its queued calls lose their assignment
         */
        private void unassignUnsentCalls(Predicate<Node> shouldUnassign) {
            for (Iterator<Map.Entry<Node, List<Call>>> it = this.callsToSend.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<Node, List<Call>> assignment = it.next();
                List<Call> unsent = assignment.getValue();
                boolean dropEntry = unsent.isEmpty();
                if (!dropEntry && shouldUnassign.test(assignment.getKey())) {
                    this.pendingCalls.addAll(unsent);
                    dropEntry = true;
                }
                if (dropEntry) {
                    it.remove();
                }
            }
        }

        /**
         * Tells whether {@code calls} contains at least one call that is not internal.
         *
         * @param calls the calls to inspect
         * @return {@code true} if any call reports {@code isInternal() == false}
         */
        private boolean hasActiveExternalCalls(Collection<Call> calls) {
            return calls.stream().anyMatch(candidate -> !candidate.isInternal());
        }

        /**
         * Tells whether any non-internal call is still tracked anywhere: pending,
         * assigned to a node but unsent, or awaiting a response.
         *
         * @return {@code true} if at least one external call is outstanding
         */
        private boolean hasActiveExternalCalls() {
            return this.hasActiveExternalCalls(this.pendingCalls)
                || this.callsToSend.values().stream().anyMatch(queued -> this.hasActiveExternalCalls(queued))
                || this.hasActiveExternalCalls(this.correlationIdToCalls.values());
        }

        /**
         * Decides whether the I/O loop may stop. It stops when no external call is
         * outstanding, or when the hard shutdown time has been reached.
         *
         * @param now                   the current time in milliseconds
         * @param curHardShutdownTimeMs the time at which shutdown is forced
         * @return {@code true} if the thread should exit its loop
         */
        private boolean threadShouldExit(long now, long curHardShutdownTimeMs) {
            boolean workOutstanding = this.hasActiveExternalCalls();
            if (workOutstanding && now < curHardShutdownTimeMs) {
                KafkaAdminClient.this.log.debug("Hard shutdown in {} ms.", curHardShutdownTimeMs - now);
                return false;
            }
            if (workOutstanding) {
                KafkaAdminClient.this.log.info("Forcing a hard I/O thread shutdown. Requests in progress will be aborted.");
            } else {
                KafkaAdminClient.this.log.trace("All work has been completed, and the I/O thread is now exiting.");
            }
            return true;
        }

        /**
         * Main loop of the admin client I/O thread.
         *
         * <p>Each iteration: picks up newly enqueued calls, checks whether a requested
         * shutdown allows the loop to end, times out expired calls in every stage,
         * assigns nodes to pending calls, schedules a metadata fetch when one is due,
         * sends what can be sent, polls the network client, and finally dispatches the
         * responses. After the loop ends, every remaining call is failed with a timeout
         * and the network client and metrics are closed.
         *
         * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself -
         * possible behaviour change" for this method; compare with the upstream source
         * before relying on its exact exception behaviour.
         */
        @Override
        public void run() {
            long now = KafkaAdminClient.this.time.milliseconds();
            KafkaAdminClient.this.log.trace("Thread starting");
            while (true) {
                // Move calls queued by other threads into pendingCalls.
                this.drainNewCalls();
                // -1 appears to be the sentinel for "no shutdown requested"; only once a
                // shutdown time is set do we consider leaving the loop.
                long curHardShutdownTimeMs = KafkaAdminClient.this.hardShutdownTimeMs.get();
                if (curHardShutdownTimeMs != -1L && this.threadShouldExit(now, curHardShutdownTimeMs)) break;
                // Expire calls in all three stages: unassigned, assigned-but-unsent, in flight.
                TimeoutProcessor timeoutProcessor = KafkaAdminClient.this.timeoutProcessorFactory.create(now);
                this.timeoutPendingCalls(timeoutProcessor);
                this.timeoutCallsToSend(timeoutProcessor);
                this.timeoutCallsInFlight(timeoutProcessor);
                // Upper bound for the poll: 1200000 (presumably ms, matching nextTimeoutMs())
                // or the time until the next call deadline, whichever is smaller.
                long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
                if (curHardShutdownTimeMs != -1L) {
                    // Never sleep past the forced shutdown time.
                    pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
                }
                // Assign nodes to pending calls; wake up again when the next back-off ends.
                pollTimeout = Math.min(pollTimeout, this.maybeDrainPendingCalls(now));
                long metadataFetchDelayMs = KafkaAdminClient.this.metadataManager.metadataFetchDelayMs(now);
                if (metadataFetchDelayMs == 0L) {
                    // A metadata fetch is due now: create the internal call and either
                    // assign it to a node right away or park it with the pending calls.
                    KafkaAdminClient.this.metadataManager.transitionToUpdatePending(now);
                    Call metadataCall = this.makeMetadataCall(now);
                    if (!this.maybeDrainPendingCall(metadataCall, now)) {
                        this.pendingCalls.add(metadataCall);
                    }
                }
                pollTimeout = Math.min(pollTimeout, this.sendEligibleCalls(now));
                if (metadataFetchDelayMs > 0L) {
                    // Wake up in time for the next scheduled metadata fetch.
                    pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                }
                if (!this.pendingCalls.isEmpty()) {
                    // Calls are still waiting for a node: re-check after the retry back-off.
                    pollTimeout = Math.min(pollTimeout, KafkaAdminClient.this.retryBackoffMs);
                }
                KafkaAdminClient.this.log.trace("Entering KafkaClient#poll(timeout={})", (Object)pollTimeout);
                List<ClientResponse> responses = KafkaAdminClient.this.client.poll(pollTimeout, now);
                KafkaAdminClient.this.log.trace("KafkaClient#poll retrieved {} response(s)", (Object)responses.size());
                // Calls assigned to nodes whose connection failed go back to pending.
                this.unassignUnsentCalls(KafkaAdminClient.this.client::connectionFailed);
                // Refresh the clock after the (possibly blocking) poll before handling responses.
                now = KafkaAdminClient.this.time.milliseconds();
                this.handleResponses(now, responses);
            }
            // Shutdown path: a processor whose "now" is Long.MAX_VALUE is used to fail
            // the remaining calls in every stage.
            int numTimedOut = 0;
            TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
            AdminClientRunnable adminClientRunnable = this;
            synchronized (adminClientRunnable) {
                numTimedOut += timeoutProcessor.handleTimeouts(this.newCalls, "The AdminClient thread has exited.");
                // A null newCalls tells enqueue() that the thread is gone.
                this.newCalls = null;
            }
            numTimedOut += timeoutProcessor.handleTimeouts(this.pendingCalls, "The AdminClient thread has exited.");
            numTimedOut += this.timeoutCallsToSend(timeoutProcessor);
            if ((numTimedOut += timeoutProcessor.handleTimeouts(this.correlationIdToCalls.values(), "The AdminClient thread has exited.")) > 0) {
                KafkaAdminClient.this.log.debug("Timed out {} remaining operation(s).", (Object)numTimedOut);
            }
            Utils.closeQuietly(KafkaAdminClient.this.client, "KafkaClient");
            Utils.closeQuietly(KafkaAdminClient.this.metrics, "Metrics");
            KafkaAdminClient.this.log.debug("Exiting AdminClientRunnable thread.");
        }

        /**
         * Hands a call to the I/O thread. If the thread has already exited
         * ({@code newCalls} is {@code null}) the call is failed immediately with a
         * {@code TimeoutException}; otherwise the network client is woken up so the
         * call is noticed promptly.
         *
         * @param call the call to queue
         * @param now  the current time in milliseconds, used only for the debug log
         */
        void enqueue(Call call, long now) {
            if (KafkaAdminClient.this.log.isDebugEnabled()) {
                KafkaAdminClient.this.log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now);
            }
            boolean queued;
            synchronized (this) {
                queued = this.newCalls != null;
                if (queued) {
                    this.newCalls.add(call);
                }
            }
            if (!queued) {
                KafkaAdminClient.this.log.debug("The AdminClient thread has exited. Timing out {}.", call);
                call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
                return;
            }
            KafkaAdminClient.this.client.wakeup();
        }

        /**
         * Entry point for submitting a call. Once a hard shutdown time has been set
         * (any value other than -1) the call is rejected with a
         * {@code TimeoutException}; otherwise it is enqueued.
         *
         * @param call the call to submit
         * @param now  the current time in milliseconds
         */
        void call(Call call, long now) {
            boolean acceptingCalls = KafkaAdminClient.this.hardShutdownTimeMs.get() == -1L;
            if (acceptingCalls) {
                this.enqueue(call, now);
                return;
            }
            KafkaAdminClient.this.log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
            call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
        }

        /**
         * Builds the internal call that refreshes cluster metadata.
         *
         * <p>On success the metadata manager is updated with the returned cluster and
         * every unsent call loses its node assignment, so that nodes are chosen again.
         * On failure the metadata manager is told the update failed.
         *
         * @param now the current time in milliseconds, used to compute the call deadline
         * @return a new internal "fetchMetadata" call
         */
        private Call makeMetadataCall(long now) {
            long deadlineMs = KafkaAdminClient.this.calcDeadlineMs(now, KafkaAdminClient.this.defaultTimeoutMs);
            NodeProvider provider = new MetadataUpdateNodeIdProvider();
            return new Call(true, "fetchMetadata", deadlineMs, provider){

                @Override
                public AbstractRequest.Builder createRequest(int timeoutMs) {
                    return new MetadataRequest.Builder(Collections.emptyList(), true);
                }

                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    MetadataResponse metadata = (MetadataResponse)abstractResponse;
                    long updatedAtMs = KafkaAdminClient.this.time.milliseconds();
                    KafkaAdminClient.this.metadataManager.update(metadata.cluster(), updatedAtMs);
                    // Fresh metadata may change the best node for every queued call.
                    AdminClientRunnable.this.unassignUnsentCalls(anyNode -> true);
                }

                @Override
                public void handleFailure(Throwable e) {
                    KafkaAdminClient.this.metadataManager.updateFailed(e);
                }
            };
        }
    }

    /**
     * Evaluates call deadlines against one fixed point in time and remembers the
     * smallest remaining time seen among the calls that have not expired.
     */
    static class TimeoutProcessor {
        /** The instant every deadline is compared against. */
        private final long now;
        /** Smallest non-negative remaining time observed so far; starts at {@code Integer.MAX_VALUE}. */
        private int nextTimeoutMs = Integer.MAX_VALUE;

        /**
         * @param now the time, in milliseconds, to evaluate deadlines against
         */
        TimeoutProcessor(long now) {
            this.now = now;
        }

        /**
         * Fails and removes every call in {@code calls} whose remaining time is
         * negative; the others contribute to {@link #nextTimeoutMs()}.
         *
         * @param calls the calls to check; expired ones are removed through the iterator
         * @param msg   the message of the {@code TimeoutException} given to expired calls
         * @return the number of calls that were timed out
         */
        int handleTimeouts(Collection<Call> calls, String msg) {
            int expired = 0;
            for (Iterator<Call> it = calls.iterator(); it.hasNext(); ) {
                Call call = it.next();
                int remainingMs = KafkaAdminClient.calcTimeoutMsRemainingAsInt(this.now, call.deadlineMs);
                if (remainingMs >= 0) {
                    this.nextTimeoutMs = Math.min(this.nextTimeoutMs, remainingMs);
                    continue;
                }
                call.fail(this.now, new TimeoutException(msg));
                it.remove();
                ++expired;
            }
            return expired;
        }

        /**
         * Checks one call without failing or removing it.
         *
         * @param call the call to check
         * @return {@code true} if the call's remaining time is negative
         */
        boolean callHasExpired(Call call) {
            int remainingMs = KafkaAdminClient.calcTimeoutMsRemainingAsInt(this.now, call.deadlineMs);
            boolean expired = remainingMs < 0;
            if (!expired) {
                this.nextTimeoutMs = Math.min(this.nextTimeoutMs, remainingMs);
            }
            return expired;
        }

        /**
         * @return the smallest remaining time recorded so far, or
         *         {@code Integer.MAX_VALUE} if none was recorded
         */
        int nextTimeoutMs() {
            return this.nextTimeoutMs;
        }
    }

    /**
     * Creates {@link TimeoutProcessor} instances; the I/O loop obtains one per
     * iteration through this factory instead of calling the constructor itself.
     */
    static class TimeoutProcessorFactory {
        TimeoutProcessorFactory() {
        }

        /**
         * @param now the time, in milliseconds, the new processor evaluates deadlines against
         * @return a new processor bound to {@code now}
         */
        TimeoutProcessor create(long now) {
            TimeoutProcessor processor = new TimeoutProcessor(now);
            return processor;
        }
    }

    /**
     * One request the admin client wants to make, together with its retry state.
     *
     * <p>Subclasses supply the request ({@link #createRequest(int)}) and the
     * success and failure handlers. {@link #fail(long, Throwable)} holds the retry
     * policy: it either re-enqueues the call or delivers a final failure through
     * {@link #handleFailure(Throwable)}.
     */
    abstract class Call {
        /** Whether the client itself created this call (for example the metadata fetch). */
        private final boolean internal;
        /** Name used in log and error messages. */
        private final String callName;
        /** Time in milliseconds after which the call counts as timed out. */
        private final long deadlineMs;
        /** Chooses the node this call is sent to. */
        private final NodeProvider nodeProvider;
        /** Number of failed attempts so far. */
        private int tries = 0;
        /** Set when the call was timed out while in flight; makes the next fail() final. */
        private boolean aborted = false;
        /** Node most recently assigned to this call, or null if none was assigned yet. */
        private Node curNode = null;
        /** Earliest time in milliseconds at which the call may be tried again. */
        private long nextAllowedTryMs = 0L;

        /**
         * @param internal     whether this is an internal call
         * @param callName     name used in log and error messages
         * @param deadlineMs   time in milliseconds after which the call is timed out
         * @param nodeProvider chooses the node to send the call to
         */
        Call(boolean internal, String callName, long deadlineMs, NodeProvider nodeProvider) {
            this.internal = internal;
            this.callName = callName;
            this.deadlineMs = deadlineMs;
            this.nodeProvider = nodeProvider;
        }

        /** Creates a non-internal call; see the four-argument constructor for the parameters. */
        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
            this(false, callName, deadlineMs, nodeProvider);
        }

        /**
         * @return the node most recently assigned to this call, or null
         */
        protected Node curNode() {
            return this.curNode;
        }

        /**
         * Reports a failed attempt and decides between retrying and giving up.
         *
         * <p>Checked in this order: an aborted call fails with a timeout; an
         * unsupported-version error accepted by
         * {@link #handleUnsupportedVersionException} is retried without counting as a
         * try; a call past its deadline, a non-retriable error, or an exhausted retry
         * budget fails with the given throwable; anything else is re-enqueued.
         *
         * @param now       the current time in milliseconds
         * @param throwable the reason the attempt failed
         */
        final void fail(long now, Throwable throwable) {
            if (this.aborted) {
                // The call was timed out while in flight: report a timeout instead of
                // whatever error the forced disconnect produced. No retry.
                ++this.tries;
                if (KafkaAdminClient.this.log.isDebugEnabled()) {
                    KafkaAdminClient.this.log.debug("{} aborted at {} after {} attempt(s)", new Object[]{this, now, this.tries, new Exception(KafkaAdminClient.prettyPrintException(throwable))});
                }
                this.handleFailure(new TimeoutException("Aborted due to timeout."));
                return;
            }
            if (throwable instanceof UnsupportedVersionException && this.handleUnsupportedVersionException((UnsupportedVersionException)throwable)) {
                // The subclass can fall back to an older protocol version; retry at once
                // without incrementing tries or applying back-off.
                KafkaAdminClient.this.log.debug("{} attempting protocol downgrade and then retry.", (Object)this);
                KafkaAdminClient.this.runnable.enqueue(this, now);
                return;
            }
            ++this.tries;
            // Back-off: the call is not eligible for node assignment before this time.
            this.nextAllowedTryMs = now + KafkaAdminClient.this.retryBackoffMs;
            if (KafkaAdminClient.calcTimeoutMsRemainingAsInt(now, this.deadlineMs) < 0) {
                // Deadline passed: give up, reporting the underlying error.
                if (KafkaAdminClient.this.log.isDebugEnabled()) {
                    KafkaAdminClient.this.log.debug("{} timed out at {} after {} attempt(s)", new Object[]{this, now, this.tries, new Exception(KafkaAdminClient.prettyPrintException(throwable))});
                }
                this.handleFailure(throwable);
                return;
            }
            if (!(throwable instanceof RetriableException)) {
                if (KafkaAdminClient.this.log.isDebugEnabled()) {
                    KafkaAdminClient.this.log.debug("{} failed with non-retriable exception after {} attempt(s)", new Object[]{this, this.tries, new Exception(KafkaAdminClient.prettyPrintException(throwable))});
                }
                this.handleFailure(throwable);
                return;
            }
            if (this.tries > KafkaAdminClient.this.maxRetries) {
                // Retry budget exhausted.
                if (KafkaAdminClient.this.log.isDebugEnabled()) {
                    KafkaAdminClient.this.log.debug("{} failed after {} attempt(s)", new Object[]{this, this.tries, new Exception(KafkaAdminClient.prettyPrintException(throwable))});
                }
                this.handleFailure(throwable);
                return;
            }
            if (KafkaAdminClient.this.log.isDebugEnabled()) {
                KafkaAdminClient.this.log.debug("{} failed: {}. Beginning retry #{}", new Object[]{this, KafkaAdminClient.prettyPrintException(throwable), this.tries});
            }
            KafkaAdminClient.this.runnable.enqueue(this, now);
        }

        /**
         * Builds the request to send for this call.
         *
         * @param var1 time remaining until the deadline, in milliseconds
         * @return the request builder
         */
        abstract AbstractRequest.Builder createRequest(int var1);

        /**
         * Processes a response received for this call.
         *
         * @param var1 the response body
         */
        abstract void handleResponse(AbstractResponse var1);

        /**
         * Receives the final failure of this call; invoked by {@link #fail} when no
         * further retry will be made.
         *
         * @param var1 the failure to report
         */
        abstract void handleFailure(Throwable var1);

        /**
         * Hook for subclasses that can retry with an older protocol version.
         *
         * @param exception the version error that was received
         * @return true if the call should be retried immediately; this default returns false
         */
        boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
            return false;
        }

        /** Returns the call name and deadline, as used in log messages. */
        public String toString() {
            return "Call(callName=" + this.callName + ", deadlineMs=" + this.deadlineMs + ")";
        }

        /**
         * @return whether this call was marked internal at construction
         */
        public boolean isInternal() {
            return this.internal;
        }
    }

    /**
     * Picks the network client's least loaded node, but only once the metadata
     * manager reports it is ready; until then it requests a metadata update and
     * provides no node.
     */
    private class LeastLoadedNodeProvider
    implements NodeProvider {
        private LeastLoadedNodeProvider() {
        }

        @Override
        public Node provide() {
            if (!KafkaAdminClient.this.metadataManager.isReady()) {
                KafkaAdminClient.this.metadataManager.requestUpdate();
                return null;
            }
            return KafkaAdminClient.this.client.leastLoadedNode(KafkaAdminClient.this.time.milliseconds());
        }
    }

    /**
     * Provides the controller node known to the metadata manager. When metadata is
     * not ready or no controller is known, it requests a metadata update and
     * provides no node.
     */
    private class ControllerNodeProvider
    implements NodeProvider {
        private ControllerNodeProvider() {
        }

        @Override
        public Node provide() {
            boolean controllerKnown = KafkaAdminClient.this.metadataManager.isReady()
                && KafkaAdminClient.this.metadataManager.controller() != null;
            if (!controllerKnown) {
                KafkaAdminClient.this.metadataManager.requestUpdate();
                return null;
            }
            return KafkaAdminClient.this.metadataManager.controller();
        }
    }

    /**
     * Provides the node with one fixed id, looked up in the metadata manager. When
     * metadata is not ready or the id is unknown, it requests a metadata update and
     * provides no node.
     */
    private class ConstantNodeIdProvider
    implements NodeProvider {
        /** Id of the node this provider resolves. */
        private final int nodeId;

        /**
         * @param nodeId id of the node to provide
         */
        ConstantNodeIdProvider(int nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public Node provide() {
            boolean nodeKnown = KafkaAdminClient.this.metadataManager.isReady()
                && KafkaAdminClient.this.metadataManager.nodeById(this.nodeId) != null;
            if (!nodeKnown) {
                KafkaAdminClient.this.metadataManager.requestUpdate();
                return null;
            }
            return KafkaAdminClient.this.metadataManager.nodeById(this.nodeId);
        }
    }

    /**
     * Provides the network client's least loaded node without consulting the
     * metadata manager; used for the metadata fetch call itself.
     */
    private class MetadataUpdateNodeIdProvider
    implements NodeProvider {
        private MetadataUpdateNodeIdProvider() {
        }

        @Override
        public Node provide() {
            long nowMs = KafkaAdminClient.this.time.milliseconds();
            return KafkaAdminClient.this.client.leastLoadedNode(nowMs);
        }
    }

    private static interface NodeProvider {
        public Node provide();
    }
}

