package org.apache.beam.sdk.io.pulsar;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.io.pulsar.PulsarIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

/**
 * Integration tests for {@link PulsarIO} that run against a Pulsar standalone broker started in
 * a Testcontainers {@link PulsarContainer}.
 *
 * <p>All tests share one broker, one topic ({@link #TOPIC}) and one {@link PulsarClient}. The
 * shared client is created in {@link #setup()} and closed in {@link #afterClass()}; individual
 * tests must not close it, because JUnit gives no guarantee about test method order.
 */
@RunWith(JUnit4.class)
public class PulsarIOTest {
    private static final String TOPIC = "PULSAR_IO_TEST";
    private static final String RECORDS_COUNTER_NAME = "PulsarRecordsCounter";
    private static final String WRITER_PAYLOAD_PREFIX = "PULSAR_WRITER_TEST_";
    /** Number of messages each read/write test expects to observe. */
    private static final int MESSAGE_COUNT = 100;
    /** How long to wait for a single message before concluding that none is coming. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 5;
    private static final Logger LOG = LoggerFactory.getLogger(PulsarIOTest.class);

    protected static PulsarContainer pulsarContainer;
    protected static PulsarClient client;

    /** Publish time of the extra message sent by {@link #produceMessages()}; used as read end. */
    private long endExpectedTime = 0;
    /** Publish time of the first message sent by {@link #produceMessages()}; used as read start. */
    private long startTime = 0;

    @Rule
    public final transient TestPipeline testPipeline = TestPipeline.create();

    /** Pass-through {@link DoFn} that increments a counter for every element it sees. */
    public static class PulsarRecordsMetric extends DoFn<PulsarMessage, PulsarMessage> {
        private final Counter counter =
                Metrics.counter(PulsarIOTest.class.getName(), RECORDS_COUNTER_NAME);

        @ProcessElement
        public void processElement(ProcessContext context) {
            counter.inc();
            context.output(context.element());
        }
    }

