package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javassist.compiler.TokenId;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentFailoverE2ETest.class */
public class PersistentFailoverE2ETest extends BrokerTestBase {
    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;

    /**
     * Consumer event listener that keeps, per listener instance, the set of partition ids last
     * reported as active and the set last reported as inactive.
     *
     * <p>The callbacks are invoked by the client while the test thread reads {@code activePtns}
     * and {@code inactivePtns} directly (see {@code testSimpleConsumerEventsWithPartition}), so the
     * sets are concurrent sets: reads that do not hold this object's monitor are still safe.
     * The callbacks stay {@code synchronized} so that moving an id from one set to the other is
     * not interleaved with another callback on the same listener.
     */
    private static class ActiveInactiveListenerEvent implements ConsumerEventListener {
        private final Set<Integer> activePtns = Sets.newConcurrentHashSet();
        private final Set<Integer> inactivePtns = Sets.newConcurrentHashSet();

        /**
         * Records partition {@code i} as active and drops it from the inactive set.
         *
         * @param consumer the consumer the event refers to (unused)
         * @param i the partition id carried by the event
         */
        @Override
        public synchronized void becameActive(Consumer<?> consumer, int i) {
            this.activePtns.add(i);
            this.inactivePtns.remove(i);
        }

        /**
         * Records partition {@code i} as inactive and drops it from the active set.
         *
         * @param consumer the consumer the event refers to (unused)
         * @param i the partition id carried by the event
         */
        @Override
        public synchronized void becameInactive(Consumer<?> consumer, int i) {
            this.activePtns.remove(i);
            this.inactivePtns.add(i);
        }
    }

    /**
     * Consumer event listener that appends the partition id of every state-change event to a
     * queue, one queue per event kind, so that a test can consume the events in arrival order.
     */
    public static class TestConsumerStateEventListener implements ConsumerEventListener {
        final LinkedBlockingQueue<Integer> activeQueue = new LinkedBlockingQueue<>();
        final LinkedBlockingQueue<Integer> inActiveQueue = new LinkedBlockingQueue<>();

        private TestConsumerStateEventListener() {
        }

