/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.compaction;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

/**
 * Integration tests that publish keyed messages, trigger topic compaction in several
 * ways (CLI tool, admin command, namespace compaction threshold) and verify what a
 * {@code readCompacted(true)} consumer receives before and after compaction.
 *
 * <p>None of the consumers in this class acknowledge messages, so every new
 * subscription attempt on the same subscription name starts reading again from the
 * first unacknowledged message.
 */
public class TestCompaction extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(TestCompaction.class);

    /**
     * Publishes two values for one key, checks that both are delivered before compaction,
     * compacts the topic with the {@code compact-topic} base command and then checks that
     * the first message delivered is the latest value.
     *
     * @param serviceUrl supplier of the service URL used to build the client
     * @throws Exception if any cluster command or client operation fails
     */
    @Test(dataProvider = "ServiceUrls", timeOut = 300000L)
    public void testPublishCompactAndConsumeCLI(Supplier<String> serviceUrl) throws Exception {
        String tenant = "compaction-test-cli-" + randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
        createNamespace(namespace);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
            // Create the subscription up front and close the consumer right away;
            // presumably so that messages published below are retained for "sub1".
            client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
                producer.newMessage().key("key0").value("content0").send();
                producer.newMessage().key("key0").value("content1").send();
            }

            // Before compaction both values are expected, in publish order.
            try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                    .readCompacted(true).subscriptionName("sub1").subscribe()) {
                Message<String> m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content0");

                m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content1");
            }

            pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", "-t", topic);

            // After compaction the first message delivered must carry the latest value.
            try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                    .readCompacted(true).subscriptionName("sub1").subscribe()) {
                Message<String> m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content1");
            }
        }
    }

    /**
     * Same scenario as {@link #testPublishCompactAndConsumeCLI(Supplier)}, but compaction
     * is triggered with the {@code topics compact} admin command and awaited with
     * {@code topics compaction-status -w}.
     *
     * @param serviceUrl supplier of the service URL used to build the client
     * @throws Exception if any cluster command or client operation fails
     */
    @Test(dataProvider = "ServiceUrls", timeOut = 300000L)
    public void testPublishCompactAndConsumeRest(Supplier<String> serviceUrl) throws Exception {
        String tenant = "compaction-test-rest-" + randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
        createNamespace(namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-clusters",
                "--clusters", pulsarCluster.getClusterName(), namespace);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
            // Create the subscription up front and close the consumer right away;
            // presumably so that messages published below are retained for "sub1".
            client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
                producer.newMessage().key("key0").value("content0").send();
                producer.newMessage().key("key0").value("content1").send();
            }

            // Before compaction both values are expected, in publish order.
            try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                    .readCompacted(true).subscriptionName("sub1").subscribe()) {
                Message<String> m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content0");

                m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content1");
            }

            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compact", topic);
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compaction-status", "-w", topic);

            // After compaction the first message delivered must carry the latest value.
            try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                    .readCompacted(true).subscriptionName("sub1").subscribe()) {
                Message<String> m = consumer.receive();
                Assert.assertEquals(m.getKey(), "key0");
                Assert.assertEquals(m.getValue(), "content1");
            }
        }
    }

    /**
     * Publishes ten values for each of ten integer keys to a two-partition topic (routing
     * by {@code key % numPartitions}), verifies the uncompacted content of each partition,
     * compacts both partitions and then verifies that reading the partitioned topic yields
     * exactly one entry per key, carrying the last published value.
     *
     * @param serviceUrl supplier of the service URL used to build the client
     * @throws Exception if any cluster command or client operation fails
     */
    @Test(dataProvider = "ServiceUrls", timeOut = 300000L)
    public void testPublishCompactAndConsumePartitionedTopics(Supplier<String> serviceUrl) throws Exception {
        String tenant = "compaction-test-partitioned-topic-" + randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/partitioned-topic";
        final int numKeys = 10;
        final int numValuesPerKey = 10;
        final String subscriptionName = "sub1";
        String partition0 = topic + "-partition-0";
        String partition1 = topic + "-partition-1";

        createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
        createNamespace(namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-clusters",
                "--clusters", pulsarCluster.getClusterName(), namespace);
        createPartitionedTopic(topic, 2);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
            // Create the subscription on each partition before publishing.
            client.newConsumer().topic(partition0).subscriptionName(subscriptionName).subscribe().close();
            client.newConsumer().topic(partition1).subscriptionName(subscriptionName).subscribe().close();

            // Route by numeric key so even keys land on partition 0 and odd keys on partition 1.
            MessageRouter keyModuloRouter = new MessageRouter() {
                @Override
                public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                    return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
                }
            };

            try (Producer<byte[]> producer = client.newProducer().topic(topic)
                    .messageRouter(keyModuloRouter).create()) {
                for (int i = 0; i < numKeys; i++) {
                    for (int j = 0; j < numValuesPerKey; j++) {
                        producer.newMessage()
                                .key(String.valueOf(i))
                                .value(("key-" + i + "-value-" + j).getBytes(StandardCharsets.UTF_8))
                                .send();
                    }
                    log.info("Successfully write {} values for key {}", numValuesPerKey, i);
                }
            }

            List<Integer> evenKeys = IntStream.range(0, numKeys)
                    .filter(i -> i % 2 == 0).boxed().collect(Collectors.toList());
            List<Integer> oddKeys = IntStream.range(0, numKeys)
                    .filter(i -> i % 2 != 0).boxed().collect(Collectors.toList());

            // Before compaction every published value must still be readable, in order.
            consumePartition(client, partition0, subscriptionName, evenKeys, numValuesPerKey, 0);
            consumePartition(client, partition1, subscriptionName, oddKeys, numValuesPerKey, 0);

            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compact", partition0);
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compact", partition1);

            // NOTE(review): the "compaction-status -w" calls below look like they already
            // wait for completion, so this sleep presumably only delays/reduces status
            // polling - confirm before removing it.
            Thread.sleep(30000L);
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compaction-status", "-w", partition0);
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compaction-status", "-w", partition1);

            Map<Integer, String> compactedData =
                    consumeCompactedTopic(client, topic, subscriptionName, numKeys);
            Assert.assertEquals(compactedData.size(), numKeys);
            for (int i = 0; i < numKeys; i++) {
                Assert.assertEquals(compactedData.get(i), "key-" + i + "-value-" + (numValuesPerKey - 1));
            }
        }
    }

    /**
     * Reads {@code keys.size() * numValuesPerKey} messages from a single topic and asserts
     * that they arrive grouped by key in the order of {@code keys}, each with value
     * {@code "key-<key>-value-<n>"} for {@code n} counting up from {@code startValue}.
     *
     * @param client          client used to create the consumer
     * @param topic           topic (partition) to read from
     * @param subscription    subscription name to attach to
     * @param keys            keys expected on this topic, in expected delivery order
     * @param numValuesPerKey number of consecutive values expected for each key
     * @param startValue      value index expected for the first message of each key
     * @throws PulsarClientException if subscribing, receiving or closing fails
     */
    private static void consumePartition(PulsarClient client, String topic, String subscription,
                                         List<Integer> keys, int numValuesPerKey, int startValue)
            throws PulsarClientException {
        try (Consumer<byte[]> consumer = client.newConsumer().readCompacted(true).topic(topic)
                .subscriptionName(subscription).subscribe()) {
            for (Integer key : keys) {
                for (int i = 0; i < numValuesPerKey; i++) {
                    Message<byte[]> m = consumer.receive();
                    Assert.assertEquals(m.getKey(), String.valueOf(key));
                    Assert.assertEquals(new String(m.getValue(), StandardCharsets.UTF_8),
                            "key-" + key + "-value-" + (startValue + i));
                }
                log.info("Read {} values from key {}", numValuesPerKey, key);
            }
        }
    }

    /**
     * Receives {@code numKeys} messages from the topic and collects them into a map from
     * the integer-parsed message key to the UTF-8 decoded payload. A later message with
     * the same key overwrites an earlier one.
     *
     * @param client       client used to create the consumer
     * @param topic        topic to read from
     * @param subscription subscription name to attach to
     * @param numKeys      number of messages to receive
     * @return the collected key-to-value map
     * @throws PulsarClientException if subscribing, receiving or closing fails
     */
    private static Map<Integer, String> consumeCompactedTopic(PulsarClient client, String topic,
                                                              String subscription, int numKeys)
            throws PulsarClientException {
        Map<Integer, String> keys = Maps.newHashMap();
        try (Consumer<byte[]> consumer = client.newConsumer().readCompacted(true).topic(topic)
                .subscriptionName(subscription).subscribe()) {
            for (int i = 0; i < numKeys; i++) {
                Message<byte[]> m = consumer.receive();
                keys.put(Integer.parseInt(m.getKey()), new String(m.getValue(), StandardCharsets.UTF_8));
            }
        }
        return keys;
    }

    /**
     * Polls (up to 60 attempts, one second apart) until the first message delivered to a
     * fresh {@code readCompacted} consumer carries {@code expectedValue}, then subscribes
     * once more and asserts both key and value of the first message.
     *
     * @param client        client used to create the consumers
     * @param topic         topic to read from
     * @param sub           subscription name to attach to
     * @param expectedKey   key the first message must have on every attempt
     * @param expectedValue value the first message must eventually have
     * @throws Exception if a client operation fails or the thread is interrupted
     */
    private static void waitAndVerifyCompacted(PulsarClient client, String topic, String sub,
                                               String expectedKey, String expectedValue) throws Exception {
        for (int attempt = 0; attempt < 60; attempt++) {
            try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                    .readCompacted(true).subscriptionName(sub).subscribe()) {
                Message<String> m = consumer.receive();
                Assert.assertEquals(m.getKey(), expectedKey);
                if (m.getValue().equals(expectedValue)) {
                    break;
                }
            }
            Thread.sleep(1000L);
        }

        // Final check: fails the test if polling ran out without seeing the expected value.
        try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
                .readCompacted(true).subscriptionName(sub).subscribe()) {
            Message<String> m = consumer.receive();
            Assert.assertEquals(m.getKey(), expectedKey);
            Assert.assertEquals(m.getValue(), expectedValue);
        }
    }

    /**
     * Sets the namespace compaction threshold to {@code 1}, publishes two values for one
     * key and waits until the latest value is the first one delivered; then publishes a
     * third value and waits again.
     *
     * @param serviceUrl supplier of the service URL used to build the client
     * @throws Exception if any cluster command or client operation fails
     */
    @Test(dataProvider = "ServiceUrls", timeOut = 300000L)
    public void testPublishWithAutoCompaction(Supplier<String> serviceUrl) throws Exception {
        String tenant = "compaction-test-auto-" + randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
        createNamespace(namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-compaction-threshold",
                "--threshold", "1", namespace);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
            // Create the subscription before publishing.
            client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub1").subscribe().close();

            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
                producer.newMessage().key("key0").value("content0").send();
                producer.newMessage().key("key0").value("content1").send();
            }
            waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1");

            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
                producer.newMessage().key("key0").value("content2").send();
            }
            waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2");
        }
    }

    /**
     * Runs {@code tenants create} on a broker and asserts a zero exit code.
     *
     * @param tenantName         tenant to create
     * @param allowedClusterName value passed to {@code --allowed-clusters}
     * @param adminRoleName      value passed to {@code --admin-roles}
     * @return the result of the admin command
     * @throws Exception if running the command fails
     */
    private ContainerExecResult createTenantName(String tenantName, String allowedClusterName,
                                                 String adminRoleName) throws Exception {
        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                "tenants", "create",
                "--allowed-clusters", allowedClusterName,
                "--admin-roles", adminRoleName,
                tenantName);
        Assert.assertEquals((long) result.getExitCode(), 0L);
        return result;
    }

    /**
     * Runs {@code namespaces create} for the test cluster and asserts a zero exit code.
     *
     * @param Ns namespace to create
     * @return the result of the admin command
     * @throws Exception if running the command fails
     */
    private ContainerExecResult createNamespace(String Ns) throws Exception {
        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "create",
                "--clusters", pulsarCluster.getClusterName(),
                Ns);
        Assert.assertEquals((long) result.getExitCode(), 0L);
        return result;
    }

    /**
     * Runs {@code topics create-partitioned-topic} and asserts a zero exit code.
     *
     * @param partitionedTopicName topic to create
     * @param numPartitions        value passed to {@code --partitions}
     * @return the result of the admin command
     * @throws Exception if running the command fails
     */
    private ContainerExecResult createPartitionedTopic(String partitionedTopicName, int numPartitions)
            throws Exception {
        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                "topics", "create-partitioned-topic",
                "--partitions", String.valueOf(numPartitions),
                partitionedTopicName);
        Assert.assertEquals((long) result.getExitCode(), 0L);
        return result;
    }
}

