package org.apache.pulsar.client.impl;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/PartitionedConsumerImpl.class */
public class PartitionedConsumerImpl extends ConsumerBase {
    /** Per-partition consumers; populated by {@code start()}, one entry per partition index. */
    private final List<ConsumerImpl> consumers;
    /**
     * Partition consumers whose receive loop stopped re-arming itself (see
     * {@code receiveMessageFromConsumer}); drained by {@code resumeReceivingFromPausedConsumersIfNeeded}.
     */
    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
    /** Set to half of {@code maxReceiverQueueSize} in the constructor; compared against the shared queue size. */
    private final int sharedQueueResumeThreshold;
    /** Number of partitions, as passed to the constructor. */
    private final int numPartitions;
    /**
     * Read lock is taken by {@code messageReceived} and {@code failPendingReceive};
     * write lock is taken by {@code internalReceiveAsync}.
     */
    private final ReadWriteLock lock;
    /** Aggregated statistics holder; {@code null} when the client's stats interval is not positive. */
    private final ConsumerStats stats;
    private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionedConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, String str2, ConsumerConfiguration consumerConfiguration, int i, ExecutorService executorService, CompletableFuture<Consumer> completableFuture) {
        super(pulsarClientImpl, str, str2, consumerConfiguration, Math.max(i, consumerConfiguration.getReceiverQueueSize()), executorService, completableFuture);
        this.lock = new ReentrantReadWriteLock();
        this.consumers = Lists.newArrayListWithCapacity(i);
        this.pausedConsumers = new ConcurrentLinkedQueue<>();
        this.sharedQueueResumeThreshold = this.maxReceiverQueueSize / 2;
        this.numPartitions = i;
        this.stats = pulsarClientImpl.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
        Preconditions.checkArgument(consumerConfiguration.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Partitioned Topics");
        start();
    }

    /**
     * Creates one {@link ConsumerImpl} per partition and wires the completion of all their
     * subscribe futures to this consumer's own subscribe future.
     *
     * <p>Once every partition has reported back:
     * <ul>
     *   <li>if none failed, flow permits are sent, the state becomes {@code Ready} and the
     *       subscribe future is completed with this consumer;</li>
     *   <li>otherwise (or if starting to receive throws) the consumer is closed and the subscribe
     *       future is completed exceptionally with the first recorded failure.</li>
     * </ul>
     */
    private void start() {
        // First failure seen across all partitions; later failures are ignored.
        final AtomicReference<Throwable> subscribeFailure = new AtomicReference<>();
        // Number of partitions whose subscribe future has completed (successfully or not).
        final AtomicInteger completedPartitions = new AtomicInteger();
        final ConsumerConfiguration internalConfig = getInternalConsumerConfig();

        for (int partitionIndex = 0; partitionIndex < this.numPartitions; partitionIndex++) {
            String partitionName = DestinationName.get(this.topic).getPartition(partitionIndex).toString();
            ConsumerImpl partitionConsumer = new ConsumerImpl(this.client, partitionName, this.subscription, internalConfig, this.client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<>());
            this.consumers.add(partitionConsumer);

            partitionConsumer.subscribeFuture().handle((subscribed, subscribeException) -> {
                if (subscribeException != null) {
                    setState(HandlerBase.State.Failed);
                    subscribeFailure.compareAndSet(null, subscribeException);
                    this.client.cleanupConsumer(this);
                }
                if (completedPartitions.incrementAndGet() != this.numPartitions) {
                    // Still waiting for other partitions.
                    return null;
                }
                if (subscribeFailure.get() == null) {
                    try {
                        starReceivingMessages();
                        setState(HandlerBase.State.Ready);
                        subscribeFuture().complete(this);
                        log.info("[{}] [{}] Created partitioned consumer", this.topic, this.subscription);
                        return null;
                    } catch (PulsarClientException e) {
                        subscribeFailure.set(e);
                    }
                }
                // At least one partition failed (or receiving could not be started): tear everything
                // down, then report the failure to the caller regardless of how the close went.
                // The close callback parameters get their own names; a lambda parameter may not
                // redeclare a variable of the enclosing lambda.
                closeAsync().handle((closed, closeException) -> {
                    subscribeFuture().completeExceptionally(subscribeFailure.get());
                    this.client.cleanupConsumer(this);
                    return null;
                });
                // Log the failure itself (its cause chain is part of the stack trace); logging only
                // getCause() would drop the error entirely when the failure has no cause.
                log.error("[{}] [{}] Could not create partitioned consumer.", this.topic, this.subscription, subscribeFailure.get());
                return null;
            });
        }
    }

    /**
     * Sends the configured receiver-queue-size worth of flow permits for every partition consumer
     * and starts its asynchronous receive loop.
     *
     * @throws PulsarClientException propagated from the per-partition calls
     */
    private void starReceivingMessages() throws PulsarClientException {
        for (ConsumerImpl partitionConsumer : this.consumers) {
            ClientCnx connection = partitionConsumer.cnx();
            int permits = this.conf.getReceiverQueueSize();
            partitionConsumer.sendFlowPermitsToBroker(connection, permits);
            receiveMessageFromConsumer(partitionConsumer);
        }
    }

    /**
     * Asynchronously receives one message from the given partition consumer, hands it to
     * {@link #messageReceived(Message)}, and then either re-arms the receive or parks the
     * partition consumer in {@code pausedConsumers}.
     *
     * <p>The partition consumer is parked when the shared queue has reached
     * {@code maxReceiverQueueSize}, or when the queue is above the resume threshold and some other
     * partition consumer is already parked.
     *
     * @param consumerImpl the partition consumer to read from
     */
    private void receiveMessageFromConsumer(ConsumerImpl consumerImpl) {
        consumerImpl.receiveAsync().thenAccept(received -> {
            messageReceived(received);

            boolean mustPause = this.incomingMessages.size() >= this.maxReceiverQueueSize;
            if (!mustPause) {
                mustPause = this.incomingMessages.size() > this.sharedQueueResumeThreshold && !this.pausedConsumers.isEmpty();
            }

            if (mustPause) {
                this.pausedConsumers.add(consumerImpl);
                return;
            }
            // Re-arm through the event loop rather than calling directly from this callback.
            this.client.eventLoopGroup().execute(() -> receiveMessageFromConsumer(consumerImpl));
        });
    }

    /**
     * Restarts the receive loop of every parked partition consumer, provided the shared queue
     * size is at or below {@code sharedQueueResumeThreshold}. Does nothing otherwise.
     */
    private void resumeReceivingFromPausedConsumersIfNeeded() {
        boolean belowThreshold = this.incomingMessages.size() <= this.sharedQueueResumeThreshold;
        if (belowThreshold && !this.pausedConsumers.isEmpty()) {
            // Drain until the queue reports empty.
            for (ConsumerImpl paused = this.pausedConsumers.poll(); paused != null; paused = this.pausedConsumers.poll()) {
                receiveMessageFromConsumer(paused);
            }
        }
    }

    /**
     * Blocks until a message is available in the shared queue, then gives parked partition
     * consumers a chance to resume.
     *
     * @return the dequeued message
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored)
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message internalReceive() throws PulsarClientException {
        final Message next;
        try {
            next = this.incomingMessages.take();
            resumeReceivingFromPausedConsumersIfNeeded();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        return next;
    }

    /**
     * Waits up to the given timeout for a message from the shared queue, then gives parked
     * partition consumers a chance to resume.
     *
     * @param i        timeout amount
     * @param timeUnit unit of {@code i}
     * @return whatever the queue's timed {@code poll} returned (presumably {@code null} on
     *         timeout - confirm against the queue type declared in the superclass)
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored)
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message internalReceive(int i, TimeUnit timeUnit) throws PulsarClientException {
        final Message next;
        try {
            next = this.incomingMessages.poll(i, timeUnit);
            resumeReceivingFromPausedConsumersIfNeeded();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        }
        return next;
    }

    /**
     * Returns a future for the next message.
     *
     * <p>If a message is already queued the future is completed immediately (and parked partition
     * consumers get a chance to resume); otherwise the future is registered in
     * {@code pendingReceives}. The check-then-register step runs under the write lock, while
     * {@code messageReceived} takes the read lock, so the two cannot interleave.
     *
     * @return a future completed with the next message, or completed exceptionally with a
     *         {@link PulsarClientException} if the thread is interrupted while polling
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Message> internalReceiveAsync() {
        final CompletableFuture<Message> result = new CompletableFuture<>();
        // Acquire outside the try so the finally block only ever releases a lock we actually hold.
        this.lock.writeLock().lock();
        try {
            Message message = this.incomingMessages.poll(0L, TimeUnit.SECONDS);
            if (message == null) {
                this.pendingReceives.add(result);
            } else {
                resumeReceivingFromPausedConsumersIfNeeded();
                result.complete(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.completeExceptionally(new PulsarClientException(e));
        } finally {
            this.lock.writeLock().unlock();
        }
        return result;
    }

    /**
     * Routes an individual acknowledgement to the partition consumer identified by the message
     * id's partition index.
     *
     * @param messageId must be a {@link MessageIdImpl}
     * @param ackType   acknowledgement type; {@code Cumulative} is rejected
     * @return the partition consumer's acknowledgement future, or a failed future when this
     *         consumer is not in the {@code Ready} state or the ack type is cumulative
     * @throws IllegalArgumentException if {@code messageId} is not a {@link MessageIdImpl}
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, PulsarApi.CommandAck.AckType ackType) {
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);

        if (getState() != HandlerBase.State.Ready) {
            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
        }
        if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException("Cumulative acknowledge not supported for partitioned topics"));
        }

        int partitionIndex = ((MessageIdImpl) messageId).getPartitionIndex();
        ConsumerImpl owner = this.consumers.get(partitionIndex);
        return owner.doAcknowledge(messageId, ackType);
    }

    /**
     * Unsubscribes every partition consumer.
     *
     * <p>The returned future completes once all partition consumers have answered: normally when
     * none failed (state becomes {@code Closed}), otherwise exceptionally with the first recorded
     * failure (state becomes {@code Failed}).
     *
     * @return a future tracking the combined unsubscribe, or an already-failed future when this
     *         consumer is in the {@code Closing} or {@code Closed} state
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> unsubscribeAsync() {
        if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Partitioned Consumer was already closed"));
        }
        setState(HandlerBase.State.Closing);

        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        final AtomicInteger remaining = new AtomicInteger(this.numPartitions);
        final CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();

        for (ConsumerImpl partitionConsumer : this.consumers) {
            if (partitionConsumer == null) {
                continue;
            }
            partitionConsumer.unsubscribeAsync().handle((ignored, failure) -> {
                if (failure != null) {
                    firstFailure.compareAndSet(null, failure);
                }
                if (remaining.decrementAndGet() != 0) {
                    // Other partitions are still outstanding.
                    return null;
                }
                if (firstFailure.get() != null) {
                    setState(HandlerBase.State.Failed);
                    unsubscribeFuture.completeExceptionally(firstFailure.get());
                    log.error("[{}] [{}] Could not unsubscribe Partitioned Consumer", new Object[]{this.topic, this.subscription, firstFailure.get().getCause()});
                    return null;
                }
                setState(HandlerBase.State.Closed);
                unsubscribeFuture.complete(null);
                log.info("[{}] [{}] Unsubscribed Partitioned Consumer", this.topic, this.subscription);
                return null;
            });
        }
        return unsubscribeFuture;
    }

    /**
     * Closes every partition consumer.
     *
     * <p>The returned future completes once all partition consumers have answered. On success the
     * state becomes {@code Closed}, this consumer is removed from the client and outstanding
     * pending receives are failed. If any partition fails to close, the state becomes
     * {@code Failed} and the future completes exceptionally with the first recorded failure.
     *
     * @return a future tracking the combined close, or an already-completed future when this
     *         consumer is in the {@code Closing} or {@code Closed} state
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> closeAsync() {
        if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        setState(HandlerBase.State.Closing);

        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        final AtomicInteger remaining = new AtomicInteger(this.numPartitions);
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        for (ConsumerImpl partitionConsumer : this.consumers) {
            if (partitionConsumer == null) {
                continue;
            }
            partitionConsumer.closeAsync().handle((ignored, failure) -> {
                if (failure != null) {
                    firstFailure.compareAndSet(null, failure);
                }
                if (remaining.decrementAndGet() != 0) {
                    // Other partitions are still outstanding.
                    return null;
                }
                if (firstFailure.get() == null) {
                    setState(HandlerBase.State.Closed);
                    closeFuture.complete(null);
                    log.info("[{}] [{}] Closed Partitioned Consumer", this.topic, this.subscription);
                    this.client.cleanupConsumer(this);
                    failPendingReceive();
                } else {
                    setState(HandlerBase.State.Failed);
                    closeFuture.completeExceptionally(firstFailure.get());
                    log.error("[{}] [{}] Could not close Partitioned Consumer", new Object[]{this.topic, this.subscription, firstFailure.get().getCause()});
                }
                return null;
            });
        }
        return closeFuture;
    }

    /**
     * Completes every outstanding pending-receive future exceptionally with
     * {@code AlreadyClosedException}. Runs under the read lock and does nothing when the listener
     * executor is absent or already shut down.
     */
    private void failPendingReceive() {
        this.lock.readLock().lock();
        try {
            if (this.listenerExecutor == null || this.listenerExecutor.isShutdown()) {
                return;
            }
            while (!this.pendingReceives.isEmpty()) {
                CompletableFuture<Message> pending = this.pendingReceives.poll();
                if (pending == null) {
                    // Queue was drained between the emptiness check and the poll.
                    break;
                }
                pending.completeExceptionally(new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Reports whether every partition consumer is connected.
     *
     * @return {@code false} as soon as one partition consumer is not connected; {@code true}
     *         otherwise (including when there are no partition consumers)
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public boolean isConnected() {
        for (ConsumerImpl partitionConsumer : this.consumers) {
            if (!partitionConsumer.isConnected()) {
                return false;
            }
        }
        return true;
    }

    /**
     * No-op in this class.
     *
     * NOTE(review): presumably connection handling is left to the per-partition
     * {@code ConsumerImpl} instances - confirm.
     *
     * @param pulsarClientException the connection failure (ignored)
     */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    void connectionFailed(PulsarClientException pulsarClientException) {
    }

    /**
     * No-op in this class.
     *
     * NOTE(review): presumably connection handling is left to the per-partition
     * {@code ConsumerImpl} instances - confirm.
     *
     * @param clientCnx the opened connection (ignored)
     */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    void connectionOpened(ClientCnx clientCnx) {
    }

    /**
     * Accepts a message delivered by one of the partition consumers.
     *
     * <p>Under the read lock, the message is either handed to the oldest pending asynchronous
     * receive (completed on the listener executor) or put into the shared incoming queue. If a
     * message listener is configured, a task is then scheduled on the listener executor that
     * dequeues one message via {@link #internalReceive()} and passes it to the listener.
     *
     * <p>If the thread is interrupted while enqueueing, the interrupt flag is restored and the
     * method continues.
     *
     * @param message the message received from a partition consumer
     */
    void messageReceived(Message message) {
        this.lock.readLock().lock();
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Received message from partitioned-consumer {}", this.topic, this.subscription, message.getMessageId());
            }
            // Poll directly instead of isEmpty() followed by poll(): the read lock is shared, so
            // two concurrent deliveries could both observe a single pending future, and the loser
            // would get null from poll() and fail inside the executor, dropping its message.
            CompletableFuture<Message> pendingReceive = this.pendingReceives.poll();
            if (pendingReceive != null) {
                this.listenerExecutor.execute(() -> pendingReceive.complete(message));
            } else {
                this.incomingMessages.put(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.lock.readLock().unlock();
        }

        if (this.listener != null) {
            this.listenerExecutor.execute(() -> {
                final Message dequeued;
                try {
                    dequeued = internalReceive();
                } catch (PulsarClientException e) {
                    log.warn("[{}] [{}] Failed to dequeue the message for listener", this.topic, this.subscription, e);
                    return;
                }
                try {
                    // Log the message actually handed to the listener; it is not necessarily the
                    // one that triggered this task.
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{}] Calling message listener for message {}", this.topic, this.subscription, dequeued.getMessageId());
                    }
                    this.listener.received(this, dequeued);
                } catch (Throwable t) {
                    log.error("[{}][{}] Message listener error in processing message: {}", this.topic, this.subscription, dequeued, t);
                }
            });
        }
    }

    /**
     * Returns the handler name, which for this class is the subscription.
     *
     * @return the value of {@code subscription}
     */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    String getHandlerName() {
        return this.subscription;
    }

    /**
     * Builds the configuration shared by all per-partition consumers: receiver queue size,
     * subscription type and consumer name are copied over, and the ack timeout is copied only
     * when it is non-zero.
     *
     * @return a new configuration for the partition consumers
     */
    private ConsumerConfiguration getInternalConsumerConfig() {
        final ConsumerConfiguration partitionConfig = new ConsumerConfiguration();
        partitionConfig.setReceiverQueueSize(this.conf.getReceiverQueueSize());
        partitionConfig.setSubscriptionType(this.conf.getSubscriptionType());
        partitionConfig.setConsumerName(this.consumerName);

        final boolean ackTimeoutConfigured = this.conf.getAckTimeoutMillis() != 0;
        if (ackTimeoutConfigured) {
            partitionConfig.setAckTimeout(this.conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
        }
        return partitionConfig;
    }

    /** Asks every partition consumer to redeliver its unacknowledged messages. */
    @Override // org.apache.pulsar.client.api.Consumer
    public void redeliverUnacknowledgedMessages() {
        for (ConsumerImpl partitionConsumer : this.consumers) {
            partitionConsumer.redeliverUnacknowledgedMessages();
        }
    }

    /**
     * Splits the given message ids by partition index and asks each partition consumer to
     * redeliver the ids that belong to it.
     *
     * <p>Ids that are dispatched are removed from {@code set}, so the caller's set is modified.
     * Every partition consumer is called, even when no id matched it (with an empty set).
     *
     * @param set message ids to redeliver; matching entries are removed from it
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> set) {
        for (ConsumerImpl partitionConsumer : this.consumers) {
            final Set<MessageIdImpl> idsForPartition = new HashSet<>();
            set.removeIf(id -> {
                boolean belongsToPartition = id.getPartitionIndex() == partitionConsumer.getPartitionIndex();
                if (belongsToPartition) {
                    idsForPartition.add(id);
                }
                return belongsToPartition;
            });
            partitionConsumer.redeliverUnacknowledgedMessages(idsForPartition);
        }
    }

    /**
     * Reports whether the batching ack tracker of every partition consumer is empty.
     *
     * <p>Every partition consumer is queried; the loop deliberately does not stop at the first
     * {@code false}.
     *
     * @return {@code true} only if all partition consumers report an empty tracker
     */
    public boolean isBatchingAckTrackerEmpty() {
        boolean allEmpty = true;
        for (ConsumerImpl partitionConsumer : this.consumers) {
            // Non-short-circuit '&' keeps the call on every consumer.
            allEmpty = allEmpty & partitionConsumer.isBatchingAckTrackerEmpty();
        }
        return allEmpty;
    }

    /**
     * Returns the per-partition consumers.
     *
     * @return the internal list itself, not a copy
     */
    List<ConsumerImpl> getConsumers() {
        return this.consumers;
    }

    /**
     * Sums the available permits of all partition consumers.
     *
     * @return the total number of available permits
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int getAvailablePermits() {
        int totalPermits = 0;
        for (ConsumerImpl partitionConsumer : this.consumers) {
            totalPermits += partitionConsumer.getAvailablePermits();
        }
        return totalPermits;
    }

    /**
     * Reports whether every partition consumer has reached the end of its topic.
     *
     * @return {@code true} if all partition consumers report end of topic
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public boolean hasReachedEndOfTopic() {
        return this.consumers.stream().allMatch(ConsumerImpl::hasReachedEndOfTopic);
    }

    /**
     * Counts the messages buffered in the shared incoming queue plus those buffered in each
     * partition consumer.
     *
     * @return the total number of queued messages
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int numMessagesInQueue() {
        int queued = this.incomingMessages.size();
        for (ConsumerImpl partitionConsumer : this.consumers) {
            queued += partitionConsumer.numMessagesInQueue();
        }
        return queued;
    }

    /**
     * Rebuilds and returns the aggregated statistics: the shared stats object is reset and then
     * updated with the stats of each partition consumer in index order.
     *
     * @return the aggregated stats object, or {@code null} when stats are not being collected
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public synchronized ConsumerStats getStats() {
        final ConsumerStats aggregate = this.stats;
        if (aggregate != null) {
            aggregate.reset();
            for (int partition = 0; partition < this.numPartitions; partition++) {
                ConsumerImpl partitionConsumer = this.consumers.get(partition);
                aggregate.updateCumulativeStats(partitionConsumer.getStats());
            }
        }
        return aggregate;
    }
}