    /**
     * Drains the messages currently available on {@link #TOPIC} for the
     * {@code receiveMockMessageFn} subscription.
     *
     * <p>The subscription starts at the earliest message so that data written before this method
     * subscribes is still delivered. Draining stops once no message arrives within
     * {@link #RECEIVE_TIMEOUT_SECONDS} seconds.
     *
     * @return the received messages, in delivery order; possibly empty
     * @throws PulsarClientException if the client, the subscription or a receive call fails
     */
    public List<Message<byte[]>> receiveMessages() throws PulsarClientException {
        if (client == null) {
            client = initClient();
        }
        List<Message<byte[]>> messages = new ArrayList<>();
        try (Consumer<byte[]> consumer =
                client.newConsumer()
                        .topic(TOPIC)
                        .subscriptionName("receiveMockMessageFn")
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
            // hasReachedEndOfTopic() only becomes true for a terminated topic, so it cannot be
            // used as the loop condition here; a timed receive returning null marks the end.
            Message<byte[]> message;
            while ((message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) != null) {
                messages.add(message);
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    consumer.negativeAcknowledge(message);
                }
            }
        }
        return messages;
    }

    /**
     * Publishes {@code MESSAGE_COUNT + 1} messages to {@link #TOPIC} and reads each one back.
     *
     * <p>The first {@link #MESSAGE_COUNT} messages are returned wrapped as {@link PulsarMessage}.
     * The publish time of the first message is stored in {@code startTime} and the publish time
     * of the extra, final message is stored in {@code endExpectedTime}; that final message is
     * not part of the returned list.
     *
     * <p>A message that cannot be read back within the timeout is logged and skipped.
     *
     * @return the messages that were published and read back
     * @throws PulsarClientException if creating the client, producer or consumer, or sending fails
     */
    public List<PulsarMessage> produceMessages() throws PulsarClientException {
        List<PulsarMessage> produced = new ArrayList<>();
        // A dedicated client keeps the shared static client usable by the other tests.
        try (PulsarClient localClient = initClient();
                Producer<byte[]> producer = localClient.newProducer().topic(TOPIC).create();
                Consumer<byte[]> consumer =
                        localClient.newConsumer()
                                .topic(TOPIC)
                                .subscriptionName("produceMockMessageFn")
                                .subscribe()) {
            for (int i = 0; i <= MESSAGE_COUNT; i++) {
                producer.send(("PULSAR_TEST_READFROMSIMPLETOPIC_" + i).getBytes(StandardCharsets.UTF_8));
                try {
                    Message<byte[]> message =
                            consumer.receiveAsync().get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (i >= MESSAGE_COUNT) {
                        this.endExpectedTime = message.getPublishTime();
                    } else {
                        produced.add(
                                new PulsarMessage(
                                        message.getTopicName(), message.getPublishTime(), message));
                        if (i == 0) {
                            this.startTime = message.getPublishTime();
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: further blocking calls would fail immediately.
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while reading back message {} from {}", i, TOPIC, e);
                    break;
                } catch (ExecutionException | TimeoutException e) {
                    LOG.error("Could not read back message {} from {}", i, TOPIC, e);
                }
            }
        }
        return produced;
    }

    /** Builds a new client connected to the broker running in {@link #pulsarContainer}. */
    private static PulsarClient initClient() throws PulsarClientException {
        return PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
    }

    /** Starts a Pulsar 2.9.0 standalone broker container. */
    private static void setupPulsarContainer() {
        pulsarContainer = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.9.0"));
        pulsarContainer.withCommand("bin/pulsar", "standalone");
        pulsarContainer.start();
    }

    @BeforeClass
    public static void setup() throws PulsarClientException {
        setupPulsarContainer();
        client = initClient();
    }

    /** Closes the shared client and stops the broker container. */
    @AfterClass
    public static void afterClass() throws PulsarClientException {
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (pulsarContainer != null) {
                pulsarContainer.stop();
            }
        }
    }

    /** Sanity check: a message sent with a plain producer is received by a plain consumer. */
    @Test
    public void testPulsarFunctionality() throws Exception {
        String payload = "testing pulsar functionality";
        try (Consumer<byte[]> consumer =
                        client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe();
                Producer<byte[]> producer = client.newProducer().topic(TOPIC).create()) {
            producer.send(payload.getBytes(StandardCharsets.UTF_8));
            Message<byte[]> received =
                    consumer.receiveAsync().get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertEquals(payload, new String(received.getData(), StandardCharsets.UTF_8));
        }
    }

    /**
     * Reads the time window covered by {@link #produceMessages()} through {@link PulsarIO#read()}
     * and checks that the number of elements counted equals the number of messages produced.
     */
    @Test
    public void testReadFromSimpleTopic() throws PulsarClientException {
        List<PulsarMessage> inputs = produceMessages();
        this.testPipeline
                .apply(
                        PulsarIO.read()
                                .withClientUrl(pulsarContainer.getPulsarBrokerUrl())
                                .withAdminUrl(pulsarContainer.getHttpServiceUrl())
                                .withTopic(TOPIC)
                                .withStartTimestamp(this.startTime)
                                .withEndTimestamp(this.endExpectedTime)
                                .withPublishTime())
                .apply(ParDo.of(new PulsarRecordsMetric()));

        MetricsFilter counterFilter =
                MetricsFilter.builder()
                        .addNameFilter(
                                MetricNameFilter.named(PulsarIOTest.class.getName(), RECORDS_COUNTER_NAME))
                        .build();
        long recordsCounted = 0;
        for (MetricResult<Long> counter :
                this.testPipeline.run().metrics().queryMetrics(counterFilter).getCounters()) {
            if (counter
                    .getName()
                    .toString()
                    .equals("org.apache.beam.sdk.io.pulsar.PulsarIOTest:PulsarRecordsCounter")) {
                recordsCounted = counter.getAttempted();
                break;
            }
        }
        Assertions.assertEquals(inputs.size(), recordsCounted);
    }

    /**
     * Writes {@link #MESSAGE_COUNT} payloads through {@link PulsarIO#write()} and checks that
     * exactly those payloads can be consumed from the topic afterwards.
     */
    @Test
    public void testWriteFromTopic() throws Exception {
        PulsarIO.Write write =
                PulsarIO.write().withClientUrl(pulsarContainer.getPulsarBrokerUrl()).withTopic(TOPIC);
        List<byte[]> payloads = new ArrayList<>();
        Set<String> expected = new HashSet<>();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String payload = WRITER_PAYLOAD_PREFIX + i;
            expected.add(payload);
            payloads.add(payload.getBytes(StandardCharsets.UTF_8));
        }
        this.testPipeline.apply(Create.of(payloads)).apply(write);
        this.testPipeline.run();

        // The topic is shared with the other tests, so keep only this test's payloads.
        List<String> written = new ArrayList<>();
        for (Message<byte[]> message : receiveMessages()) {
            String payload = new String(message.getValue(), StandardCharsets.UTF_8);
            if (payload.startsWith(WRITER_PAYLOAD_PREFIX)) {
                written.add(payload);
            }
        }
        Assertions.assertEquals(MESSAGE_COUNT, written.size());
        // Compared as a set: a PCollection is unordered, so arrival order is not asserted.
        Assertions.assertEquals(expected, new HashSet<>(written));
    }
}
