/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.semantics;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

public class SemanticsTest
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(SemanticsTest.class);

    /**
     * Runs the inherited publish-and-consume scenario against the service URL produced by the
     * data provider.
     *
     * @param serviceUrl supplier yielding the service URL to connect to
     * @param isPersistent flag passed through unchanged to the inherited scenario
     * @throws Exception if the inherited scenario throws
     */
    @Test(dataProvider = "ServiceUrlAndTopics")
    public void testPublishAndConsume(Supplier<String> serviceUrl, boolean isPersistent) throws Exception {
        // Resolve the URL once, then hand off to the String-based overload on the parent.
        final String resolvedUrl = serviceUrl.get();
        super.testPublishAndConsume(resolvedUrl, isPersistent);
    }

    /**
     * Publishes a duplicated sequence id to a topic in a freshly created namespace on which this
     * test does not enable deduplication, and checks that the duplicate is delivered to the consumer.
     *
     * <p>The consumer is subscribed before the producer is created, so every published message is
     * seen by the subscription. Resources are closed in reverse order of creation (producer,
     * consumer, client) by the try-with-resources statement.
     *
     * @param serviceUrl supplier yielding the service URL to connect to
     * @throws Exception if namespace creation, client setup, publishing or an assertion fails
     */
    @Test(dataProvider = "ServiceUrls")
    public void testEffectivelyOnceDisabled(Supplier<String> serviceUrl) throws Exception {
        String nsName = generateNamespaceName();
        this.pulsarCluster.createNamespace(nsName);
        String topicName = generateTopicName(nsName, "testeffectivelyonce", true);

        try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl.get())
                 .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("test-sub")
                 .ackTimeout(10L, TimeUnit.SECONDS)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscribe();
             Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topicName)
                 .enableBatching(false)
                 .producerName("effectively-once-producer")
                 .initialSequenceId(1L)
                 .create()) {
            sendMessagesIdempotency(producer);
            checkMessagesIdempotencyDisabled(consumer);
        }
    }

    /**
     * Synchronously publishes three messages, the second of which reuses the sequence id of the
     * first: {@code (1, "message-1")}, {@code (1, "duplicated-message-1")}, {@code (2, "message-2")}.
     *
     * @param producer producer used to publish the messages
     * @throws Exception if any of the sends fails
     */
    private static void sendMessagesIdempotency(Producer<String> producer) throws Exception {
        producer.newMessage().sequenceId(1L).value("message-1").send();
        // Same sequence id as the previous message on purpose: this is the duplicate under test.
        producer.newMessage().sequenceId(1L).value("duplicated-message-1").send();
        producer.newMessage().sequenceId(2L).value("message-2").send();
    }

    /**
     * Asserts that the consumer receives, in order, the original message, its duplicate carrying
     * the same sequence id, and then the second message.
     *
     * @param consumer consumer to read the three messages from
     * @throws Exception if receiving fails or a received message does not match
     */
    private static void checkMessagesIdempotencyDisabled(Consumer<String> consumer) throws Exception {
        receiveAndAssertMessage(consumer, 1L, "message-1");
        // The duplicate is expected here, still tagged with sequence id 1.
        receiveAndAssertMessage(consumer, 1L, "duplicated-message-1");
        receiveAndAssertMessage(consumer, 2L, "message-2");
    }

    /**
     * Receives the next message from the consumer, logs it, and asserts its sequence id and payload.
     *
     * <p>TestNG's {@code Assert.assertEquals} takes the actual value first and the expected value
     * second; the arguments are passed in that order so failure messages report them correctly.
     *
     * @param consumer consumer to receive from
     * @param expectedSequenceId sequence id the received message must carry
     * @param expectedContent payload the received message must carry
     * @throws Exception if receiving fails
     */
    private static void receiveAndAssertMessage(Consumer<String> consumer, long expectedSequenceId, String expectedContent) throws Exception {
        Message<String> msg = consumer.receive();
        log.info("Received message {}", msg);
        Assert.assertEquals(msg.getSequenceId(), expectedSequenceId);
        Assert.assertEquals(msg.getValue(), expectedContent);
    }

    /**
     * Publishes a duplicated sequence id to a topic in a freshly created namespace on which
     * deduplication has been enabled, and checks that only the non-duplicated messages are
     * delivered to the consumer.
     *
     * <p>The consumer is subscribed before the producer is created, so every published message is
     * seen by the subscription. Resources are closed in reverse order of creation (producer,
     * consumer, client) by the try-with-resources statement.
     *
     * @param serviceUrl supplier yielding the service URL to connect to
     * @throws Exception if namespace setup, client setup, publishing or an assertion fails
     */
    @Test(dataProvider = "ServiceUrls")
    public void testEffectivelyOnceEnabled(Supplier<String> serviceUrl) throws Exception {
        String nsName = generateNamespaceName();
        this.pulsarCluster.createNamespace(nsName);
        this.pulsarCluster.enableDeduplication(nsName, true);
        String topicName = generateTopicName(nsName, "testeffectivelyonce", true);

        try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl.get())
                 .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("test-sub")
                 .ackTimeout(10L, TimeUnit.SECONDS)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscribe();
             Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topicName)
                 .enableBatching(false)
                 .producerName("effectively-once-producer")
                 .initialSequenceId(1L)
                 .create()) {
            sendMessagesIdempotency(producer);
            checkMessagesIdempotencyEnabled(consumer);
        }
    }

    /**
     * Asserts that the consumer receives the first message and then the second message, with no
     * duplicate of the first in between.
     *
     * @param consumer consumer to read the two messages from
     * @throws Exception if receiving fails or a received message does not match
     */
    private static void checkMessagesIdempotencyEnabled(Consumer<String> consumer) throws Exception {
        receiveAndAssertMessage(consumer, 1L, "message-1");
        // A delivered duplicate would show up here and fail the sequence-id/content check.
        receiveAndAssertMessage(consumer, 2L, "message-2");
    }

    /**
     * Runs the subscription-initial-position scenario with a single topic.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testSubscriptionInitialPositionOneTopic() throws Exception {
        final int topicCount = 1;
        testSubscriptionInitialPosition(topicCount);
    }

    /**
     * Runs the subscription-initial-position scenario with two topics.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testSubscriptionInitialPositionTwoTopics() throws Exception {
        final int topicCount = 2;
        testSubscriptionInitialPosition(topicCount);
    }

    /**
     * Publishes ten messages to each of {@code numTopics} topics, then subscribes to all of them
     * with {@link SubscriptionInitialPosition#Earliest} and asserts that every previously published
     * message is received, in publish order within each topic.
     *
     * <p>Topics are named {@code <base>-<index>}. When more than one topic is subscribed, the
     * originating topic index is recovered from the last {@code '-'}-separated token of the topic
     * name attached to the message id.
     *
     * @param numTopics number of topics to publish to and subscribe on
     * @throws Exception if client setup, publishing, receiving or an assertion fails
     */
    private void testSubscriptionInitialPosition(int numTopics) throws Exception {
        String topicName = generateTopicName("test-subscription-initial-pos", true);
        int numMessages = 10;

        try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                 .build()) {
            // Publish everything first so the subscription is created after the data exists.
            String[] topics = new String[numTopics];
            for (int t = 0; t < numTopics; t++) {
                topics[t] = topicName + "-" + t;
                try (Producer<String> producer = client.newProducer(Schema.STRING)
                         .topic(topics[t])
                         .create()) {
                    for (int i = 0; i < numMessages; i++) {
                        producer.send("sip-topic-" + t + "-message-" + i);
                    }
                }
            }

            // Next expected per-topic message index; ordering is only asserted within a topic.
            int[] nextSeqByTopic = new int[numTopics];

            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(topics)
                     .subscriptionName("my-sub")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
                for (int received = 0; received < numTopics * numMessages; received++) {
                    Message<String> m = consumer.receive();

                    int topicIdx = 0;
                    if (numTopics > 1) {
                        String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName();
                        String[] topicParts = StringUtils.split(topic, '-');
                        topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);
                    }

                    int topicSeq = nextSeqByTopic[topicIdx]++;
                    // TestNG order: actual first, expected second.
                    Assert.assertEquals(m.getValue(), "sip-topic-" + topicIdx + "-message-" + topicSeq);
                }
            }
        }
    }

    /**
     * Publishes ten messages asynchronously through a batching producer whose batches hold at most
     * five messages, then checks that the consumer receives them in order with the same message
     * ids the producer reported, and that each id is a {@link BatchMessageIdImpl} whose batch index
     * equals the message's position within its batch.
     *
     * <p>The publish delay is set to one hour so that batches are only expected to be flushed by
     * reaching the maximum batch size, not by the timer.
     *
     * @param serviceUrl supplier yielding the service URL to connect to
     * @throws Exception if client setup, publishing, receiving or an assertion fails
     */
    @Test(dataProvider = "ServiceUrls")
    public void testBatchProducing(Supplier<String> serviceUrl) throws Exception {
        String topicName = generateTopicName("testbatchproducing", true);
        int numMessages = 10;
        int batchSize = 5;

        List<MessageId> producedMsgIds;
        try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl.get())
                 .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("my-sub")
                 .subscribe()) {
            try (Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(topicName)
                     .enableBatching(true)
                     .batchingMaxMessages(batchSize)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {
                List<CompletableFuture<MessageId>> sendFutures = Lists.newArrayList();
                for (int i = 0; i < numMessages; i++) {
                    sendFutures.add(producer.sendAsync("batch-message-" + i));
                }
                // Wait for every send to complete before collecting the ids.
                CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture[0])).get();
                producedMsgIds = sendFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            }

            for (int i = 0; i < numMessages; i++) {
                Message<String> m = consumer.receive();
                // TestNG order: actual first, expected second.
                Assert.assertEquals(m.getMessageId(), producedMsgIds.get(i));
                Assert.assertEquals(m.getValue(), "batch-message-" + i);
            }
        }

        // Message i is expected at slot (i % batchSize) of its batch.
        for (int i = 0; i < numMessages; i++) {
            MessageId id = producedMsgIds.get(i);
            Assert.assertTrue(id instanceof BatchMessageIdImpl);
            BatchMessageIdImpl mid = (BatchMessageIdImpl) id;
            log.info("Message {} id : {}", i, mid);
            Assert.assertEquals(mid.getBatchIndex(), i % batchSize);
        }
    }
}

