package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerPool.class */
/**
 * A bounded pool of Kafka consumers that are handed out as {@link ConsumerLease}s.
 *
 * <p>Closing a healthy lease returns it to an idle queue so its consumer can be reused by a later
 * {@link #obtainConsumer()} call; closing a poisoned lease closes the underlying consumer. At most
 * {@code maxLeases} consumers exist at any time. The idle queue and the reservation of new lease
 * slots are guarded by this pool's monitor.
 */
public class ConsumerPool implements Closeable {
    private final int maxLeases;
    private final Queue<ConsumerLease> consumerLeases;
    private final List<String> topics;
    private final Map<String, Object> kafkaProperties;
    private final ComponentLog logger;
    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
    private final AtomicLong consumerClosedCountRef = new AtomicLong();
    private final AtomicLong leasesObtainedCountRef = new AtomicLong();
    private final AtomicLong productivePollCountRef = new AtomicLong();
    private final AtomicLong unproductivePollCountRef = new AtomicLong();

    /**
     * Immutable snapshot of the pool's counters, as returned by {@link ConsumerPool#getPoolStats()}.
     */
    static final class PoolStats {
        final long consumerCreatedCount;
        final long consumerClosedCount;
        final long leasesObtainedCount;
        final long productivePollCount;
        final long unproductivePollCount;

        PoolStats(long consumerCreatedCount, long consumerClosedCount, long leasesObtainedCount,
                long productivePollCount, long unproductivePollCount) {
            this.consumerCreatedCount = consumerCreatedCount;
            this.consumerClosedCount = consumerClosedCount;
            this.leasesObtainedCount = leasesObtainedCount;
            this.productivePollCount = productivePollCount;
            this.unproductivePollCount = unproductivePollCount;
        }

        @Override
        public String toString() {
            return "Created Consumers [" + this.consumerCreatedCount + "]\nClosed Consumers  [" + this.consumerClosedCount + "]\nLeases Obtained   [" + this.leasesObtainedCount + "]\nProductive Polls  [" + this.productivePollCount + "]\nUnproductive Polls  [" + this.unproductivePollCount + "]\n";
        }
    }

