/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.pulsar;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.pulsar.PulsarIO;
import org.apache.beam.sdk.io.pulsar.PulsarMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

/**
 * Integration tests for {@link PulsarIO} that run against a Pulsar standalone broker started in a
 * Docker container through Testcontainers.
 *
 * <p>One container and one shared {@link PulsarClient} are created for the whole class in
 * {@link #setup()} and released in {@link #afterClass()}. Individual tests and helpers must not
 * close the shared client, otherwise the outcome of the remaining tests depends on execution
 * order.
 */
@RunWith(JUnit4.class)
public class PulsarIOTest {
    /** Topic shared by every test in this class. */
    private static final String TOPIC = "PULSAR_IO_TEST";

    /** Upper bound, in seconds, for waiting on a single message from the broker. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 5;

    private static final Logger LOG = LoggerFactory.getLogger(PulsarIOTest.class);

    protected static PulsarContainer pulsarContainer;
    protected static PulsarClient client;

    /** Publish time of the last (101st) message sent by {@link #produceMessages()}. */
    private long endExpectedTime = 0L;

    /** Publish time of the first message sent by {@link #produceMessages()}. */
    private long startTime = 0L;

    @Rule
    public final transient TestPipeline testPipeline = TestPipeline.create();

    /**
     * Subscribes to {@link #TOPIC} under the {@code receiveMockMessageFn} subscription and returns
     * every message delivered until the topic stays idle for {@link #RECEIVE_TIMEOUT_SECONDS}.
     *
     * <p>The consumer is closed before returning so the subscription can be attached again later.
     *
     * @return the received messages in delivery order; empty when nothing arrived in time
     * @throws PulsarClientException if the client cannot be created, subscribing fails, or
     *     receiving fails
     */
    public List<Message<byte[]>> receiveMessages() throws PulsarClientException {
        if (client == null) {
            // The original discarded this result, leaving client null for the next line.
            client = initClient();
        }
        try (Consumer<byte[]> consumer = subscribe("receiveMockMessageFn")) {
            return drain(consumer);
        }
    }

    /**
     * Publishes 101 messages to {@link #TOPIC} and reads each one back to learn its publish time.
     *
     * <p>The first 100 messages are returned as the expected pipeline input. The publish time of
     * message 0 is stored in {@code startTime} and the publish time of message 100 in
     * {@code endExpectedTime}; the read test uses them as its start and end timestamps.
     *
     * <p>A dedicated client is used and closed here, so the shared {@link #client} is untouched.
     *
     * @return the first 100 messages wrapped as {@link PulsarMessage}
     * @throws PulsarClientException if connecting, producing or subscribing fails
     * @throws IllegalStateException if a published message cannot be read back in time
     */
    public List<PulsarMessage> produceMessages() throws PulsarClientException {
        final int numElements = 101;
        final int lastIndex = numElements - 1;
        List<PulsarMessage> inputs = new ArrayList<>(lastIndex);

        // Resources close in reverse order: consumer, producer, then the local client.
        try (PulsarClient localClient = initClient();
             Producer<byte[]> producer = localClient.newProducer().topic(TOPIC).create();
             Consumer<byte[]> consumer =
                 localClient.newConsumer().topic(TOPIC).subscriptionName("produceMockMessageFn").subscribe()) {
            for (int i = 0; i < numElements; ++i) {
                String payload = "PULSAR_TEST_READFROMSIMPLETOPIC_" + i;
                producer.send(payload.getBytes(StandardCharsets.UTF_8));
                Message<byte[]> message = awaitMessage(consumer, i);

                if (i >= lastIndex) {
                    // The extra message only marks the end timestamp; it is not an expected input.
                    this.endExpectedTime = message.getPublishTime();
                    continue;
                }
                inputs.add(new PulsarMessage(message.getTopicName(), message.getPublishTime(), message));
                if (i == 0) {
                    this.startTime = message.getPublishTime();
                }
            }
        }
        return inputs;
    }

