/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class KafkaConsumer<K, V>
implements Consumer<K, V> {
    private static final long NO_CURRENT_THREAD = -1L;
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.consumer";
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30000L;
    final Metrics metrics;
    private final Logger log;
    private final String clientId;
    private final ConsumerCoordinator coordinator;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final Fetcher<K, V> fetcher;
    private final ConsumerInterceptors<K, V> interceptors;
    private final Time time;
    private final ConsumerNetworkClient client;
    private final SubscriptionState subscriptions;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final long requestTimeoutMs;
    private volatile boolean closed = false;
    private List<PartitionAssignor> assignors;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refcount = new AtomicInteger(0);

    public KafkaConsumer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        try {
            String clientId = config.getString("client.id");
            if (clientId.isEmpty()) {
                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            }
            this.clientId = clientId;
            String groupId = config.getString("group.id");
            LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
            this.log = logContext.logger(this.getClass());
            this.log.debug("Initializing the Kafka consumer");
            this.requestTimeoutMs = config.getInt("request.timeout.ms").intValue();
            int sessionTimeOutMs = config.getInt("session.timeout.ms");
            int fetchMaxWaitMs = config.getInt("fetch.max.wait.ms");
            if (this.requestTimeoutMs <= (long)sessionTimeOutMs || this.requestTimeoutMs <= (long)fetchMaxWaitMs) {
                throw new ConfigException("request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms");
            }
            this.time = Time.SYSTEM;
            Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, this.time);
            this.retryBackoffMs = config.getLong("retry.backoff.ms");
            Map<String, Object> userProvidedConfigs = config.originals();
            userProvidedConfigs.put("client.id", clientId);
            List interceptorList = new ConsumerConfig(userProvidedConfigs, false).getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class);
            this.interceptors = new ConsumerInterceptors(interceptorList);
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance("key.deserializer", Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                config.ignore("key.deserializer");
                this.keyDeserializer = keyDeserializer;
            }
            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance("value.deserializer", Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore("value.deserializer");
                this.valueDeserializer = valueDeserializer;
            }
            ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
            this.metadata = new Metadata(this.retryBackoffMs, config.getLong("metadata.max.age.ms"), true, false, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
            this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0L);
            String metricGrpPrefix = "consumer";
            ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
            IsolationLevel isolationLevel = IsolationLevel.valueOf(config.getString("isolation.level").toUpperCase(Locale.ROOT));
            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, metricsRegistry.fetcherMetrics);
            int heartbeatIntervalMs = config.getInt("heartbeat.interval.ms");
            NetworkClient netClient = new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, channelBuilder, logContext), this.metadata, clientId, 100, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), config.getInt("request.timeout.ms"), this.time, true, new ApiVersions(), throttleTimeSensor, logContext);
            this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, this.time, this.retryBackoffMs, config.getInt("request.timeout.ms").intValue(), heartbeatIntervalMs);
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            this.assignors = config.getConfiguredInstances("partition.assignment.strategy", PartitionAssignor.class);
            this.coordinator = new ConsumerCoordinator(logContext, this.client, groupId, config.getInt("max.poll.interval.ms"), config.getInt("session.timeout.ms"), heartbeatIntervalMs, this.assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, this.time, this.retryBackoffMs, config.getBoolean("enable.auto.commit"), config.getInt("auto.commit.interval.ms"), this.interceptors, config.getBoolean("exclude.internal.topics"), config.getBoolean("internal.leave.group.on.close"));
            this.fetcher = new Fetcher<K, V>(logContext, this.client, config.getInt("fetch.min.bytes"), config.getInt("fetch.max.bytes"), config.getInt("fetch.max.wait.ms"), config.getInt("max.partition.fetch.bytes"), config.getInt("max.poll.records"), config.getBoolean("check.crcs"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricsRegistry.fetcherMetrics, this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel);
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, this.metrics);
            this.log.debug("Kafka consumer initialized");
        }
        catch (Throwable t) {
            this.close(0L, true);
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

    KafkaConsumer(LogContext logContext, String clientId, ConsumerCoordinator coordinator, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Fetcher<K, V> fetcher, ConsumerInterceptors<K, V> interceptors, Time time, ConsumerNetworkClient client, Metrics metrics, SubscriptionState subscriptions, Metadata metadata, long retryBackoffMs, long requestTimeoutMs, List<PartitionAssignor> assignors) {
        this.log = logContext.logger(this.getClass());
        this.clientId = clientId;
        this.coordinator = coordinator;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.fetcher = fetcher;
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        this.client = client;
        this.metrics = metrics;
        this.subscriptions = subscriptions;
        this.metadata = metadata;
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.assignors = assignors;
    }

    @Override
    public Set<TopicPartition> assignment() {
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(new HashSet<TopicPartition>(this.subscriptions.assignedPartitions()));
            return set;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<String> subscription() {
        this.acquireAndEnsureOpen();
        try {
            Set<String> set = Collections.unmodifiableSet(new HashSet<String>(this.subscriptions.subscription()));
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        this.acquireAndEnsureOpen();
        try {
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (topics.isEmpty()) {
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (topic != null && !topic.trim().isEmpty()) continue;
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
                this.throwIfNoAssignorsConfigured();
                this.log.debug("Subscribed to topic(s): {}", (Object)Utils.join(topics, ", "));
                this.subscriptions.subscribe(new HashSet<String>(topics), listener);
                this.metadata.setTopics(this.subscriptions.groupSubscription());
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public void subscribe(Collection<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        this.acquireAndEnsureOpen();
        try {
            if (pattern == null) {
                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
            }
            this.throwIfNoAssignorsConfigured();
            this.log.debug("Subscribed to pattern: {}", (Object)pattern);
            this.subscriptions.subscribe(pattern, listener);
            this.metadata.needMetadataForAllTopics(true);
            this.coordinator.updatePatternSubscription(this.metadata.fetch());
            this.metadata.requestUpdate();
        }
        finally {
            this.release();
        }
    }

    @Override
    public void subscribe(Pattern pattern) {
        this.subscribe(pattern, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public void unsubscribe() {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Unsubscribed all topics or patterns and assigned partitions");
            this.subscriptions.unsubscribe();
            this.coordinator.maybeLeaveGroup();
            this.metadata.needMetadataForAllTopics(false);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
            }
            if (partitions.isEmpty()) {
                this.unsubscribe();
            } else {
                HashSet<String> topics = new HashSet<String>();
                for (TopicPartition tp : partitions) {
                    String topic;
                    String string = topic = tp != null ? tp.topic() : null;
                    if (topic == null || topic.trim().isEmpty()) {
                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                    }
                    topics.add(topic);
                }
                this.coordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
                this.log.debug("Subscribed to partition(s): {}", (Object)Utils.join(partitions, ", "));
                this.subscriptions.assignFromUser(new HashSet<TopicPartition>(partitions));
                this.metadata.setTopics(topics);
            }
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        this.acquireAndEnsureOpen();
        try {
            long elapsed;
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            long start = this.time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
                if ((records = this.pollOnce(remaining)).isEmpty()) continue;
                if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {
                    this.client.pollNoWakeup();
                }
                ConsumerRecords<K, V> consumerRecords = this.interceptors.onConsume(new ConsumerRecords<K, V>(records));
                return consumerRecords;
            } while ((remaining = timeout - (elapsed = this.time.milliseconds() - start)) > 0L);
            ConsumerRecords consumerRecords = ConsumerRecords.empty();
            return consumerRecords;
        }
        finally {
            this.release();
        }
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        this.client.maybeTriggerWakeup();
        long startMs = this.time.milliseconds();
        this.coordinator.poll(startMs, timeout);
        boolean hasAllFetchPositions = this.updateFetchPositions();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        this.fetcher.sendFetches();
        long nowMs = this.time.milliseconds();
        long remainingTimeMs = Math.max(0L, timeout - (nowMs - startMs));
        long pollTimeout = Math.min(this.coordinator.timeToNextPoll(nowMs), remainingTimeMs);
        if (!hasAllFetchPositions && pollTimeout > this.retryBackoffMs) {
            pollTimeout = this.retryBackoffMs;
        }
        this.client.poll(pollTimeout, nowMs, new ConsumerNetworkClient.PollCondition(){

            @Override
            public boolean shouldBlock() {
                return !KafkaConsumer.this.fetcher.hasCompletedFetches();
            }
        });
        if (this.coordinator.needRejoin()) {
            return Collections.emptyMap();
        }
        return this.fetcher.fetchedRecords();
    }

    @Override
    public void commitSync() {
        this.acquireAndEnsureOpen();
        try {
            this.coordinator.commitOffsetsSync(this.subscriptions.allConsumed(), Long.MAX_VALUE);
        }
        finally {
            this.release();
        }
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.acquireAndEnsureOpen();
        try {
            this.coordinator.commitOffsetsSync(new HashMap<TopicPartition, OffsetAndMetadata>(offsets), Long.MAX_VALUE);
        }
        finally {
            this.release();
        }
    }

    @Override
    public void commitAsync() {
        this.commitAsync(null);
    }

    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        this.acquireAndEnsureOpen();
        try {
            this.commitAsync(this.subscriptions.allConsumed(), callback);
        }
        finally {
            this.release();
        }
    }

    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Committing offsets: {}", (Object)offsets);
            this.coordinator.commitOffsetsAsync(new HashMap<TopicPartition, OffsetAndMetadata>(offsets), callback);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        this.acquireAndEnsureOpen();
        try {
            if (offset < 0L) {
                throw new IllegalArgumentException("seek offset must not be a negative number");
            }
            this.log.debug("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            this.subscriptions.seek(partition, offset);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Partitions collection cannot be null");
            }
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            for (TopicPartition tp : parts) {
                this.log.debug("Seeking to beginning of partition {}", (Object)tp);
                this.subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
            }
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Partitions collection cannot be null");
            }
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            for (TopicPartition tp : parts) {
                this.log.debug("Seeking to end of partition {}", (Object)tp);
                this.subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
            }
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public long position(TopicPartition partition) {
        this.acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(partition)) {
                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
            }
            Long offset = this.subscriptions.position(partition);
            while (offset == null) {
                this.updateFetchPositions();
                this.client.poll(this.retryBackoffMs);
                offset = this.subscriptions.position(partition);
            }
            long l = offset;
            return l;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition) {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = this.coordinator.fetchCommittedOffsets(Collections.singleton(partition));
            OffsetAndMetadata offsetAndMetadata = offsets.get(partition);
            return offsetAndMetadata;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        this.acquireAndEnsureOpen();
        try {
            Cluster cluster = this.metadata.fetch();
            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
            if (!parts.isEmpty()) {
                List<PartitionInfo> list = parts;
                return list;
            }
            Map<String, List<PartitionInfo>> topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topic), true), this.requestTimeoutMs);
            List<PartitionInfo> list = topicMetadata.get(topic);
            return list;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        this.acquireAndEnsureOpen();
        try {
            Map<String, List<PartitionInfo>> map = this.fetcher.getAllTopicMetadata(this.requestTimeoutMs);
            return map;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Pausing partitions {}", (Object)partitions);
            for (TopicPartition partition : partitions) {
                this.subscriptions.pause(partition);
            }
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Resuming partitions {}", (Object)partitions);
            for (TopicPartition partition : partitions) {
                this.subscriptions.resume(partition);
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<TopicPartition> paused() {
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        this.acquireAndEnsureOpen();
        try {
            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
                if (entry.getValue() >= 0L) continue;
                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative.");
            }
            Map<TopicPartition, OffsetAndTimestamp> map = this.fetcher.offsetsByTimes(timestampsToSearch, this.requestTimeoutMs);
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.fetcher.beginningOffsets(partitions, this.requestTimeoutMs);
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.fetcher.endOffsets(partitions, this.requestTimeoutMs);
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public void close() {
        this.close(30000L, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(long timeout, TimeUnit timeUnit) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        this.acquire();
        try {
            if (!this.closed) {
                this.closed = true;
                this.close(timeUnit.toMillis(timeout), false);
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public void wakeup() {
        this.client.wakeup();
    }

    private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?> ... candidateLists) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> candidateList : candidateLists) {
            clusterResourceListeners.maybeAddAll(candidateList);
        }
        clusterResourceListeners.maybeAdd(keyDeserializer);
        clusterResourceListeners.maybeAdd(valueDeserializer);
        return clusterResourceListeners;
    }

    private void close(long timeoutMs, boolean swallowException) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        try {
            if (this.coordinator != null) {
                this.coordinator.close(Math.min(timeoutMs, this.requestTimeoutMs));
            }
        }
        catch (Throwable t) {
            firstException.compareAndSet(null, t);
            this.log.error("Failed to close coordinator", t);
        }
        ClientUtils.closeQuietly(this.fetcher, "fetcher", firstException);
        ClientUtils.closeQuietly(this.interceptors, "consumer interceptors", firstException);
        ClientUtils.closeQuietly(this.metrics, "consumer metrics", firstException);
        ClientUtils.closeQuietly(this.client, "consumer network client", firstException);
        ClientUtils.closeQuietly(this.keyDeserializer, "consumer key deserializer", firstException);
        ClientUtils.closeQuietly(this.valueDeserializer, "consumer value deserializer", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            if (exception instanceof InterruptException) {
                throw (InterruptException)exception;
            }
            throw new KafkaException("Failed to close kafka consumer", exception);
        }
    }

    private boolean updateFetchPositions() {
        if (this.subscriptions.hasAllFetchPositions()) {
            return true;
        }
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        this.subscriptions.resetMissingPositions();
        this.fetcher.resetOffsetsIfNeeded();
        return false;
    }

    private void acquireAndEnsureOpen() {
        this.acquire();
        if (this.closed) {
            this.release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    private void throwIfNoAssignorsConfigured() {
        if (this.assignors.isEmpty()) {
            throw new IllegalStateException("Must configure at least one partition assigner class name to partition.assignment.strategy configuration property");
        }
    }
}

