package org.apache.pulsar.spark;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.class */
public class SparkStreamingPulsarReceiverTest extends MockedPulsarServiceBaseTest {
    @BeforeClass
    protected void setup() throws Exception {
        internalSetup();
    }

    @AfterClass
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    @Test
    public void testReceivedMessage() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
        String str = "pulsar://127.0.0.1:" + this.BROKER_PORT + "/";
        SparkStreamingPulsarReceiver sparkStreamingPulsarReceiver = (SparkStreamingPulsarReceiver) Mockito.spy(new SparkStreamingPulsarReceiver(clientConfiguration, consumerConfiguration, str, "persistent://p1/c1/ns1/topic1", "sub1"));
        MessageListener messageListener = (MessageListener) Mockito.spy(new MessageListener() { // from class: org.apache.pulsar.spark.SparkStreamingPulsarReceiverTest.1
            public void received(Consumer consumer, Message message) {
            }
        });
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Consumer.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Message.class);
        ((MessageListener) Mockito.doNothing().when(messageListener)).received((Consumer) forClass.capture(), (Message) forClass2.capture());
        consumerConfiguration.setMessageListener(messageListener);
        sparkStreamingPulsarReceiver.onStart();
        waitForTransmission();
        PulsarClient.create(str, clientConfiguration).createProducer("persistent://p1/c1/ns1/topic1", new ProducerConfiguration()).send("pulsar-spark test message".getBytes());
        waitForTransmission();
        sparkStreamingPulsarReceiver.onStop();
        Assert.assertEquals(new String(((Message) forClass2.getValue()).getData()), "pulsar-spark test message");
    }

    private static void waitForTransmission() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
    }
}