        /**
         * Queues partition {@code i} on {@link #activeQueue}.
         *
         * @param consumer the consumer the event refers to (unused)
         * @param i the partition id carried by the event
         */
        @Override
        public void becameActive(Consumer<?> consumer, int i) {
            try {
                this.activeQueue.put(i);
            } catch (InterruptedException e) {
                // The callback cannot propagate the exception; keep the interrupt visible to the
                // calling thread instead of silently discarding it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Queues partition {@code i} on {@link #inActiveQueue}.
         *
         * @param consumer the consumer the event refers to (unused)
         * @param i the partition id carried by the event
         */
        @Override
        public void becameInactive(Consumer<?> consumer, int i) {
            try {
                this.inActiveQueue.put(i);
            } catch (InterruptedException e) {
                // See becameActive: restore the interrupt flag rather than swallow it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Class-level fixture (TestNG {@code @BeforeClass}): delegates to the inherited
     * {@code baseSetup()}.
     * NOTE(review): presumably this is what starts the broker behind {@code pulsar},
     * {@code pulsarClient} and {@code admin} used by the tests - confirm in the base class.
     *
     * @throws Exception if the inherited setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Class-level teardown (TestNG {@code @AfterClass}): delegates to the inherited
     * {@code internalCleanup()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Asserts that the listener currently has no queued state-change event of either kind.
     * Both checks are non-blocking; the active queue is inspected first.
     *
     * @param testConsumerStateEventListener listener whose queues are inspected
     * @throws Exception declared for symmetry with the other verify helpers
     */
    private void verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener testConsumerStateEventListener) throws Exception {
        final Integer pendingActive = testConsumerStateEventListener.activeQueue.poll();
        Assert.assertNull(pendingActive);

        final Integer pendingInactive = testConsumerStateEventListener.inActiveQueue.poll();
        Assert.assertNull(pendingInactive);
    }

    /**
     * Asserts that the next "became active" event queued on the listener carries partition id
     * {@code i} and that no "became inactive" event is queued at that moment.
     *
     * <p>The wait for the event is bounded so that a notification that never arrives fails the
     * test instead of blocking the whole suite.
     *
     * @param testConsumerStateEventListener listener whose queues are inspected
     * @param i expected partition id
     * @throws Exception if the thread is interrupted while waiting
     */
    private void verifyConsumerActive(TestConsumerStateEventListener testConsumerStateEventListener, int i) throws Exception {
        Integer partitionId = testConsumerStateEventListener.activeQueue.poll(30, TimeUnit.SECONDS);
        Assert.assertNotNull(partitionId, "timed out waiting for a became-active event");
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(partitionId.intValue(), i);
        Assert.assertNull(testConsumerStateEventListener.inActiveQueue.poll());
    }

    /**
     * Asserts that the next "became inactive" event queued on the listener carries partition id
     * {@code i} and that no "became active" event is queued at that moment.
     *
     * <p>The wait for the event is bounded so that a notification that never arrives fails the
     * test instead of blocking the whole suite.
     *
     * @param testConsumerStateEventListener listener whose queues are inspected
     * @param i expected partition id
     * @throws Exception if the thread is interrupted while waiting
     */
    private void verifyConsumerInactive(TestConsumerStateEventListener testConsumerStateEventListener, int i) throws Exception {
        Integer partitionId = testConsumerStateEventListener.inActiveQueue.poll(30, TimeUnit.SECONDS);
        Assert.assertNotNull(partitionId, "timed out waiting for a became-inactive event");
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(partitionId.intValue(), i);
        Assert.assertNull(testConsumerStateEventListener.activeQueue.poll());
    }

    /**
     * Exercises a Failover subscription on a single (non-partitioned) topic: which consumer gets
     * messages, what state-change events the listeners see, how unacknowledged messages are
     * redelivered when the receiving consumer closes, and when unsubscribe is allowed.
     *
     * <p>Every batch published is "my-message-0" .. "my-message-99"; the receive helper checks
     * payloads in that order.
     *
     * @throws Exception on any client, admin or broker failure
     */
    @Test
    public void testSimpleConsumerEventsWithoutPartition() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/failover-topic1";
        final String subName = "sub1";
        final int numMsgs = 100;
        final long asyncEventWaitMillis = 100L;

        TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener();
        TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener();
        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Failover);

        // 1. Two consumers on the same subscription: "1" must be notified active, "2" inactive.
        ConsumerBuilder<byte[]> consumerBuilder1 =
                consumerBuilder.clone().consumerName("1").consumerEventListener(listener1);
        Consumer<byte[]> consumer1 = consumerBuilder1.subscribe();
        Consumer<byte[]> consumer2 =
                consumerBuilder.clone().consumerName("2").consumerEventListener(listener2).subscribe();
        verifyConsumerActive(listener1, -1);
        verifyConsumerInactive(listener2, -1);

        // Check each reference before it is dereferenced.
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        Assert.assertEquals(subRef.getDispatcher().getType(), PulsarApi.CommandSubscribe.SubType.Failover);

        // 2. Publish a batch; the whole batch must show up as backlog.
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        publishNumberedMessages(producer, numMsgs);
        rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 100L);
        Thread.sleep(asyncEventWaitMillis);

        // 3. Consumer "2" receives nothing; consumer "1" receives the whole batch in order.
        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
        receiveNumberedMessages(consumer1, 0, numMsgs, true);
        rolloverPerIntervalStats();

        // 4. After the acks the backlog is empty.
        Thread.sleep(asyncEventWaitMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        // 5. Consumer "1" acks 0..4, receives 5..9 without acking, then closes. Consumer "2" must be
        //    notified active and receive everything from message 5 on.
        publishNumberedMessages(producer, numMsgs);
        receiveNumberedMessages(consumer1, 0, 5, true);
        receiveNumberedMessages(consumer1, 5, 10, false);
        consumer1.close();
        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
        verifyConsumerActive(listener2, -1);
        verifyConsumerNotReceiveAnyStateChanges(listener1);
        receiveNumberedMessages(consumer2, 5, numMsgs, true);
        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
        rolloverPerIntervalStats();
        Thread.sleep(asyncEventWaitMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        // 6. Consumer "2" acks 0..4 of a new batch, then a consumer named "1" subscribes again and
        //    must receive the remainder of the batch.
        publishNumberedMessages(producer, numMsgs);
        receiveNumberedMessages(consumer2, 0, 5, true);
        consumer1 = consumerBuilder1.subscribe();
        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
        receiveNumberedMessages(consumer1, 5, numMsgs, true);
        Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
        rolloverPerIntervalStats();
        Thread.sleep(asyncEventWaitMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        // 7. A consumer named "3" joining mid-batch is notified inactive, receives nothing, and
        //    consumer "1" keeps receiving the rest of the batch.
        publishNumberedMessages(producer, numMsgs);
        receiveNumberedMessages(consumer1, 0, 5, true);
        TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener();
        Consumer<byte[]> consumer3 =
                consumerBuilder.clone().consumerName("3").consumerEventListener(listener3).subscribe();
        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
        verifyConsumerInactive(listener3, -1);
        Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
        receiveNumberedMessages(consumer1, 5, numMsgs, true);
        rolloverPerIntervalStats();
        Thread.sleep(asyncEventWaitMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        // 8. Unsubscribe must be rejected while other consumers are still attached.
        try {
            consumer1.unsubscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: Assert.fail throws AssertionError, which this catch does not intercept.
        }

        // 9. Once "3" is the only consumer left, unsubscribe succeeds and removes the subscription.
        consumer1.close();
        consumer2.close();
        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
        try {
            consumer3.unsubscribe();
        } catch (PulsarClientException e) {
            Assert.fail("Should not fail", e);
        }
        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
        Assert.assertNull(topicRef.getSubscription(subName));

        producer.close();
        consumer3.close();
        this.admin.topics().delete(topicName);
    }

    /** Publishes "my-message-0" .. "my-message-(count-1)" asynchronously and waits for all sends. */
    private static void publishNumberedMessages(Producer<byte[]> producer, int count) throws Exception {
        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            pendingSends.add(producer.sendAsync(("my-message-" + i).getBytes(StandardCharsets.UTF_8)));
        }
        FutureUtil.waitForAll(pendingSends).get();
    }

    /**
     * Receives messages numbered {@code fromInclusive} .. {@code toExclusive - 1} from the consumer
     * (1 second timeout each), asserting payload and order; acknowledges each one when {@code ack}.
     */
    private static void receiveNumberedMessages(Consumer<byte[]> consumer, int fromInclusive, int toExclusive,
            boolean ack) throws PulsarClientException {
        for (int i = fromInclusive; i < toExclusive; i++) {
            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            Assert.assertEquals(new String(msg.getData(), StandardCharsets.UTF_8), "my-message-" + i);
            if (ack) {
                consumer.acknowledge(msg);
            }
        }
    }

    /**
     * Exercises a Failover subscription on a 4-partition topic: how the partitions are split
     * between consumers "1", "2" and "3" as consumers join and leave, checked through message
     * counts, listener state and each partition dispatcher's active consumer.
     *
     * <p>NOTE(review): this body is decompiler output. Several assignments below appear to be
     * missing casts (e.g. {@code TopicMessageImpl receive = subscribe.receive(...)} and the
     * dispatcher locals), and the final assertions were folded into the last loop's exit branch.
     * Confirm against the real API types before restructuring.
     *
     * @throws Exception on any client, admin or broker failure
     */
    @Test
    public void testSimpleConsumerEventsWithPartition() throws Exception {
        // Unique topic name per run so repeated runs do not collide.
        String str = "persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" + System.nanoTime();
        TopicName topicName = TopicName.get(str);
        // Distinct payloads seen across all consumers within one publish round (raw type: holds Strings).
        HashSet hashSet = new HashSet();
        this.admin.topics().createPartitionedTopic(str, 4);
        ConsumerBuilder<byte[]> subscriptionType = this.pulsarClient.newConsumer().topic(str).subscriptionName("sub1").subscriptionType(SubscriptionType.Failover);
        ActiveInactiveListenerEvent activeInactiveListenerEvent = new ActiveInactiveListenerEvent();
        ActiveInactiveListenerEvent activeInactiveListenerEvent2 = new ActiveInactiveListenerEvent();
        // Consumers "1" and "2" share subscription "sub1", each with its own listener.
        Consumer<byte[]> subscribe = subscriptionType.clone().consumerName("1").consumerEventListener(activeInactiveListenerEvent).subscribe();
        Consumer<byte[]> subscribe2 = subscriptionType.clone().consumerName("2").consumerEventListener(activeInactiveListenerEvent2).subscribe();
        // One dispatcher per partition (0..3) of subscription "sub1".
        PersistentDispatcherSingleActiveConsumer dispatcher = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName.getPartition(0).toString()).get()).getSubscription("sub1").getDispatcher();
        PersistentDispatcherSingleActiveConsumer dispatcher2 = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName.getPartition(1).toString()).get()).getSubscription("sub1").getDispatcher();
        PersistentDispatcherSingleActiveConsumer dispatcher3 = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName.getPartition(2).toString()).get()).getSubscription("sub1").getDispatcher();
        PersistentDispatcherSingleActiveConsumer dispatcher4 = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName.getPartition(3).toString()).get()).getSubscription("sub1").getDispatcher();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        // Round 1: publish 100 messages round-robin across the partitions.
        for (int i = 0; i < 100; i++) {
            create.sendAsync(("my-message-" + i).getBytes());
        }
        create.flush();
        // Running total of messages received in round 1 (consumer "1" first, then consumer "2").
        int i2 = 0;
        // Partition indexes consumer "1" actually received from.
        HashSet newHashSet = Sets.newHashSet();
        // Drain consumer "1" until a 1-second receive times out.
        while (true) {
            TopicMessageImpl receive = subscribe.receive(1, TimeUnit.SECONDS);
            if (receive == null) {
                break;
            }
            i2++;
            subscribe.acknowledge((Message<?>) receive);
            newHashSet.add(Integer.valueOf(receive.getInnerMessageId().getPartitionIndex()));
        }
        // difference(a, b).isEmpty() means a is a subset of b: every partition listener 1 saw as
        // active, and every partition listener 2 saw as inactive, delivered to consumer "1".
        Assert.assertTrue(Sets.difference(activeInactiveListenerEvent.activePtns, newHashSet).isEmpty());
        Assert.assertTrue(Sets.difference(activeInactiveListenerEvent2.inactivePtns, newHashSet).isEmpty());
        // Consumer "1" got exactly half of the 100 messages.
        Assert.assertEquals(i2, 50);
        // Partition indexes consumer "2" actually received from.
        HashSet newHashSet2 = Sets.newHashSet();
        // Drain consumer "2" the same way; i2 keeps accumulating.
        while (true) {
            TopicMessageImpl receive2 = subscribe2.receive(1, TimeUnit.SECONDS);
            if (receive2 == null) {
                break;
            }
            i2++;
            subscribe2.acknowledge((Message<?>) receive2);
            newHashSet2.add(Integer.valueOf(receive2.getInnerMessageId().getPartitionIndex()));
        }
        // Mirror-image subset checks for consumer "2".
        Assert.assertTrue(Sets.difference(activeInactiveListenerEvent.inactivePtns, newHashSet2).isEmpty());
        Assert.assertTrue(Sets.difference(activeInactiveListenerEvent2.activePtns, newHashSet2).isEmpty());
        // Together the two consumers received all 100 messages.
        Assert.assertEquals(i2, 100);
        // With two consumers the partitions alternate: 0 -> "1", 1 -> "2", 2 -> "1", 3 -> "2".
        Assert.assertEquals(dispatcher.getActiveConsumer().consumerName(), "1");
        Assert.assertEquals(dispatcher2.getActiveConsumer().consumerName(), "2");
        Assert.assertEquals(dispatcher3.getActiveConsumer().consumerName(), "1");
        Assert.assertEquals(dispatcher4.getActiveConsumer().consumerName(), "2");
        // Round 2: publish another 100 messages.
        for (int i3 = 0; i3 < 100; i3++) {
            create.sendAsync(("my-message-" + i3).getBytes());
        }
        create.flush();
        // Consumer "1" takes and acks 20 messages before a third consumer joins.
        for (int i4 = 0; i4 < 20; i4++) {
            Message<byte[]> receive3 = subscribe.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(receive3);
            hashSet.add(new String(receive3.getData()));
            subscribe.acknowledge((Message<?>) receive3);
        }
        // Consumer "3" joins (no listener attached); the sleep gives the change time to take effect.
        Consumer<byte[]> subscribe3 = subscriptionType.clone().consumerName("3").subscribe();
        Thread.sleep(100L);
        // Messages consumer "1" receives after consumer "3" joined.
        int i5 = 0;
        while (true) {
            Message<byte[]> receive4 = subscribe.receive(1, TimeUnit.SECONDS);
            if (receive4 == null) {
                break;
            }
            i5++;
            hashSet.add(new String(receive4.getData()));
            subscribe.acknowledge((Message<?>) receive4);
        }
        Assert.assertEquals(i5, 55);
        // Messages consumer "2" receives in round 2.
        int i6 = 0;
        while (true) {
            Message<byte[]> receive5 = subscribe2.receive(1, TimeUnit.SECONDS);
            if (receive5 == null) {
                break;
            }
            i6++;
            hashSet.add(new String(receive5.getData()));
            subscribe2.acknowledge((Message<?>) receive5);
        }
        Assert.assertEquals(i6, 50);
        // Messages consumer "3" receives in round 2.
        int i7 = 0;
        while (true) {
            Message<byte[]> receive6 = subscribe3.receive(1, TimeUnit.SECONDS);
            if (receive6 == null) {
                break;
            }
            i7++;
            hashSet.add(new String(receive6.getData()));
            subscribe3.acknowledge((Message<?>) receive6);
        }
        // Tolerance check: 15 +/- 10 (float-delta overload applied to an int count).
        Assert.assertEquals(i7, 15.0f, 10.0f);
        // All 100 distinct payloads of round 2 were seen by some consumer.
        Assert.assertEquals(hashSet.size(), 100);
        // With three consumers: 0 -> "1", 1 -> "2", 2 -> "3", 3 -> "1".
        Assert.assertEquals(dispatcher.getActiveConsumer().consumerName(), "1");
        Assert.assertEquals(dispatcher2.getActiveConsumer().consumerName(), "2");
        Assert.assertEquals(dispatcher3.getActiveConsumer().consumerName(), "3");
        Assert.assertEquals(dispatcher4.getActiveConsumer().consumerName(), "1");
        // Round 3: reset the payload set and publish another 100 messages.
        hashSet.clear();
        for (int i8 = 0; i8 < 100; i8++) {
            create.sendAsync(("my-message-" + i8).getBytes());
        }
        create.flush();
        // Consumer "1" takes and acks 10 messages, then closes.
        for (int i9 = 0; i9 < 10; i9++) {
            Message<byte[]> receive7 = subscribe.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(receive7);
            hashSet.add(new String(receive7.getData()));
            subscribe.acknowledge((Message<?>) receive7);
        }
        subscribe.close();
        Thread.sleep(100L);
        // Messages consumer "2" receives after consumer "1" left.
        int i10 = 0;
        while (true) {
            Message<byte[]> receive8 = subscribe2.receive(1, TimeUnit.SECONDS);
            if (receive8 == null) {
                break;
            }
            i10++;
            hashSet.add(new String(receive8.getData()));
            subscribe2.acknowledge((Message<?>) receive8);
        }
        // Tolerance check: 70 +/- 5.
        // NOTE(review): 10 + 70 + 70 exceeds the 100 published in this round; presumably the
        // counts include redelivered messages - confirm the intended expectation.
        Assert.assertEquals(i10, 70.0f, 5.0f);
        // Messages consumer "3" receives after consumer "1" left.
        int i11 = 0;
        while (true) {
            Message<byte[]> receive9 = subscribe3.receive(1, TimeUnit.SECONDS);
            if (receive9 == null) {
                // Receive timed out: consumer "3" is drained. The final assertions and the
                // cleanup run here, and the method returns from inside the loop.
                Assert.assertEquals(i11, 70.0f, 5.0f);
                Assert.assertEquals(hashSet.size(), 100);
                // With consumers "2" and "3" left: 0 -> "2", 1 -> "3", 2 -> "2", 3 -> "3".
                Assert.assertEquals(dispatcher.getActiveConsumer().consumerName(), "2");
                Assert.assertEquals(dispatcher2.getActiveConsumer().consumerName(), "3");
                Assert.assertEquals(dispatcher3.getActiveConsumer().consumerName(), "2");
                Assert.assertEquals(dispatcher4.getActiveConsumer().consumerName(), "3");
                create.close();
                subscribe2.close();
                subscribe3.unsubscribe();
                this.admin.topics().deletePartitionedTopic(str);
                return;
            }
            i11++;
            hashSet.add(new String(receive9.getData()));
            subscribe3.acknowledge((Message<?>) receive9);
        }
    }

    /**
     * Verifies message delivery on a Failover subscription when the broker is configured with an
     * active-consumer failover delay: after 100 messages are published to an existing
     * subscription and two consumers connect concurrently, all 100 messages must be delivered in
     * publish order and the backlog must drain.
     *
     * <p>The broker configuration is changed for this test only; it is reset and the broker is
     * restarted in a {@code finally} block so that a failing assertion cannot leak the modified
     * setting into other tests of this class.
     *
     * @throws Exception on any client, admin or broker failure
     */
    @Test
    public void testActiveConsumerFailoverWithDelay() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/failover-topic3";
        final String subName = "sub1";
        final int numMsgs = 100;
        // Literal value of the unrelated javassist TokenId.BadToken constant that the decompiler
        // had substituted here.
        final int failoverDelayMillis = 500;
        final int maxPollAttempts = 20;
        // Filled by the message listener on a client thread; every access holds the list's monitor.
        final List<Message<byte[]>> receivedMessages = new ArrayList<>();

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Failover)
                .messageListener((consumer, message) -> {
                    try {
                        synchronized (receivedMessages) {
                            receivedMessages.add(message);
                        }
                        consumer.acknowledge(message);
                    } catch (Exception e) {
                        Assert.fail("Should not fail");
                    }
                });
        ConsumerBuilder<byte[]> consumerBuilder1 = consumerBuilder.clone().consumerName("1");
        ConsumerBuilder<byte[]> consumerBuilder2 = consumerBuilder.clone().consumerName("2");

        this.conf.setActiveConsumerFailoverDelayTimeMillis(failoverDelayMillis);
        restartBroker();
        try {
            // Create the subscription, then disconnect so the messages below accumulate as backlog.
            consumerBuilder1.subscribe().close();
            PersistentSubscription subRef =
                    ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get())
                            .getSubscription(subName);

            Producer<byte[]> producer = this.pulsarClient.newProducer()
                    .topic(topicName)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>(numMsgs);
            for (int i = 0; i < numMsgs; i++) {
                pendingSends.add(producer.sendAsync(("my-message-" + i).getBytes(StandardCharsets.UTF_8)));
            }
            FutureUtil.waitForAll(pendingSends).get();
            producer.close();

            // Connect both consumers without waiting for either subscribe to complete.
            CompletableFuture<Consumer<byte[]>> consumer2Future = consumerBuilder2.subscribeAsync();
            CompletableFuture<Consumer<byte[]>> consumer1Future = consumerBuilder1.subscribeAsync();

            // Poll (up to maxPollAttempts checks, 100 ms apart) until everything was received and
            // acknowledged; no sleep after the last check.
            for (int attempt = 0; attempt < maxPollAttempts; attempt++) {
                int receivedCount;
                synchronized (receivedMessages) {
                    receivedCount = receivedMessages.size();
                }
                if (receivedCount >= numMsgs && subRef.getNumberOfEntriesInBacklog() == 0) {
                    break;
                }
                if (attempt != maxPollAttempts - 1) {
                    Thread.sleep(100L);
                }
            }

            // Verify against a snapshot taken under the lock the listener uses.
            List<Message<byte[]>> snapshot;
            synchronized (receivedMessages) {
                snapshot = new ArrayList<>(receivedMessages);
            }
            Assert.assertEquals(snapshot.size(), numMsgs);
            Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);
            for (int i = 0; i < snapshot.size(); i++) {
                Message<byte[]> received = snapshot.get(i);
                Assert.assertNotNull(received);
                Assert.assertEquals(new String(received.getData(), StandardCharsets.UTF_8), "my-message-" + i);
            }

            consumer1Future.get().close();
            consumer2Future.get().unsubscribe();
            this.admin.topics().delete(topicName);
        } finally {
            // Always restore the default broker configuration for the remaining tests.
            resetConfig();
            restartBroker();
        }
    }

    // NOTE: a decompiled copy of the compiler-synthesized $deserializeLambda$(SerializedLambda)
    // method used to live here and was removed on purpose. javac generates that method itself for
    // the serializable message-listener lambda in testActiveConsumerFailoverWithDelay, so it must
    // not be declared in source. The decompiled copy was also not valid Java (a boolean initialised
    // with -1, a switch over a boolean, and a lambda returned where the target type is Object).
}
