/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.storm;

import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.storm.MessageToValuesMapper;
import org.apache.pulsar.storm.MockSpoutOutputCollector;
import org.apache.pulsar.storm.PulsarSpout;
import org.apache.pulsar.storm.PulsarSpoutConfiguration;
import org.apache.pulsar.storm.TestUtil;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

public class PulsarSpoutTest
extends ProducerConsumerBase {
    public final String serviceUrl;
    public final String topic = "persistent://my-property/use/my-ns/my-topic1";
    public final String subscriptionName = "my-subscriber-name";
    protected PulsarSpoutConfiguration pulsarSpoutConf;
    protected ConsumerConfiguration consumerConf;
    protected PulsarSpout spout;
    protected MockSpoutOutputCollector mockCollector;
    protected Producer producer;
    public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper(){

        public Values toValues(Message msg) {
            if ("message to be dropped".equals(new String(msg.getData()))) {
                return null;
            }
            return new Values(new Object[]{new String(msg.getData())});
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    };

    public PulsarSpoutTest() {
        this.serviceUrl = "http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT;
        this.topic = "persistent://my-property/use/my-ns/my-topic1";
        this.subscriptionName = "my-subscriber-name";
    }

    @BeforeMethod
    public void beforeMethod(Method m) throws Exception {
        super.beforeMethod(m);
        this.setup();
    }

    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.pulsarSpoutConf = new PulsarSpoutConfiguration();
        this.pulsarSpoutConf.setServiceUrl(this.serviceUrl);
        this.pulsarSpoutConf.setTopic("persistent://my-property/use/my-ns/my-topic1");
        this.pulsarSpoutConf.setSubscriptionName("my-subscriber-name");
        this.pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
        this.pulsarSpoutConf.setFailedRetriesTimeout(1L, TimeUnit.SECONDS);
        this.pulsarSpoutConf.setMaxFailedRetries(2);
        this.pulsarSpoutConf.setSharedConsumerEnabled(true);
        this.pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
        this.consumerConf = new ConsumerConfiguration();
        this.consumerConf.setSubscriptionType(SubscriptionType.Shared);
        this.spout = new PulsarSpout(this.pulsarSpoutConf, new ClientConfiguration(), this.consumerConf);
        this.mockCollector = new MockSpoutOutputCollector();
        SpoutOutputCollector collector = new SpoutOutputCollector((ISpoutOutputCollector)this.mockCollector);
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)context.getThisComponentId()).thenReturn((Object)("test-spout-" + this.methodName));
        Mockito.when((Object)context.getThisTaskId()).thenReturn((Object)0);
        this.spout.open(Maps.newHashMap(), context, collector);
        this.producer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
    }

    @AfterMethod
    public void cleanup() throws Exception {
        this.producer.close();
        this.spout.close();
        super.internalCleanup();
    }

    @Test
    public void testBasic() throws Exception {
        String msgContent = "hello world";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.ack((Object)this.mockCollector.getLastMessage());
    }

    @Test
    public void testRedeliverOnFail() throws Exception {
        String msgContent = "hello world";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        Thread.sleep(150L);
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.ack((Object)this.mockCollector.getLastMessage());
    }

    @Test
    public void testNoRedeliverOnAck() throws Exception {
        String msgContent = "hello world";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        this.spout.ack((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        this.spout.nextTuple();
        Assert.assertFalse((boolean)this.mockCollector.emitted());
        Assert.assertNull((Object)this.mockCollector.getTupleData());
    }

    @Test
    public void testLimitedRedeliveriesOnTimeout() throws Exception {
        String msgContent = "chuck norris";
        this.producer.send(msgContent.getBytes());
        long startTime = System.currentTimeMillis();
        while (startTime + this.pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System.currentTimeMillis()) {
            this.mockCollector.reset();
            this.spout.nextTuple();
            Assert.assertTrue((boolean)this.mockCollector.emitted());
            Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
            this.spout.fail((Object)this.mockCollector.getLastMessage());
            Thread.sleep(500L);
        }
        this.spout.nextTuple();
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        Thread.sleep(500L);
        this.spout.nextTuple();
        Assert.assertFalse((boolean)this.mockCollector.emitted());
        Assert.assertNull((Object)this.mockCollector.getTupleData());
    }

    @Test
    public void testLimitedRedeliveriesOnCount() throws Exception {
        String msgContent = "hello world";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        Thread.sleep(150L);
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        Thread.sleep(300L);
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        Thread.sleep(500L);
        this.spout.nextTuple();
        Assert.assertFalse((boolean)this.mockCollector.emitted());
        Assert.assertNull((Object)this.mockCollector.getTupleData());
    }

    @Test
    public void testBackoffOnRetry() throws Exception {
        String msgContent = "chuck norris";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        this.mockCollector.reset();
        this.spout.nextTuple();
        Assert.assertFalse((boolean)this.mockCollector.emitted());
        Assert.assertNull((Object)this.mockCollector.getTupleData());
        Thread.sleep(100L);
        this.spout.nextTuple();
        Assert.assertTrue((boolean)this.mockCollector.emitted());
        Assert.assertTrue((boolean)msgContent.equals(this.mockCollector.getTupleData()));
        this.spout.ack((Object)this.mockCollector.getLastMessage());
    }

    @Test
    public void testMessageDrop() throws Exception {
        String msgContent = "message to be dropped";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        Assert.assertFalse((boolean)this.mockCollector.emitted());
        Assert.assertNull((Object)this.mockCollector.getTupleData());
    }

    @Test
    public void testMetrics() throws Exception {
        this.spout.resetMetrics();
        String msgContent = "hello world";
        this.producer.send(msgContent.getBytes());
        this.spout.nextTuple();
        Map metrics = this.spout.getMetrics();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesReceived")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesEmitted")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingFailedMessages")), (long)0L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingAcks")), (long)1L);
        Assert.assertEquals((Object)((Double)metrics.get("consumerRate")), (Object)(1.0 / (double)this.pulsarSpoutConf.getMetricsTimeIntervalInSecs()));
        Assert.assertEquals((Object)((Double)metrics.get("consumerThroughput")), (Object)((double)msgContent.getBytes().length / (double)this.pulsarSpoutConf.getMetricsTimeIntervalInSecs()));
        this.spout.fail((Object)this.mockCollector.getLastMessage());
        metrics = this.spout.getMetrics();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesReceived")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesEmitted")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingFailedMessages")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingAcks")), (long)0L);
        Thread.sleep(150L);
        this.spout.nextTuple();
        metrics = this.spout.getMetrics();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesReceived")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesEmitted")), (long)2L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingFailedMessages")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingAcks")), (long)1L);
        this.spout.ack((Object)this.mockCollector.getLastMessage());
        metrics = (Map)this.spout.getValueAndReset();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesReceived")), (long)1L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesEmitted")), (long)2L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingFailedMessages")), (long)0L);
        Assert.assertEquals((long)((Long)metrics.get("numberOfPendingAcks")), (long)0L);
    }

    @Test
    public void testSharedConsumer() throws Exception {
        PersistentTopicStats topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)1);
        PulsarSpout otherSpout = new PulsarSpout(this.pulsarSpoutConf, new ClientConfiguration(), this.consumerConf);
        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
        SpoutOutputCollector collector = new SpoutOutputCollector((ISpoutOutputCollector)otherMockCollector);
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)context.getThisComponentId()).thenReturn((Object)("test-spout-" + this.methodName));
        Mockito.when((Object)context.getThisTaskId()).thenReturn((Object)1);
        otherSpout.open(Maps.newHashMap(), context, collector);
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)1);
        otherSpout.close();
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)1);
    }

    @Test
    public void testNoSharedConsumer() throws Exception {
        PersistentTopicStats topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)1);
        this.pulsarSpoutConf.setSharedConsumerEnabled(false);
        PulsarSpout otherSpout = new PulsarSpout(this.pulsarSpoutConf, new ClientConfiguration(), this.consumerConf);
        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
        SpoutOutputCollector collector = new SpoutOutputCollector((ISpoutOutputCollector)otherMockCollector);
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)context.getThisComponentId()).thenReturn((Object)("test-spout-" + this.methodName));
        Mockito.when((Object)context.getThisTaskId()).thenReturn((Object)1);
        otherSpout.open(Maps.newHashMap(), context, collector);
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)2);
        otherSpout.close();
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"my-subscriber-name")).consumers.size(), (int)1);
    }

    @Test
    public void testSerializability() throws Exception {
        PulsarSpout spoutWithNoAuth = new PulsarSpout(this.pulsarSpoutConf, new ClientConfiguration());
        TestUtil.testSerializability(spoutWithNoAuth);
    }
}

