package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/InterceptorsTest.class */
public class InterceptorsTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) InterceptorsTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testProducerInterceptor() throws PulsarClientException {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").intercept(new ProducerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.1
            @Override // org.apache.pulsar.client.api.ProducerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                InterceptorsTest.log.info("Before send message: {}", new String(messageImpl.getData()));
                List propertiesList = messageImpl.getMessageBuilder().getPropertiesList();
                for (int i = 0; i < propertiesList.size(); i++) {
                    if (TypedMessageBuilder.CONF_KEY.equals(((PulsarApi.KeyValue) propertiesList.get(i)).getKey())) {
                        messageImpl.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey(TypedMessageBuilder.CONF_KEY).setValue("after").build());
                    }
                }
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId messageId, Throwable th) {
                message.getProperties();
                Assert.assertEquals("complete", message.getProperty(TypedMessageBuilder.CONF_KEY));
                InterceptorsTest.log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), messageId, th);
            }
        }, new ProducerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.2
            @Override // org.apache.pulsar.client.api.ProducerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                InterceptorsTest.log.info("Before send message: {}", new String(messageImpl.getData()));
                List propertiesList = messageImpl.getMessageBuilder().getPropertiesList();
                for (int i = 0; i < propertiesList.size(); i++) {
                    if (TypedMessageBuilder.CONF_KEY.equals(((PulsarApi.KeyValue) propertiesList.get(i)).getKey())) {
                        messageImpl.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey(TypedMessageBuilder.CONF_KEY).setValue("complete").build());
                    }
                }
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId messageId, Throwable th) {
                message.getProperties();
                Assert.assertEquals("complete", message.getProperty(TypedMessageBuilder.CONF_KEY));
                InterceptorsTest.log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), messageId, th);
            }
        }).create();
        log.info("Send result messageId: {}", create.newMessage().property(TypedMessageBuilder.CONF_KEY, "before").value("Hello Pulsar!").send());
        create.close();
    }

    @Test
    public void testProducerInterceptorsWithExceptions() throws PulsarClientException {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").intercept(new ProducerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.3
            @Override // org.apache.pulsar.client.api.ProducerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                throw new NullPointerException();
            }

            @Override // org.apache.pulsar.client.api.ProducerInterceptor
            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId messageId, Throwable th) {
                throw new NullPointerException();
            }
        }).create();
        Assert.assertNotNull(create.newMessage().value("Hello Pulsar!").send());
        create.close();
    }

    @Test
    public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.4
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        }).subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        create.newMessage().value("Hello Pulsar!").send();
        MessageImpl receive = subscribe.receive();
        boolean z = false;
        Iterator it = receive.getMessageBuilder().getPropertiesList().iterator();
        while (it.hasNext()) {
            if ("beforeConsumer".equals(((PulsarApi.KeyValue) it.next()).getKey())) {
                z = true;
            }
        }
        Assert.assertTrue(z);
        subscribe.acknowledge((Message<?>) receive);
        create.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.5
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1").subscriptionType(SubscriptionType.Shared).intercept(consumerInterceptor).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("Hello Pulsar!").send();
        create2.newMessage().value("Hello Pulsar!").send();
        int i = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            TopicMessageImpl receive = subscribe.receive();
            Iterator it = receive.getMessage().getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((PulsarApi.KeyValue) it.next()).getKey())) {
                    i++;
                }
            }
            subscribe.acknowledge((Message<?>) receive);
        }
        Assert.assertEquals(2, i);
        create.close();
        create2.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.6
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topicsPattern("persistent://my-property/my-ns/my-.*").subscriptionType(SubscriptionType.Shared).intercept(consumerInterceptor).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("Hello Pulsar!").send();
        create2.newMessage().value("Hello Pulsar!").send();
        int i = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            TopicMessageImpl receive = subscribe.receive();
            Iterator it = receive.getMessage().getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((PulsarApi.KeyValue) it.next()).getKey())) {
                    i++;
                }
            }
            subscribe.acknowledge((Message<?>) receive);
        }
        Assert.assertEquals(2, i);
        create.close();
        create2.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorForAcknowledgeCumulative() throws PulsarClientException {
        final ArrayList arrayList = new ArrayList();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").subscriptionType(SubscriptionType.Failover).intercept(new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.7
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
                return messageImpl;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                Assert.assertEquals(arrayList.stream().filter(messageId2 -> {
                    return messageId2.compareTo(messageId) <= 0;
                }).count(), 100L);
                arrayList.clear();
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        }).subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        for (int i = 0; i < 100; i++) {
            create.newMessage().value("Hello Pulsar!").send();
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 100; i3++) {
            MessageImpl receive = subscribe.receive();
            Iterator it = receive.getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((PulsarApi.KeyValue) it.next()).getKey())) {
                    i2++;
                }
            }
            arrayList.add(receive.getMessageId());
            if (i3 == 99) {
                subscribe.acknowledgeCumulative((Message<?>) receive);
            }
        }
        Assert.assertEquals(100, i2);
        create.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(50);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").subscriptionType(SubscriptionType.Failover).intercept(new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.8
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
                CountDownLatch countDownLatch2 = countDownLatch;
                set.forEach(messageId -> {
                    countDownLatch2.countDown();
                });
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        }).negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS).subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        for (int i = 0; i < 100; i++) {
            create.send("Mock message");
        }
        for (int i2 = 0; i2 < 100; i2++) {
            Message<?> receive = subscribe.receive();
            if (i2 % 2 == 0) {
                subscribe.negativeAcknowledge(receive);
            } else {
                subscribe.acknowledge(receive);
            }
        }
        countDownLatch.await();
        Assert.assertEquals(countDownLatch.getCount(), 0L);
        create.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(50);
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.9
            @Override // org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
            public void close() {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            @Override // org.apache.pulsar.client.api.ConsumerInterceptor
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
                CountDownLatch countDownLatch2 = countDownLatch;
                set.forEach(messageId -> {
                    countDownLatch2.countDown();
                });
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").subscriptionName("foo").intercept(consumerInterceptor).ackTimeout(2L, TimeUnit.SECONDS).subscribe();
        for (int i = 0; i < 100; i++) {
            create.send("Mock message");
        }
        for (int i2 = 0; i2 < 100; i2++) {
            Message<?> receive = subscribe.receive();
            if (i2 % 2 == 0) {
                subscribe.acknowledge(receive);
            }
        }
        countDownLatch.await();
        Assert.assertEquals(countDownLatch.getCount(), 0L);
        create.close();
        subscribe.close();
    }
}
