/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.topologies;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.tests.TestRetrySupport;
import org.testng.Assert;
import org.testng.annotations.DataProvider;

public abstract class PulsarTestBase
extends TestRetrySupport {
    @DataProvider(name="TopicDomain")
    public Object[][] topicDomain() {
        return new Object[][]{{"persistent"}, {"non-persistent"}};
    }

    public static String randomName(int numChars) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numChars; ++i) {
            sb.append((char)(ThreadLocalRandom.current().nextInt(26) + 97));
        }
        return sb.toString();
    }

    protected static String generateNamespaceName() {
        return "ns-" + PulsarTestBase.randomName(8);
    }

    protected static String generateTopicName(String topicPrefix, boolean isPersistent) {
        return PulsarTestBase.generateTopicName("default", topicPrefix, isPersistent);
    }

    protected static String generateTopicName(String namespace, String topicPrefix, boolean isPersistent) {
        String topicName = topicPrefix + "-" + PulsarTestBase.randomName(8) + "-" + System.currentTimeMillis();
        if (isPersistent) {
            return "persistent://public/" + namespace + "/" + topicName;
        }
        return "non-persistent://public/" + namespace + "/" + topicName;
    }

    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
        String topicName = PulsarTestBase.generateTopicName("testpubconsume", isPersistent);
        int numMessages = 10;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("my-sub").subscribe();){
            try (Producer producer = client.newProducer(Schema.STRING).topic(topicName).create();){
                for (int i = 0; i < numMessages; ++i) {
                    producer.send((Object)("smoke-message-" + i));
                }
            }
            for (int i = 0; i < numMessages; ++i) {
                Message m = consumer.receive();
                Assert.assertEquals((String)("smoke-message-" + i), (String)((String)m.getValue()));
            }
        }
    }

    public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
        String topicName = PulsarTestBase.generateTopicName("test-batch-publish-consume", isPersistent);
        int numMessages = 10000;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).receiverQueueSize(10000).subscriptionName("my-sub").subscribe();){
            try (Producer producer = client.newProducer(Schema.STRING).topic(topicName).blockIfQueueFull(true).create();){
                ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
                for (int i = 0; i < 10000; ++i) {
                    futures.add(producer.sendAsync((Object)("smoke-message-" + i)));
                }
                FutureUtil.waitForAll(futures).get();
            }
            for (int i = 0; i < 10000; ++i) {
                Message m = consumer.receive();
                Assert.assertEquals((String)("smoke-message-" + i), (String)((String)m.getValue()));
            }
        }
    }

    public void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
        String topicName = PulsarTestBase.generateTopicName("test-batch-index-ack-disabled", true);
        int numMessages = 100;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer consumer = client.newConsumer(Schema.INT32).topic(new String[]{topicName}).subscriptionName("sub").receiverQueueSize(100).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(false).ackTimeout(1L, TimeUnit.SECONDS).subscribe();){
            try (Producer producer = client.newProducer(Schema.INT32).topic(topicName).batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS).create();){
                ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
                for (int i = 0; i < 100; ++i) {
                    futures.add(producer.sendAsync((Object)i));
                }
                FutureUtil.waitForAll(futures).get();
            }
            for (int i = 0; i < 100; ++i) {
                Message m = consumer.receive();
                if (i % 2 != 0) continue;
                consumer.acknowledge(m);
            }
            Message redelivery = consumer.receive(3, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)redelivery);
        }
    }
}

