package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ZeroQueueSizeTest.class */
public class ZeroQueueSizeTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(ZeroQueueSizeTest.class);
    private final int totalMessages = 10;

    /**
     * Prepares the shared test fixture once, before any test method of this
     * class runs, by delegating to the inherited {@code baseSetup()}.
     *
     * @throws Exception if {@code baseSetup()} fails
     */
    @BeforeClass
    @Override
    public void setup() throws Exception {
        this.baseSetup();
    }

    /**
     * Tears the shared test fixture down after all tests of this class have
     * run by delegating to the inherited {@code internalCleanup()}.
     * {@code alwaysRun = true} is set on the TestNG annotation so the teardown
     * is not skipped.
     *
     * @throws Exception if {@code internalCleanup()} fails
     */
    @AfterClass(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    /**
     * Checks that the consumer builder accepts a receiver queue size of zero:
     * the test passes as long as the call does not throw.
     */
    @Test
    public void validQueueSizeConfig() {
        final int zeroQueueSize = 0;
        pulsarClient.newConsumer().receiverQueueSize(zeroQueueSize);
    }

    /**
     * Checks that the consumer builder rejects a negative receiver queue size:
     * the test passes only if an {@link IllegalArgumentException} is thrown.
     */
    @Test(expectedExceptions = IllegalArgumentException.class)
    public void InvalidQueueSizeConfig() {
        final int negativeQueueSize = -1;
        pulsarClient.newConsumer().receiverQueueSize(negativeQueueSize);
    }

    /**
     * Subscribes with a receiver queue size of zero and then performs a timed
     * {@code receive(10, SECONDS)}. The test passes only if the sequence raises
     * {@link PulsarClientException.InvalidConfigurationException}.
     *
     * @throws PulsarClientException the expected failure (or any other client error)
     */
    @Test(expectedExceptions = PulsarClientException.InvalidConfigurationException.class)
    public void zeroQueueSizeReceiveAsyncInCompatibility() throws PulsarClientException {
        final String key = "zeroQueueSizeReceiveAsyncInCompatibility";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .subscribe();

        consumer.receive(10, TimeUnit.SECONDS);
    }

    /**
     * Creates a partitioned topic with three partitions and then tries to
     * subscribe to it with a receiver queue size of zero. The test passes only
     * if a {@link PulsarClientException} is raised.
     *
     * @throws PulsarClientException the expected failure
     * @throws PulsarAdminException if the partitioned topic cannot be created
     */
    @Test(expectedExceptions = PulsarClientException.class)
    public void zeroQueueSizePartitionedTopicInCompatibility() throws PulsarClientException, PulsarAdminException {
        final String key = "zeroQueueSizePartitionedTopicInCompatibility";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final int numberOfPartitions = 3;

        admin.topics().createPartitionedTopic(topicName, numberOfPartitions);

        pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .subscribe();
    }

    /**
     * Publishes {@code totalMessages} unbatched messages and reads them back
     * one by one through a consumer whose receiver queue size is zero.
     *
     * <p>For every message the test asserts that the consumer's internal queue
     * is empty both before and after the blocking {@code receive()}, and that
     * payloads arrive in publish order.
     *
     * @throws PulsarClientException if producing, subscribing or receiving fails
     */
    @Test
    public void zeroQueueSizeNormalConsumer() throws PulsarClientException {
        final String key = "nonZeroQueueSizeNormalConsumer";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // subscribe() is declared to return Consumer; the explicit downcast is
        // required to reach numMessagesInQueue() on the implementation class.
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .subscribe();

        for (int i = 0; i < totalMessages; i++) {
            String payload = messagePredicate + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes());
        }

        for (int i = 0; i < totalMessages; i++) {
            Assert.assertEquals(consumer.numMessagesInQueue(), 0);
            Message<byte[]> received = consumer.receive();
            String body = new String(received.getData());
            Assert.assertEquals(body, messagePredicate + i);
            Assert.assertEquals(consumer.numMessagesInQueue(), 0);
            log.info("Consumer received : {}", body);
        }
    }

    /**
     * Same scenario as the plain zero-queue consumer test, but messages are
     * delivered through a {@code MessageListener} instead of {@code receive()}.
     *
     * <p>The listener asserts the consumer's queue is empty on every callback,
     * stores the message and counts down a latch. After all
     * {@code totalMessages} callbacks have fired, the test checks that the
     * queue is still empty and that payloads were delivered in publish order.
     *
     * @throws Exception if the client fails or the waiting thread is interrupted
     */
    @Test
    public void zeroQueueSizeConsumerListener() throws Exception {
        final String key = "zeroQueueSizeConsumerListener";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        final List<Message<byte[]>> messages = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(totalMessages);

        // subscribe() is declared to return Consumer; the explicit downcast is
        // required to reach numMessagesInQueue() on the implementation class.
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .messageListener((c, msg) -> {
                    Assert.assertEquals(((ConsumerImpl<byte[]>) c).numMessagesInQueue(), 0);
                    // The list is written from the listener thread and read by the test thread.
                    synchronized (messages) {
                        messages.add(msg);
                    }
                    log.info("Consumer received: {}", new String(msg.getData()));
                    latch.countDown();
                })
                .subscribe();

        for (int i = 0; i < totalMessages; i++) {
            String payload = messagePredicate + i;
            log.info("Producer produced: {}", payload);
            producer.send(payload.getBytes());
        }

        latch.await();

        Assert.assertEquals(consumer.numMessagesInQueue(), 0);
        Assert.assertEquals(messages.size(), totalMessages);
        for (int i = 0; i < messages.size(); i++) {
            Assert.assertEquals(new String(messages.get(i).getData()), messagePredicate + i);
        }
    }

    /**
     * Attaches four zero-queue consumers to one Shared subscription, publishes
     * ten unbatched messages and reads message {@code i} from consumer
     * {@code i % 4}.
     *
     * <p>Around each {@code receive()} the test asserts that the chosen
     * consumer's queue is empty, and it expects the payload with index
     * {@code i} to be the one handed to that consumer.
     *
     * @throws PulsarClientException if producing, subscribing or receiving fails
     */
    @Test
    public void zeroQueueSizeSharedSubscription() throws PulsarClientException {
        final String key = "zeroQueueSizeSharedSubscription";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int numConsumers = 4;
        final int numMessages = 10;

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        List<ConsumerImpl<byte[]>> consumers = new ArrayList<>(numConsumers);
        while (consumers.size() < numConsumers) {
            consumers.add((ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(0)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe());
        }

        for (int n = 0; n < numMessages; n++) {
            String payload = messagePredicate + n;
            producer.send(payload.getBytes());
        }

        for (int n = 0; n < numMessages; n++) {
            ConsumerImpl<byte[]> current = consumers.get(n % numConsumers);
            Assert.assertEquals(current.numMessagesInQueue(), 0);
            Message<byte[]> received = current.receive();
            Assert.assertEquals(new String(received.getData()), messagePredicate + n);
            Assert.assertEquals(current.numMessagesInQueue(), 0);
            log.info("Consumer received : " + new String(received.getData()));
        }
    }

    /**
     * Exercises two zero-queue consumers on one Failover subscription.
     *
     * <p>After {@code totalMessages} messages are published, "consumer-1"
     * reads the first half without acknowledging, requests redelivery of its
     * unacknowledged messages and closes. "consumer-2" is then expected to
     * read the same first half again, starting from index 0. Both consumers
     * must report an empty queue around every {@code receive()}.
     *
     * @throws PulsarClientException if producing, subscribing or receiving fails
     */
    @Test
    public void zeroQueueSizeFailoverSubscription() throws PulsarClientException {
        final String key = "zeroQueueSizeFailoverSubscription";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int half = totalMessages / 2;

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // subscribe() is declared to return Consumer; the explicit downcasts are
        // required to reach numMessagesInQueue() on the implementation class.
        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .subscriptionType(SubscriptionType.Failover)
                .consumerName("consumer-1")
                .subscribe();
        ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(0)
                .subscriptionType(SubscriptionType.Failover)
                .consumerName("consumer-2")
                .subscribe();

        for (int i = 0; i < totalMessages; i++) {
            producer.send((messagePredicate + i).getBytes());
        }

        // First half is read by consumer-1 and deliberately left unacknowledged.
        for (int i = 0; i < half; i++) {
            Assert.assertEquals(consumer1.numMessagesInQueue(), 0);
            Message<byte[]> received = consumer1.receive();
            String body = new String(received.getData());
            Assert.assertEquals(body, messagePredicate + i);
            Assert.assertEquals(consumer1.numMessagesInQueue(), 0);
            log.info("Consumer received : {}", body);
        }

        consumer1.redeliverUnacknowledgedMessages();
        consumer1.close();

        // consumer-2 is expected to see the same messages again from index 0.
        for (int i = 0; i < half; i++) {
            Assert.assertEquals(consumer2.numMessagesInQueue(), 0);
            Message<byte[]> received = consumer2.receive();
            String body = new String(received.getData());
            Assert.assertEquals(body, messagePredicate + i);
            Assert.assertEquals(consumer2.numMessagesInQueue(), 0);
            log.info("Consumer received : {}", body);
        }
    }

    /**
     * Publishes batched messages to a topic that has a zero-queue Shared
     * consumer attached and registers a callback on {@code receiveAsync()}
     * that calls {@code Assert.fail()} if the receive completes without an
     * error.
     *
     * <p>The consumer is closed in a {@code finally} block regardless of the
     * outcome.
     *
     * @throws PulsarClientException if subscribing, producing or closing fails
     */
    @Test
    public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
        final String topicName = "persistent://prop-xyz/use/ns-abc/topic1";
        final int batchMessageDelayMs = 100;

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(0)
                .subscribe();

        // Batching is always enabled here; the former "delay == 0" branch
        // compared two constants and could never be taken.
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
                .topic(topicName)
                .messageRoutingMode(MessageRoutingMode.SinglePartition);
        producerBuilder.enableBatching(true)
                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(5);

        Producer<byte[]> producer = producerBuilder.create();
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        try {
            // NOTE(review): the future returned by handle() is never awaited, so an
            // Assert.fail() raised inside the callback does not appear to reach the
            // test thread - confirm whether this test can actually fail.
            consumer.receiveAsync().handle((msg, error) -> {
                if (error == null) {
                    Assert.fail();
                }
                return null;
            });
        } finally {
            consumer.close();
        }
    }

    /**
     * Publishes the integers 0..9, then performs twenty blocking receives on a
     * zero-queue Shared consumer configured with a one-second ack timeout,
     * never acknowledging anything.
     *
     * <p>The test asserts that the twenty received values contain exactly ten
     * distinct integers (presumably every message is delivered a second time
     * once its ack timeout expires).
     *
     * @throws PulsarClientException if producing, subscribing or receiving fails
     */
    @Test
    public void testZeroQueueSizeMessageRedelivery() throws PulsarClientException {
        final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery";
        final int messages = 10;

        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .receiverQueueSize(0)
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .subscribe();

        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topic)
                .enableBatching(false)
                .create();

        for (int value = 0; value < messages; value++) {
            producer.send(value);
        }

        Set<Integer> distinctValues = new HashSet<>();
        int remaining = messages * 2;
        while (remaining-- > 0) {
            Message<Integer> received = consumer.receive();
            distinctValues.add(received.getValue());
        }

        Assert.assertEquals(distinctValues.size(), messages);

        consumer.close();
        producer.close();
    }

    /**
     * Listener-based variant of the redelivery test: a zero-queue Shared
     * consumer with a one-second ack timeout records every delivered value and
     * never acknowledges.
     *
     * <p>The test waits until the listener has been invoked twenty times for
     * the ten published integers and then asserts that exactly ten distinct
     * values were seen.
     *
     * @throws Exception if the client fails or the waiting thread is interrupted
     */
    @Test
    public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception {
        final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener";
        final int messages = 10;
        final CountDownLatch deliveries = new CountDownLatch(messages * 2);
        final Set<Integer> distinctValues = new HashSet<>();

        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .receiverQueueSize(0)
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .messageListener((c, msg) -> {
                    // Count the delivery even if recording the value throws.
                    try {
                        distinctValues.add(msg.getValue());
                    } finally {
                        deliveries.countDown();
                    }
                })
                .subscribe();

        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topic)
                .enableBatching(false)
                .create();

        for (int value = 0; value < messages; value++) {
            producer.send(value);
        }

        deliveries.await();
        Assert.assertEquals(distinctValues.size(), messages);

        consumer.close();
        producer.close();
    }

    /**
     * Async-receive variant of the redelivery test: after publishing the
     * integers 0..9, twenty {@code receiveAsync()} calls are issued up front on
     * a zero-queue Shared consumer with a one-second ack timeout, and nothing
     * is acknowledged.
     *
     * <p>The futures are then awaited in issue order and the test asserts that
     * exactly ten distinct values were obtained.
     *
     * @throws PulsarClientException if producing or subscribing fails
     * @throws ExecutionException if a receive future completes exceptionally
     * @throws InterruptedException if interrupted while awaiting a future
     */
    @Test
    public void testZeroQueueSizeMessageRedeliveryForAsyncReceive() throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive";
        final int messages = 10;
        final int expectedDeliveries = messages * 2;

        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .receiverQueueSize(0)
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .subscribe();

        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topic)
                .enableBatching(false)
                .create();

        for (int value = 0; value < messages; value++) {
            producer.send(value);
        }

        Set<Integer> distinctValues = new HashSet<>();
        List<CompletableFuture<Message<Integer>>> pending = new ArrayList<>(expectedDeliveries);
        while (pending.size() < expectedDeliveries) {
            pending.add(consumer.receiveAsync());
        }

        for (CompletableFuture<Message<Integer>> future : pending) {
            distinctValues.add(future.get().getValue());
        }

        Assert.assertEquals(distinctValues.size(), messages);

        consumer.close();
        producer.close();
    }

    /**
     * Checks {@code pause()} / {@code resume()} on a zero-queue consumer that
     * uses a message listener.
     *
     * <p>The consumer is paused right after subscribing and two messages are
     * published. The test expects the listener to fire within two seconds,
     * then sleeps two more seconds and asserts that exactly one message has
     * been delivered. After {@code resume()} it expects another listener
     * invocation within two seconds.
     *
     * @throws Exception if the client fails or the test thread is interrupted
     */
    @Test(timeOut = 30000)
    public void testPauseAndResume() throws Exception {
        final String topicName = "persistent://prop/ns-abc/zero-queue-pause-and-resume";
        final String subName = "sub";
        final String timeoutMessage = "Timed out waiting for message listener acks";

        // The latch is swapped for a fresh one between the two phases of the test.
        final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(1));
        final AtomicInteger receivedCount = new AtomicInteger();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(0)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    c.acknowledgeAsync(msg);
                    receivedCount.incrementAndGet();
                    latchRef.get().countDown();
                })
                .subscribe();
        consumer.pause();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create();
        for (int n = 0; n < 2; n++) {
            String payload = "my-message-" + n;
            producer.send(payload.getBytes());
        }

        boolean firstArrived = latchRef.get().await(2L, TimeUnit.SECONDS);
        Assert.assertTrue(firstArrived, timeoutMessage);

        // Give a second message the chance to slip through while paused.
        Thread.sleep(2000L);
        Assert.assertEquals(receivedCount.intValue(), 1, "Consumer received messages while paused");

        latchRef.set(new CountDownLatch(1));
        consumer.resume();
        boolean secondArrived = latchRef.get().await(2L, TimeUnit.SECONDS);
        Assert.assertTrue(secondArrived, timeoutMessage);

        consumer.unsubscribe();
        producer.close();
    }

    /**
     * Same pause/resume scenario as {@code testPauseAndResume}, with the topic
     * being unloaded through the admin API while the consumer is paused.
     *
     * <p>After the first listener invocation the topic is unloaded, the test
     * sleeps two seconds and asserts that still only one message has been
     * delivered. After {@code resume()} it expects another listener invocation
     * within two seconds.
     *
     * @throws Exception if the client or admin call fails, or the test thread is interrupted
     */
    @Test(timeOut = 30000)
    public void testPauseAndResumeWithUnloading() throws Exception {
        final String topicName = "persistent://prop/ns-abc/zero-queue-pause-and-resume-with-unloading";
        final String subName = "sub";
        final String timeoutMessage = "Timed out waiting for message listener acks";

        // The latch is swapped for a fresh one between the two phases of the test.
        final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(1));
        final AtomicInteger receivedCount = new AtomicInteger();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(0)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    c.acknowledgeAsync(msg);
                    receivedCount.incrementAndGet();
                    latchRef.get().countDown();
                })
                .subscribe();
        consumer.pause();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create();
        for (int n = 0; n < 2; n++) {
            String payload = "my-message-" + n;
            producer.send(payload.getBytes());
        }

        boolean firstArrived = latchRef.get().await(2L, TimeUnit.SECONDS);
        Assert.assertTrue(firstArrived, timeoutMessage);

        // Unload the topic while the consumer is still paused.
        admin.topics().unload(topicName);

        Thread.sleep(2000L);
        Assert.assertEquals(receivedCount.intValue(), 1, "Consumer received messages while paused");

        latchRef.set(new CountDownLatch(1));
        consumer.resume();
        boolean secondArrived = latchRef.get().await(2L, TimeUnit.SECONDS);
        Assert.assertTrue(secondArrived, timeoutMessage);

        consumer.unsubscribe();
        producer.close();
    }

    /**
     * Repeats a resume / publish-one / wait-for-delivery / pause cycle one
     * hundred times on a zero-queue consumer with a message listener, then
     * asserts that all one hundred integers were delivered exactly once and in
     * publish order.
     *
     * <p>Hand-off between the test thread and the listener uses a monitor plus
     * a "waiting" flag: the test thread raises the flag before publishing and
     * blocks until the listener clears it.
     *
     * @throws Exception if the client fails or the test thread is interrupted
     */
    @Test(timeOut = 30000)
    public void testPauseAndResumeNoReconnection() throws Exception {
        final String topicName = "persistent://prop/ns-abc/zero-queue-pause-and-resume-no-reconnection";
        final String subName = "sub";
        final int messages = 100;

        final Object monitor = new Object();
        final AtomicBoolean waitingForDelivery = new AtomicBoolean(false);
        final List<Integer> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(0)
                .messageListener((c, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    receivedMessages.add(msg.getValue());
                    try {
                        c.acknowledge(msg);
                    } catch (PulsarClientException e) {
                        // Acknowledgement stays best-effort, but a failure is no longer silent.
                        log.warn("Failed to acknowledge message {}", msg.getMessageId(), e);
                    }
                    synchronized (monitor) {
                        waitingForDelivery.set(false);
                        monitor.notifyAll();
                    }
                })
                .subscribe();

        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topicName)
                .enableBatching(false)
                .create();

        for (int i = 0; i < messages; i++) {
            // Raise the flag BEFORE the message can possibly be delivered. Raising it
            // after sendAsync() lets a fast listener clear the flag first; the test
            // thread would then set it again and wait for a notification that never
            // comes, until the TestNG timeout fires.
            waitingForDelivery.set(true);
            consumer.resume();
            producer.newMessage().value(i).sendAsync();
            synchronized (monitor) {
                while (waitingForDelivery.get()) {
                    monitor.wait();
                }
            }
            consumer.pause();
        }

        log.info("Received messages: {}", receivedMessages);
        Assert.assertEquals(receivedMessages.size(), messages);
        for (int i = 0; i < messages; i++) {
            Assert.assertEquals(receivedMessages.get(i).intValue(), i);
        }
    }

    /*
     * Rebuilds one of this class's message-listener lambdas from its serialized
     * form.
     *
     * The implementation method name carried by serializedLambda selects a
     * branch (first by hash code, then by an exact string comparison). Each
     * branch re-checks the method kind, functional interface, method name and
     * signatures, pulls the captured arguments out of serializedLambda and
     * returns an equivalent lambda. If nothing matches, an
     * IllegalArgumentException("Invalid lambda deserialization") is thrown.
     *
     * NOTE(review): this looks like decompiler output of a compiler-generated
     * method rather than hand-written source. As written, `z` is declared
     * boolean but assigned -1/2/3/4, `case true` appears twice, and
     * SHARED_VALUE is not defined anywhere in this file. Confirm the origin and
     * consider deleting the method instead of maintaining it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // Stage 1: map the implementation method name to a selector value.
        switch (implMethodName.hashCode()) {
            case -1253590721:
                if (implMethodName.equals("lambda$testPauseAndResumeNoReconnection$82e6221c$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1235328321:
                if (implMethodName.equals("lambda$testZeroQueueSizeMessageRedeliveryForListener$c06d63aa$1")) {
                    z = false;
                    break;
                }
                break;
            case -213005378:
                if (implMethodName.equals("lambda$testPauseAndResume$26154963$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1212572667:
                if (implMethodName.equals("lambda$testPauseAndResumeWithUnloading$26154963$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1493702762:
                if (implMethodName.equals("lambda$zeroQueueSizeConsumerListener$ad6fae70$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: validate the serialized description and rebuild the matching lambda.
        switch (z) {
            // Listener body matches testZeroQueueSizeMessageRedeliveryForListener (captures a Set and a latch).
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/ZeroQueueSizeTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    Set set = (Set) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer, message) -> {
                        try {
                            set.add((Integer) message.getValue());
                            countDownLatch.countDown();
                        } catch (Throwable th) {
                            countDownLatch.countDown();
                            throw th;
                        }
                    };
                }
                break;
            // Listener body matches zeroQueueSizeConsumerListener (captures a List and a latch).
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/ZeroQueueSizeTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer2, message2) -> {
                        Assert.assertEquals(((ConsumerImpl) consumer2).numMessagesInQueue(), 0);
                        synchronized (list) {
                            list.add(message2);
                        }
                        log.info("Consumer received: " + new String(message2.getData()));
                        countDownLatch2.countDown();
                    };
                }
                break;
            // Listener body matches testPauseAndResume (captures a counter and a latch reference).
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/ZeroQueueSizeTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/atomic/AtomicReference;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    AtomicReference atomicReference = (AtomicReference) serializedLambda.getCapturedArg(1);
                    return (consumer3, message3) -> {
                        Assert.assertNotNull(message3, "Message cannot be null");
                        consumer3.acknowledgeAsync(message3);
                        atomicInteger.incrementAndGet();
                        ((CountDownLatch) atomicReference.get()).countDown();
                    };
                }
                break;
            // Listener body matches testPauseAndResumeNoReconnection (captures a List, a monitor and a flag).
            case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/ZeroQueueSizeTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Ljava/lang/Object;Ljava/util/concurrent/atomic/AtomicBoolean;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    Object capturedArg = serializedLambda.getCapturedArg(1);
                    AtomicBoolean atomicBoolean = (AtomicBoolean) serializedLambda.getCapturedArg(2);
                    return (consumer4, message4) -> {
                        Assert.assertNotNull(message4, "Message cannot be null");
                        list2.add((Integer) message4.getValue());
                        try {
                            consumer4.acknowledge(message4);
                        } catch (PulsarClientException e) {
                        }
                        synchronized (capturedArg) {
                            atomicBoolean.set(false);
                            capturedArg.notifyAll();
                        }
                    };
                }
                break;
            // Listener body matches testPauseAndResumeWithUnloading (captures a counter and a latch reference).
            case Test.TestMessage.TESTENUM_FIELD_NUMBER /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/ZeroQueueSizeTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/atomic/AtomicReference;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    AtomicReference atomicReference2 = (AtomicReference) serializedLambda.getCapturedArg(1);
                    return (consumer5, message5) -> {
                        Assert.assertNotNull(message5, "Message cannot be null");
                        consumer5.acknowledgeAsync(message5);
                        atomicInteger2.incrementAndGet();
                        ((CountDownLatch) atomicReference2.get()).countDown();
                    };
                }
                break;
        }
        // No branch recognised the serialized lambda.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
