/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.messaging;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;

/**
 * Base class for messaging integration tests.
 *
 * <p>Provides helpers that create a fresh namespace and topic on the inherited
 * {@code pulsarCluster}, drain a list of consumers while asserting delivery
 * properties (no duplicates, per-topic ordering, key stickiness), and close
 * consumers.
 */
public abstract class MessagingBase
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(MessagingBase.class);

    /** Seconds a single receive call waits before the consumer is treated as drained. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 3;

    /** Name of the test method about to run; refreshed by {@link #beforeMethod(Method)}. */
    protected String methodName;

    /**
     * Records the name of the test method that is about to run.
     *
     * @param m the test method supplied by TestNG
     */
    @BeforeMethod(alwaysRun=true)
    public void beforeMethod(Method m) throws Exception {
        this.methodName = m.getName();
    }

    /**
     * Creates a new namespace on the cluster and returns a generated topic name inside it.
     *
     * @param topicPrefix  prefix forwarded to {@code generateTopicName}
     * @param isPersistent forwarded to {@code generateTopicName} as its third argument
     *                     (presumably selects a persistent vs. non-persistent topic)
     * @return the generated topic name
     * @throws Exception if the namespace cannot be created
     */
    protected String getNonPartitionedTopic(String topicPrefix, boolean isPersistent) throws Exception {
        String nsName = generateNamespaceName();
        this.pulsarCluster.createNamespace(nsName);
        // Honour the caller's choice instead of always passing true.
        return generateTopicName(nsName, topicPrefix, isPersistent);
    }

    /**
     * Creates a new namespace on the cluster, then a partitioned topic inside it.
     *
     * @param topicPrefix  prefix forwarded to {@code generateTopicName}
     * @param isPersistent forwarded to {@code generateTopicName} as its third argument
     *                     (presumably selects a persistent vs. non-persistent topic)
     * @param partitions   number of partitions; must be greater than 0
     * @return the generated topic name
     * @throws Exception if the namespace or the partitioned topic cannot be created
     */
    protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, int partitions) throws Exception {
        Assert.assertTrue(partitions > 0, "partitions must be greater than 0");
        String nsName = generateNamespaceName();
        this.pulsarCluster.createNamespace(nsName);
        // Honour the caller's choice instead of always passing true.
        String topicName = generateTopicName(nsName, topicPrefix, isPersistent);
        this.pulsarCluster.createPartitionedTopic(topicName, partitions);
        return topicName;
    }

    /**
     * Drains every consumer and asserts that, per consumer and per topic name, message ids
     * are strictly increasing, that no value is received twice across all consumers, and
     * that the number of distinct values equals {@code messagesToReceive}.
     *
     * @param consumerList      consumers to drain, in order
     * @param messagesToReceive expected number of distinct message values
     * @throws PulsarClientException if acknowledging a message fails
     */
    protected <T extends Comparable<T>> void receiveMessagesCheckOrderAndDuplicate(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
        Set<T> messagesReceived = Sets.newHashSet();
        for (Consumer<T> consumer : consumerList) {
            // Last message seen per topic name; ordering is only compared within one topic name.
            HashMap<String, Message<T>> lastReceivedMap = new HashMap<>();
            Message<T> currentReceived;
            while ((currentReceived = receiveOrNull(consumer)) != null) {
                consumer.acknowledge(currentReceived);
                Message<T> previous = lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
                if (previous != null) {
                    Assert.assertTrue(currentReceived.getMessageId().compareTo(previous.getMessageId()) > 0,
                            "Received messages are not in order.");
                }
                Assert.assertTrue(messagesReceived.add(currentReceived.getValue()),
                        "Received duplicate message " + currentReceived.getValue());
            }
        }
        // AssertJUnit.assertEquals takes (expected, actual).
        AssertJUnit.assertEquals(messagesToReceive, messagesReceived.size());
    }

    /**
     * Drains every consumer and asserts that no value is received twice across all
     * consumers and that the number of distinct values equals {@code messagesToReceive}.
     *
     * @param consumerList      consumers to drain, in order
     * @param messagesToReceive expected number of distinct message values
     * @throws PulsarClientException if acknowledging a message fails
     */
    protected <T> void receiveMessagesCheckDuplicate(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
        Set<T> messagesReceived = Sets.newHashSet();
        for (Consumer<T> consumer : consumerList) {
            Message<T> currentReceived;
            while ((currentReceived = receiveOrNull(consumer)) != null) {
                consumer.acknowledge(currentReceived);
                Assert.assertTrue(messagesReceived.add(currentReceived.getValue()),
                        "Received duplicate message " + currentReceived.getValue());
            }
        }
        // AssertJUnit.assertEquals takes (expected, actual).
        AssertJUnit.assertEquals(messagesToReceive, messagesReceived.size());
    }

    /**
     * Drains every consumer and asserts that each message has a non-null key, that no value
     * is received twice, that no key was delivered to more than one consumer name, and that
     * the number of distinct values equals {@code messagesToReceive}.
     *
     * @param consumerList      consumers to drain, in order
     * @param messagesToReceive expected number of distinct message values
     * @throws PulsarClientException if acknowledging a message fails
     */
    protected <T> void receiveMessagesCheckStickyKeyAndDuplicate(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
        // Keys observed per consumer name.
        HashMap<String, Set<String>> consumerKeys = Maps.newHashMap();
        Set<T> messagesReceived = Sets.newHashSet();
        for (Consumer<T> consumer : consumerList) {
            Message<T> currentReceived;
            while ((currentReceived = receiveOrNull(consumer)) != null) {
                consumer.acknowledge(currentReceived);
                Assert.assertNotNull(currentReceived.getKey());
                consumerKeys.computeIfAbsent(consumer.getConsumerName(), name -> new HashSet<>())
                        .add(currentReceived.getKey());
                Assert.assertTrue(messagesReceived.add(currentReceived.getValue()),
                        "Received duplicate message " + currentReceived.getValue());
            }
        }
        // A key that appears under two different consumer names fails the add() below.
        Set<String> allKeys = Sets.newHashSet();
        for (Set<String> keys : consumerKeys.values()) {
            for (String key : keys) {
                Assert.assertTrue(allKeys.add(key), "Key " + key + " is distributed to multiple consumers");
            }
        }
        // AssertJUnit.assertEquals takes (expected, actual).
        AssertJUnit.assertEquals(messagesToReceive, messagesReceived.size());
    }

    /**
     * Closes each consumer and removes it from {@code consumerList}.
     *
     * <p>If a {@code close()} call throws, the exception propagates immediately; that
     * consumer and all later ones remain in the list.
     *
     * @param consumerList mutable list of consumers to close; emptied on success
     * @throws PulsarClientException if closing a consumer fails
     */
    protected <T> void closeConsumers(List<Consumer<T>> consumerList) throws PulsarClientException {
        Iterator<Consumer<T>> iterator = consumerList.iterator();
        while (iterator.hasNext()) {
            iterator.next().close();
            iterator.remove();
        }
    }

    /**
     * Receives one message from {@code consumer}, waiting up to
     * {@link #RECEIVE_TIMEOUT_SECONDS} seconds.
     *
     * @return the received message, or {@code null} when the receive call returned
     *         {@code null} (presumably a timeout) or threw a {@link PulsarClientException};
     *         both cases mean the caller should stop draining this consumer
     */
    private static <T> Message<T> receiveOrNull(Consumer<T> consumer) {
        try {
            return consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (PulsarClientException e) {
            // Best-effort drain: keep going with the next consumer, but attach the cause so a
            // genuine client failure is not hidden behind the "no more messages" message.
            log.info("no more messages to receive for consumer {}", consumer.getConsumerName(), e);
            return null;
        }
    }
}

