package org.apache.pulsar.client.api;

import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/ConsumerBatchReceiveTest.class */
public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
    private static final Executor EXECUTOR = Executors.newSingleThreadExecutor();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerBatchReceiveTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batchReceivePolicy")
    public Object[][] batchReceivePolicyProvider() {
        return new Object[]{new Object[]{BatchReceivePolicy.DEFAULT_POLICY, true, 1000}, new Object[]{BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(10).build(), true, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000}, new Object[]{BatchReceivePolicy.DEFAULT_POLICY, false, 1000}, new Object[]{BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(10).build(), false, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(70).build(), true, 50}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(50).timeout(10, TimeUnit.MILLISECONDS).build(), true, 30}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(-10).timeout(10, TimeUnit.MILLISECONDS).build(), true, 10}, new Object[]{BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(70).build(), false, 50}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(50).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(-10).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30}, new Object[]{BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30}, new Object[]{BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30}};
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        testBatchReceive("persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID(), batchReceivePolicy, z, i);
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        String str = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        testBatchReceive(str, batchReceivePolicy, z, i);
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        testBatchReceiveAsync("persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID(), batchReceivePolicy, z, i);
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        String str = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        testBatchReceiveAsync(str, batchReceivePolicy, z, i);
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        testBatchReceiveAndRedelivery("persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID(), batchReceivePolicy, z, i);
    }

    @Test(dataProvider = "batchReceivePolicy")
    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        String str = "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(str, 3);
        testBatchReceiveAndRedelivery(str, batchReceivePolicy, z, i);
    }

    @Test
    public void verifyBatchSizeIsEqualToPolicyConfiguration() throws Exception {
        String str = "persistent://my-property/my-ns/batch-receive-size" + UUID.randomUUID();
        BatchReceivePolicy build = BatchReceivePolicy.builder().maxNumMessages(100).build();
        Producer<String> create = this.pulsarClient.newProducer(Schema.STRING).topic(str).create();
        try {
            Consumer<String> subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(str).subscriptionName("s2").batchReceivePolicy(build).subscribe();
            try {
                sendMessagesAsyncAndWait(create, 500);
                receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(subscribe, build, 5);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer, BatchReceivePolicy batchReceivePolicy, int i) throws PulsarClientException {
        for (int i2 = 0; i2 < i; i2++) {
            Messages<String> batchReceive = consumer.batchReceive();
            log.info("Received {} messages in a single batch receive verifying batch size.", Integer.valueOf(batchReceive.size()));
            Assert.assertEquals(batchReceive.size(), batchReceivePolicy.getMaxNumMessages());
        }
    }

    private void testBatchReceive(String str, BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(str);
        if (!z) {
            producerBuilder.enableBatching(false);
        }
        Producer<String> create = producerBuilder.create();
        try {
            Consumer<String> subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(str).subscriptionName("s1").receiverQueueSize(i).batchReceivePolicy(batchReceivePolicy).subscribe();
            try {
                sendMessagesAsyncAndWait(create, 100);
                batchReceiveAndCheck(subscribe, 100);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void testBatchReceiveAsync(String str, BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        if (batchReceivePolicy.getTimeoutMs() <= 0) {
            return;
        }
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(str);
        if (!z) {
            producerBuilder.enableBatching(false);
        }
        Producer<String> create = producerBuilder.create();
        try {
            Consumer<String> subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(str).subscriptionName("s1").receiverQueueSize(i).batchReceivePolicy(batchReceivePolicy).subscribe();
            try {
                sendMessagesAsyncAndWait(create, 100);
                CountDownLatch countDownLatch = new CountDownLatch(100);
                receiveAsync(subscribe, 100, countDownLatch);
                countDownLatch.await();
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void testBatchReceiveAndRedelivery(String str, BatchReceivePolicy batchReceivePolicy, boolean z, int i) throws Exception {
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(str);
        if (!z) {
            producerBuilder.enableBatching(false);
        }
        Producer<String> create = producerBuilder.create();
        try {
            Consumer<String> subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(str).subscriptionName("s1").receiverQueueSize(i).batchReceivePolicy(batchReceivePolicy).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
            try {
                sendMessagesAsyncAndWait(create, 100);
                batchReceiveAndRedelivery(subscribe, 100);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void receiveAsync(Consumer<String> consumer, int i, CountDownLatch countDownLatch) {
        consumer.batchReceiveAsync().thenAccept(messages -> {
            if (messages != null) {
                log.info("Received {} messages in a single batch receive.", Integer.valueOf(messages.size()));
                Iterator<Message<T>> it = messages.iterator();
                while (it.hasNext()) {
                    Message message = (Message) it.next();
                    Assert.assertNotNull(message.getValue());
                    log.info("Get message {} from batch", message.getValue());
                    countDownLatch.countDown();
                }
                try {
                    consumer.acknowledge((Messages<?>) messages);
                } catch (PulsarClientException e) {
                    log.error("Ack message error", (Throwable) e);
                }
                if (messages.size() < i) {
                    EXECUTOR.execute(() -> {
                        receiveAsync(consumer, i - messages.size(), countDownLatch);
                    });
                } else {
                    Assert.assertEquals(i, 0);
                }
            }
        });
    }

    private void sendMessagesAsyncAndWait(Producer<String> producer, int i) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(i);
        for (int i2 = 0; i2 < i; i2++) {
            String str = "my-message-" + i2;
            producer.sendAsync(str).thenAccept(messageId -> {
                log.info("Message {} published {}", str, messageId);
                if (messageId != null) {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
    }

    private void batchReceiveAndCheck(Consumer<String> consumer, int i) throws Exception {
        int i2 = 0;
        do {
            Messages<String> batchReceive = consumer.batchReceive();
            if (batchReceive != null) {
                i2 += batchReceive.size();
                log.info("Received {} messages in a single batch receive.", Integer.valueOf(batchReceive.size()));
                Iterator<String> it = batchReceive.iterator();
                while (it.hasNext()) {
                    Message message = (Message) it.next();
                    Assert.assertNotNull(message.getValue());
                    log.info("Get message {} from batch", message.getValue());
                }
                consumer.acknowledge(batchReceive);
            }
        } while (i2 < i);
        Assert.assertEquals(i, i2);
    }

    private void batchReceiveAndRedelivery(Consumer<String> consumer, int i) throws Exception {
        int i2 = 0;
        do {
            Messages<String> batchReceive = consumer.batchReceive();
            if (batchReceive != null) {
                i2 += batchReceive.size();
                log.info("Received {} messages in a single batch receive.", Integer.valueOf(batchReceive.size()));
                Iterator<String> it = batchReceive.iterator();
                while (it.hasNext()) {
                    Message message = (Message) it.next();
                    Assert.assertNotNull(message.getValue());
                    log.info("Get message {} from batch", message.getValue());
                }
            }
        } while (i2 < i);
        Assert.assertEquals(i, i2);
        do {
            Messages<String> batchReceive2 = consumer.batchReceive();
            if (batchReceive2 != null) {
                i2 += batchReceive2.size();
                log.info("Received {} messages in a single batch receive.", Integer.valueOf(batchReceive2.size()));
                Iterator<String> it2 = batchReceive2.iterator();
                while (it2.hasNext()) {
                    Message message2 = (Message) it2.next();
                    Assert.assertNotNull(message2.getValue());
                    log.info("Get message {} from batch", message2.getValue());
                }
            }
            consumer.acknowledge(batchReceive2);
        } while (i2 < i * 2);
        Assert.assertEquals(i * 2, i2);
    }
}