    /**
     * Creates an empty pool; consumers are created lazily by {@link #obtainConsumer()}.
     *
     * @param maxLeases maximum number of consumers that may exist at once; must be positive
     * @param topics topics every consumer subscribes to; must contain at least one entry
     * @param kafkaProperties properties used to construct each consumer; copied, must not be null
     * @param logger logger used to report pool and consumer problems
     * @throws IllegalArgumentException if {@code maxLeases} is not positive or {@code topics} is
     *         null or empty
     */
    public ConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
        this.maxLeases = maxLeases;
        if (maxLeases <= 0) {
            throw new IllegalArgumentException("Max leases value must be greather than zero.");
        }
        this.logger = logger;
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException("Must have a list of one or more topics");
        }
        // Defensive copy so later changes to the caller's list cannot alter the subscription.
        this.topics = new ArrayList<>(topics);
        this.kafkaProperties = new HashMap<String, Object>(kafkaProperties);
        this.consumerLeases = new ArrayDeque<>();
    }

    /**
     * Obtains a lease, reusing an idle one when available and otherwise creating a new consumer if
     * the pool is below its limit.
     *
     * @return a lease, or {@code null} if no idle lease exists and the lease limit has been reached
     * @throws KafkaException if a new consumer could not be subscribed to the configured topics
     */
    public ConsumerLease obtainConsumer() {
        final ConsumerLease idleLease;
        final boolean slotReserved;
        synchronized (this) {
            idleLease = this.consumerLeases.poll();
            // Reserve the slot while still holding the lock; checking here and incrementing later
            // would let concurrent callers all pass the check and exceed maxLeases.
            if (idleLease == null && this.activeLeaseCount.get() < this.maxLeases) {
                this.activeLeaseCount.incrementAndGet();
                slotReserved = true;
            } else {
                slotReserved = false;
            }
        }
        if (idleLease != null) {
            this.leasesObtainedCountRef.incrementAndGet();
            return idleLease;
        }
        if (!slotReserved) {
            this.logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
            return null;
        }
        boolean created = false;
        try {
            final ConsumerLease newLease = createConsumer();
            this.leasesObtainedCountRef.incrementAndGet();
            created = true;
            return newLease;
        } finally {
            if (!created) {
                // Creation failed: give the reserved slot back so the pool does not shrink.
                this.activeLeaseCount.decrementAndGet();
            }
        }
    }

    /**
     * Builds the underlying Kafka consumer from the pool's properties. Protected so subclasses can
     * supply a different {@link Consumer} implementation.
     *
     * @return a new, not yet subscribed consumer
     */
    protected Consumer<byte[], byte[]> createKafkaConsumer() {
        return new KafkaConsumer<>(this.kafkaProperties);
    }

    /**
     * Creates and subscribes a new consumer and wraps it in a lease. The caller must already have
     * reserved a slot in {@code activeLeaseCount}; this method does not modify that counter.
     *
     * @return the new lease
     * @throws KafkaException if subscribing fails; the consumer is closed before rethrowing
     */
    private ConsumerLease createConsumer() {
        final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
        this.consumerCreatedCountRef.incrementAndGet();
        try {
            consumer.subscribe(this.topics);
            return new ConsumerLease() {
                private volatile boolean poisoned = false;
                private volatile boolean closed = false;

                @Override
                public ConsumerRecords<byte[], byte[]> poll() {
                    if (this.poisoned) {
                        throw new KafkaException("The consumer is poisoned and should no longer be used");
                    }
                    try {
                        final ConsumerRecords<byte[], byte[]> records = consumer.poll(50L);
                        if (records.isEmpty()) {
                            ConsumerPool.this.unproductivePollCountRef.incrementAndGet();
                        } else {
                            ConsumerPool.this.productivePollCountRef.incrementAndGet();
                        }
                        return records;
                    } catch (KafkaException e) {
                        ConsumerPool.this.logger.warn("Unable to poll from Kafka consumer so will poison and close this " + consumer, e);
                        poison();
                        close();
                        throw e;
                    }
                }

                @Override
                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
                    if (this.poisoned) {
                        throw new KafkaException("The consumer is poisoned and should no longer be used");
                    }
                    try {
                        consumer.commitSync(offsets);
                    } catch (KafkaException e) {
                        ConsumerPool.this.logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + consumer, e);
                        poison();
                        close();
                        throw e;
                    }
                }

                @Override
                public void close() {
                    if (this.closed) {
                        return;
                    }
                    if (this.poisoned || ConsumerPool.this.activeLeaseCount.get() > ConsumerPool.this.maxLeases) {
                        discard();
                        return;
                    }
                    final boolean idleInPool;
                    synchronized (ConsumerPool.this) {
                        // A repeated close() while this lease is already idle must not enqueue it
                        // a second time, or two callers could be handed the same consumer.
                        idleInPool = ConsumerPool.this.consumerLeases.contains(this)
                                || ConsumerPool.this.consumerLeases.offer(this);
                    }
                    if (!idleInPool) {
                        discard();
                    }
                }

                @Override
                public void poison() {
                    this.poisoned = true;
                }

                /** Closes the underlying consumer, frees its slot and marks this lease closed. */
                private void discard() {
                    ConsumerPool.this.closeConsumer(consumer);
                    ConsumerPool.this.activeLeaseCount.decrementAndGet();
                    this.closed = true;
                }
            };
        } catch (KafkaException e) {
            try {
                consumer.close();
                this.consumerClosedCountRef.incrementAndGet();
            } catch (Exception closeFailure) {
                this.consumerClosedCountRef.incrementAndGet();
                // Keep the secondary failure visible instead of dropping it; addSuppressed
                // rejects self-suppression, hence the identity check.
                if (closeFailure != e) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Closes every lease that is currently idle in the pool, which closes its underlying consumer.
     * Leases that are checked out at the time of the call are not touched.
     */
    @Override
    public void close() {
        final List<ConsumerLease> idleLeases = new ArrayList<>();
        synchronized (this) {
            ConsumerLease lease;
            while ((lease = this.consumerLeases.poll()) != null) {
                idleLeases.add(lease);
            }
        }
        // Close outside the lock: poisoning first makes close() discard the consumer
        // rather than return the lease to the queue.
        for (ConsumerLease lease : idleLeases) {
            lease.poison();
            lease.close();
        }
    }

    /**
     * Unsubscribes and closes the given consumer, logging rather than propagating any failure.
     * The closed-consumer counter is incremented whether or not the close succeeds.
     *
     * @param consumer the consumer to shut down
     */
    public void closeConsumer(Consumer<?, ?> consumer) {
        try {
            consumer.unsubscribe();
        } catch (Exception e) {
            this.logger.warn("Failed while unsubscribing " + consumer, e);
        }
        try {
            consumer.close();
            this.consumerClosedCountRef.incrementAndGet();
        } catch (Exception e) {
            this.consumerClosedCountRef.incrementAndGet();
            this.logger.warn("Failed while closing " + consumer, e);
        }
    }

    /**
     * Returns a snapshot of the pool's counters.
     *
     * @return the current counter values
     */
    PoolStats getPoolStats() {
        return new PoolStats(
                this.consumerCreatedCountRef.get(),
                this.consumerClosedCountRef.get(),
                this.leasesObtainedCountRef.get(),
                this.productivePollCountRef.get(),
                this.unproductivePollCountRef.get());
    }
}
