/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.storm;

import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.storm.MockOutputCollector;
import org.apache.pulsar.storm.PulsarBolt;
import org.apache.pulsar.storm.PulsarBoltConfiguration;
import org.apache.pulsar.storm.TestUtil;
import org.apache.pulsar.storm.TupleToMessageMapper;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

public class PulsarBoltTest
extends ProducerConsumerBase {
    private static final int NO_OF_RETRIES = 10;
    public final String serviceUrl;
    public final String topic = "persistent://my-property/use/my-ns/my-topic1";
    public final String subscriptionName = "my-subscriber-name";
    protected PulsarBoltConfiguration pulsarBoltConf;
    protected PulsarBolt bolt;
    protected MockOutputCollector mockCollector;
    protected Consumer consumer;
    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper(){

        public Message toMessage(Tuple tuple) {
            if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
                return null;
            }
            if ("throw exception".equals(new String(tuple.getBinary(0)))) {
                throw new RuntimeException();
            }
            return MessageBuilder.create().setContent(tuple.getBinary(0)).build();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    };

    public PulsarBoltTest() {
        this.serviceUrl = "http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT;
        this.topic = "persistent://my-property/use/my-ns/my-topic1";
        this.subscriptionName = "my-subscriber-name";
    }

    @BeforeMethod
    public void beforeMethod(Method m) throws Exception {
        super.beforeMethod(m);
        this.setup();
    }

    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.pulsarBoltConf = new PulsarBoltConfiguration();
        this.pulsarBoltConf.setServiceUrl(this.serviceUrl);
        this.pulsarBoltConf.setTopic("persistent://my-property/use/my-ns/my-topic1");
        this.pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
        this.pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
        this.bolt = new PulsarBolt(this.pulsarBoltConf, new ClientConfiguration());
        this.mockCollector = new MockOutputCollector();
        OutputCollector collector = new OutputCollector((IOutputCollector)this.mockCollector);
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)context.getThisComponentId()).thenReturn((Object)("test-bolt-" + this.methodName));
        Mockito.when((Object)context.getThisTaskId()).thenReturn((Object)0);
        this.bolt.prepare(Maps.newHashMap(), context, collector);
        this.consumer = this.pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name");
    }

    @AfterMethod
    public void cleanup() throws Exception {
        this.bolt.close();
        this.consumer.close();
        super.internalCleanup();
    }

    private Tuple getMockTuple(String msgContent) {
        Tuple mockTuple = (Tuple)Mockito.mock(Tuple.class);
        Mockito.when((Object)mockTuple.getBinary(0)).thenReturn((Object)msgContent.getBytes());
        Mockito.when((Object)mockTuple.getSourceComponent()).thenReturn((Object)"");
        Mockito.when((Object)mockTuple.getSourceStreamId()).thenReturn((Object)"");
        return mockTuple;
    }

    @Test
    public void testBasic() throws Exception {
        String msgContent = "hello world";
        Tuple tuple = this.getMockTuple(msgContent);
        this.bolt.execute(tuple);
        for (int i = 0; i < 10; ++i) {
            Thread.sleep(1000L);
            if (this.mockCollector.acked()) break;
        }
        Assert.assertTrue((boolean)this.mockCollector.acked());
        Assert.assertFalse((boolean)this.mockCollector.failed());
        Assert.assertNull((Object)this.mockCollector.getLastError());
        Assert.assertEquals((Object)tuple, (Object)this.mockCollector.getAckedTuple());
        Message msg = this.consumer.receive(5, TimeUnit.SECONDS);
        this.consumer.acknowledge(msg);
        Assert.assertEquals((String)msgContent, (String)new String(msg.getData()));
    }

    @Test
    public void testExecuteFailure() throws Exception {
        String msgContent = "throw exception";
        Tuple tuple = this.getMockTuple(msgContent);
        this.bolt.execute(tuple);
        Assert.assertFalse((boolean)this.mockCollector.acked());
        Assert.assertTrue((boolean)this.mockCollector.failed());
        Assert.assertNotNull((Object)this.mockCollector.getLastError());
    }

    @Test
    public void testNoMessageSend() throws Exception {
        String msgContent = "message to be dropped";
        Tuple tuple = this.getMockTuple(msgContent);
        this.bolt.execute(tuple);
        Assert.assertTrue((boolean)this.mockCollector.acked());
        Message msg = this.consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
    }

    @Test
    public void testMetrics() throws Exception {
        int i;
        this.bolt.resetMetrics();
        String msgContent = "hello world";
        Tuple tuple = this.getMockTuple(msgContent);
        for (i = 0; i < 10; ++i) {
            this.bolt.execute(tuple);
        }
        for (i = 0; i < 10; ++i) {
            Thread.sleep(1000L);
            if (this.mockCollector.getNumTuplesAcked() == 10) break;
        }
        Map metrics = (Map)this.bolt.getValueAndReset();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesSent")), (long)10L);
        Assert.assertEquals((Object)((Double)metrics.get("producerRate")), (Object)(10.0 / (double)this.pulsarBoltConf.getMetricsTimeIntervalInSecs()));
        Assert.assertEquals((Object)((Double)metrics.get("producerThroughput")), (Object)((double)msgContent.getBytes().length * 10.0 / (double)this.pulsarBoltConf.getMetricsTimeIntervalInSecs()));
        metrics = this.bolt.getMetrics();
        Assert.assertEquals((long)((Long)metrics.get("numberOfMessagesSent")), (long)0L);
        for (int i2 = 0; i2 < 10; ++i2) {
            Message msg = this.consumer.receive(5, TimeUnit.SECONDS);
            this.consumer.acknowledge(msg);
        }
    }

    @Test
    public void testSharedProducer() throws Exception {
        PersistentTopicStats topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)topicStats.publishers.size(), (int)1);
        PulsarBolt otherBolt = new PulsarBolt(this.pulsarBoltConf, new ClientConfiguration());
        MockOutputCollector otherMockCollector = new MockOutputCollector();
        OutputCollector collector = new OutputCollector((IOutputCollector)otherMockCollector);
        TopologyContext context = (TopologyContext)Mockito.mock(TopologyContext.class);
        Mockito.when((Object)context.getThisComponentId()).thenReturn((Object)("test-bolt-" + this.methodName));
        Mockito.when((Object)context.getThisTaskId()).thenReturn((Object)1);
        otherBolt.prepare(Maps.newHashMap(), context, collector);
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)topicStats.publishers.size(), (int)1);
        otherBolt.close();
        topicStats = this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1");
        Assert.assertEquals((int)topicStats.publishers.size(), (int)1);
    }

    @Test
    public void testSerializability() throws Exception {
        PulsarBolt boltWithNoAuth = new PulsarBolt(this.pulsarBoltConf, new ClientConfiguration());
        TestUtil.testSerializability(boltWithNoAuth);
    }

    @Test
    public void testTickTuple() throws Exception {
        Tuple mockTuple = (Tuple)Mockito.mock(Tuple.class);
        Mockito.when((Object)mockTuple.getSourceComponent()).thenReturn((Object)"__system");
        Mockito.when((Object)mockTuple.getSourceStreamId()).thenReturn((Object)"__tick");
        this.bolt.execute(mockTuple);
        Assert.assertTrue((boolean)this.mockCollector.acked());
        Message msg = this.consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
    }
}