    /**
     * Waits for the next message on {@code consumer}, failing loudly instead of skipping it.
     *
     * @param consumer consumer to read from
     * @param index index of the message being awaited, used in the failure message only
     * @return the received message
     * @throws IllegalStateException if the wait is interrupted, fails or times out
     */
    private static Message<byte[]> awaitMessage(Consumer<byte[]> consumer, int index) {
        try {
            return consumer.receiveAsync().get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for message " + index + " on " + TOPIC, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not read back message " + index + " from " + TOPIC, e);
        }
    }

    /** Opens a consumer on {@link #TOPIC} with the shared client and the given subscription name. */
    private static Consumer<byte[]> subscribe(String subscriptionName) throws PulsarClientException {
        return client.newConsumer().topic(TOPIC).subscriptionName(subscriptionName).subscribe();
    }

    /**
     * Receives and acknowledges messages until none arrives within
     * {@link #RECEIVE_TIMEOUT_SECONDS}.
     *
     * <p>A timed receive is used because, per the Pulsar client documentation,
     * {@code hasReachedEndOfTopic()} only becomes true for a terminated topic, so it cannot be
     * used to detect that an ordinary topic has been drained.
     */
    private static List<Message<byte[]>> drain(Consumer<byte[]> consumer) throws PulsarClientException {
        List<Message<byte[]>> messages = new ArrayList<>();
        Message<byte[]> next;
        // The timed receive returns null once the timeout elapses without a message.
        while ((next = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) != null) {
            messages.add(next);
            try {
                consumer.acknowledge(next);
            } catch (PulsarClientException e) {
                consumer.negativeAcknowledge(next);
            }
        }
        return messages;
    }

    /** Builds a new client connected to the broker URL of the running container. */
    private static PulsarClient initClient() throws PulsarClientException {
        return PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
    }

    /** Starts the Pulsar 2.9.0 image in standalone mode. */
    private static void setupPulsarContainer() {
        pulsarContainer = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.9.0"));
        pulsarContainer.withCommand("bin/pulsar", "standalone");
        pulsarContainer.start();
    }

    /** Starts the broker container and creates the client shared by all tests. */
    @BeforeClass
    public static void setup() throws PulsarClientException {
        setupPulsarContainer();
        client = initClient();
    }

    /** Closes the shared client (best effort) and always stops the container. */
    @AfterClass
    public static void afterClass() {
        try {
            if (client != null) {
                client.close();
            }
        } catch (PulsarClientException e) {
            // Teardown is best effort; a failed close must not prevent stopping the container.
            LOG.warn("Failed to close the shared Pulsar client", e);
        } finally {
            if (pulsarContainer != null) {
                pulsarContainer.stop();
            }
        }
    }

    /** Sanity check: a message produced to the topic can be consumed again unchanged. */
    @Test
    public void testPulsarFunctionality() throws Exception {
        try (Consumer<byte[]> consumer = subscribe("PulsarIO_IT");
             Producer<byte[]> producer = client.newProducer().topic(TOPIC).create()) {
            String messageTxt = "testing pulsar functionality";
            producer.send(messageTxt.getBytes(StandardCharsets.UTF_8));
            CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
            Message<byte[]> message = future.get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertEquals(messageTxt, new String(message.getData(), StandardCharsets.UTF_8));
            // The shared client is deliberately left open; afterClass() closes it.
        }
    }

    /**
     * Reads the messages published by {@link #produceMessages()} through {@code PulsarIO.read()}
     * and checks, via a counter metric, that the pipeline saw as many records as were produced.
     *
     * @throws PulsarClientException if the fixture messages cannot be produced; this now fails
     *     the test instead of being logged and ignored
     */
    @Test
    public void testReadFromSimpleTopic() throws PulsarClientException {
        List<PulsarMessage> inputsMock = produceMessages();
        PulsarIO.Read reader =
            PulsarIO.read()
                .withClientUrl(pulsarContainer.getPulsarBrokerUrl())
                .withAdminUrl(pulsarContainer.getHttpServiceUrl())
                .withTopic(TOPIC)
                .withStartTimestamp(this.startTime)
                .withEndTimestamp(this.endExpectedTime)
                .withPublishTime();
        testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric()));

        PipelineResult pipelineResult = testPipeline.run();
        MetricsFilter counterFilter =
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named(PulsarIOTest.class.getName(), "PulsarRecordsCounter"))
                .build();
        MetricQueryResults metrics = pipelineResult.metrics().queryMetrics(counterFilter);

        long recordsCount = 0L;
        for (MetricResult<Long> metric : metrics.getCounters()) {
            if (metric.getName().toString().equals("org.apache.beam.sdk.io.pulsar.PulsarIOTest:PulsarRecordsCounter")) {
                recordsCount = metric.getAttempted();
                break;
            }
        }
        // Compare as long so an unexpectedly large counter is not truncated by an int cast.
        Assertions.assertEquals((long) inputsMock.size(), recordsCount);
    }

    /**
     * Writes 100 messages through {@code PulsarIO.write()} and verifies that exactly those
     * payloads reach the topic.
     *
     * <p>The consumer is attached before the pipeline runs. This assumes the documented default
     * initial position (latest) for a new subscription, so only messages written by this test
     * are delivered and leftovers from other tests on the shared topic are not.
     */
    @Test
    public void testWriteFromTopic() throws Exception {
        final int numberOfMessages = 100;
        List<byte[]> messages = new ArrayList<>(numberOfMessages);
        List<String> expectedPayloads = new ArrayList<>(numberOfMessages);
        for (int i = 0; i < numberOfMessages; ++i) {
            String payload = "PULSAR_WRITER_TEST_" + i;
            expectedPayloads.add(payload);
            messages.add(payload.getBytes(StandardCharsets.UTF_8));
        }
        PulsarIO.Write writer =
            PulsarIO.write().withClientUrl(pulsarContainer.getPulsarBrokerUrl()).withTopic(TOPIC);

        try (Consumer<byte[]> consumer = subscribe("receiveMockMessageFn")) {
            testPipeline.apply(Create.of(messages)).apply(writer);
            testPipeline.run();

            List<Message<byte[]>> received = drain(consumer);
            Assertions.assertEquals(numberOfMessages, received.size());
            // Beam does not promise element order, so payloads are matched as a multiset
            // rather than by position.
            for (Message<byte[]> message : received) {
                String payload = new String(message.getValue(), StandardCharsets.UTF_8);
                Assertions.assertTrue(expectedPayloads.remove(payload), "Unexpected or duplicate message: " + payload);
            }
            Assertions.assertTrue(expectedPayloads.isEmpty(), "Messages never received: " + expectedPayloads);
        }
    }

    /** Pass-through {@link DoFn} that counts every element it sees in {@code PulsarRecordsCounter}. */
    public static class PulsarRecordsMetric
    extends DoFn<PulsarMessage, PulsarMessage> {
        private final Counter counter = Metrics.counter(PulsarIOTest.class.getName(), "PulsarRecordsCounter");

        /** Increments the counter and re-emits the element unchanged. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            this.counter.inc();
            context.output(context.element());
        }
    }
}

