/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.messaging;

import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public class TopicMessagingBase
extends MessagingBase {
    private static final Logger log = LoggerFactory.getLogger(TopicMessagingBase.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void nonPartitionedTopicSendAndReceiveWithExclusive(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = this.getNonPartitionedTopic("test-non-partitioned-consume-exclusive", isPersistent);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Exclusive).subscribe();
            try {
                try {
                    client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Exclusive).subscribe();
                    Assert.fail((String)"should be failed");
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
                int messagesToSend = 10;
                String producerName = "producerForExclusive";
                Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForExclusive").create();
                try {
                    for (int i = 0; i < 10; ++i) {
                        MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                        Assert.assertNotNull((Object)messageId);
                    }
                    log.info("public messages complete.");
                    this.receiveMessagesCheckOrderAndDuplicate(Collections.singletonList(consumer), 10);
                    log.info("-- Exiting {} test --", (Object)this.methodName);
                }
                finally {
                    if (Collections.singletonList(producer).get(0) != null) {
                        producer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int partitions = 3;
        String topicName = this.getPartitionedTopic("test-partitioned-consume-exclusive", isPersistent, 3);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(3);
            for (int i = 0; i < 3; ++i) {
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName + "-partition-" + i}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Exclusive).subscribe();
                consumerList.add(consumer);
            }
            Assert.assertEquals((int)3, (int)consumerList.size());
            try {
                client.newConsumer(Schema.STRING).topic(new String[]{topicName + "-partition-0"}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Exclusive).subscribe();
                Assert.fail((String)"should be failed");
            }
            catch (PulsarClientException i) {
                // empty catch block
            }
            int messagesToSend = 9;
            String producerName = "producerForExclusive";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForExclusive").messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
            try {
                for (int i = 0; i < 9; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("public messages complete.");
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 9);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                crashedConsumer.close();
                for (int i = 0; i < 9; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 6);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void nonPartitionedTopicSendAndReceiveWithFailover(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = this.getNonPartitionedTopic("test-non-partitioned-consume-failover", isPersistent);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(2);
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Failover).subscribe();
            consumerList.add(consumer);
            Consumer standbyConsumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Failover).subscribe();
            Assert.assertNotNull((Object)standbyConsumer);
            Assert.assertTrue((boolean)standbyConsumer.isConnected());
            consumerList.add(standbyConsumer);
            int messagesToSend = 10;
            String producerName = "producerForFailover";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForFailover").create();
            try {
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("public messages complete.");
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 10);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                Thread.sleep(3000L);
                crashedConsumer.close();
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 10);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void partitionedTopicSendAndReceiveWithFailover(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int partitions = 3;
        String topicName = this.getPartitionedTopic("test-partitioned-consume-failover", isPersistent, 3);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(3);
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Failover).subscribe();
            consumerList.add(consumer);
            Consumer standbyConsumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Failover).subscribe();
            Assert.assertNotNull((Object)standbyConsumer);
            Assert.assertTrue((boolean)standbyConsumer.isConnected());
            consumerList.add(standbyConsumer);
            Assert.assertEquals((int)consumerList.size(), (int)2);
            int messagesToSend = 9;
            String producerName = "producerForFailover";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForFailover").create();
            try {
                for (int i = 0; i < 9; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("public messages complete.");
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 9);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                Thread.sleep(3000L);
                crashedConsumer.close();
                for (int i = 0; i < 9; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckOrderAndDuplicate(consumerList, 9);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void nonPartitionedTopicSendAndReceiveWithShared(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = this.getNonPartitionedTopic("test-non-partitioned-consume-shared", isPersistent);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(2);
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Shared).subscribe();
            consumerList.add(consumer);
            Consumer moreConsumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Shared).subscribe();
            Assert.assertNotNull((Object)moreConsumer);
            Assert.assertTrue((boolean)moreConsumer.isConnected());
            consumerList.add(moreConsumer);
            int messagesToSend = 10;
            String producerName = "producerForShared";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForShared").create();
            try {
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("public messages complete.");
                this.receiveMessagesCheckDuplicate(consumerList, 10);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                crashedConsumer.close();
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckDuplicate(consumerList, 10);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void partitionedTopicSendAndReceiveWithShared(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int partitions = 3;
        String topicName = this.getPartitionedTopic("test-partitioned-consume-shared", isPersistent, 3);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(3);
            for (int i = 0; i < 3; ++i) {
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Shared).subscribe();
                consumerList.add(consumer);
            }
            Assert.assertEquals((int)3, (int)consumerList.size());
            int messagesToSend = 10;
            String producerName = "producerForFailover";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForFailover").create();
            try {
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("public messages complete.");
                this.receiveMessagesCheckDuplicate(consumerList, 10);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                crashedConsumer.close();
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckDuplicate(consumerList, 10);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void nonPartitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = this.getNonPartitionedTopic("test-non-partitioned-consume-key-shared", isPersistent);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(2);
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
            Assert.assertTrue((boolean)consumer.isConnected());
            consumerList.add(consumer);
            Consumer moreConsumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
            Assert.assertNotNull((Object)moreConsumer);
            Assert.assertTrue((boolean)moreConsumer.isConnected());
            consumerList.add(moreConsumer);
            int messagesToSend = 10;
            String producerName = "producerForKeyShared";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForKeyShared").create();
            try {
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().key(UUID.randomUUID().toString()).value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("publish messages complete.");
                this.receiveMessagesCheckStickyKeyAndDuplicate(consumerList, 10);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                crashedConsumer.close();
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().key(UUID.randomUUID().toString()).value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckStickyKeyAndDuplicate(consumerList, 10);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void partitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, boolean isPersistent) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int partitions = 3;
        String topicName = this.getPartitionedTopic("test-partitioned-consume-key-shared", isPersistent, 3);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            ArrayList consumerList = new ArrayList(2);
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
            Assert.assertTrue((boolean)consumer.isConnected());
            consumerList.add(consumer);
            Consumer moreConsumer = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName("test-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
            Assert.assertNotNull((Object)moreConsumer);
            Assert.assertTrue((boolean)moreConsumer.isConnected());
            consumerList.add(moreConsumer);
            int messagesToSend = 10;
            String producerName = "producerForKeyShared";
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producerForKeyShared").create();
            try {
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().key(UUID.randomUUID().toString()).value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                log.info("publish messages complete.");
                this.receiveMessagesCheckStickyKeyAndDuplicate(consumerList, 10);
                Consumer crashedConsumer = (Consumer)consumerList.remove(0);
                crashedConsumer.close();
                for (int i = 0; i < 10; ++i) {
                    MessageId messageId = producer.newMessage().key(UUID.randomUUID().toString()).value((Object)(producer.getProducerName() + "-" + i)).send();
                    Assert.assertNotNull((Object)messageId);
                }
                this.receiveMessagesCheckStickyKeyAndDuplicate(consumerList, 10);
                this.closeConsumers(consumerList);
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }
}

