package org.apache.camel.quarkus.component.nsq.it;

import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the NSQ routes exposed by the test application.
 *
 * <p>The producer test posts a message to {@code /nsq/send} and reads it back from NSQ with a
 * native {@link NSQConsumer}. The consumer tests publish to NSQ with a native {@link NSQProducer}
 * and poll {@code /nsq/get-messages/...} until the application reports the message.
 *
 * <p>Broker addresses are read from the {@code quarkus.camel.nsq.test.*} configuration properties
 * (presumably published by {@link NsqTestResource} — verify against that class).
 */
@QuarkusTest
@QuarkusTestResource(NsqTestResource.class)
class NsqTest {
    private static final Logger LOG = Logger.getLogger(NsqTest.class);

    private static final String TEST_CONSUMER_MSG = "Hello NSQConsumer !";
    private static final String TEST_PRODUCER_MSG = "Hello NSQProducer !";
    private static final String TEST_REQUEUE_MSG = "Test Requeue";

    /** Upper bound, in seconds, for every asynchronous wait in this class. */
    private static final long TIMEOUT_SECONDS = 10L;

    // Populated once in setUp(); not constants, hence not final.
    private static String consumerHost;
    private static String producerHost;
    private static int consumerPort;
    private static int producerPort;

    /**
     * Reads the NSQ host/port pairs used by the native test clients from the runtime configuration
     * and logs them.
     */
    @BeforeAll
    public static void setUp() {
        consumerHost = ConfigProvider.getConfig().getValue("quarkus.camel.nsq.test.consumer-host", String.class);
        consumerPort = ConfigProvider.getConfig().getValue("quarkus.camel.nsq.test.consumer-port", int.class);
        producerHost = ConfigProvider.getConfig().getValue("quarkus.camel.nsq.test.producer-host", String.class);
        producerPort = ConfigProvider.getConfig().getValue("quarkus.camel.nsq.test.producer-port", int.class);
        NsqLogger.log(LOG, "NsqTest.CONSUMER = %s:%s", new Object[] { consumerHost, consumerPort });
        NsqLogger.log(LOG, "NsqTest.PRODUCER = %s:%s", new Object[] { producerHost, producerPort });
    }

    /**
     * Posts a message to {@code /nsq/send} and verifies that exactly one message with the same
     * payload is delivered to a native consumer subscribed to {@code producer-topic}.
     *
     * @throws Exception if the consumer cannot be started or closed, or the wait is interrupted
     */
    @Test
    void nsqProducerShouldSucceed() throws Exception {
        RestAssured.given().body(TEST_PRODUCER_MSG).post("/nsq/send").then().statusCode(204);

        CountDownLatch received = new CountDownLatch(1);
        AtomicInteger messageCount = new AtomicInteger();
        AtomicReference<String> lastPayload = new AtomicReference<>();

        DefaultNSQLookup lookup = new DefaultNSQLookup();
        lookup.addLookupAddress(consumerHost, consumerPort);

        try (NSQConsumer consumer = new NSQConsumer(lookup, "producer-topic", "testconsumer", message -> {
            NsqLogger.log(LOG, "The NSQConsumer from testProducer() received message %s", new Object[] { message });
            // Only record what arrived here. Assertions are made on the test thread, because an
            // AssertionError thrown from this callback would not fail the test.
            lastPayload.set(new String(message.getMessage(), NsqRoute.MESSAGE_CHARSET));
            messageCount.incrementAndGet();
            message.finished();
            received.countDown();
        })) {
            consumer.start();
            Assertions.assertTrue(received.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                    "No message received from NSQ within " + TIMEOUT_SECONDS + " seconds");
            Assertions.assertEquals(1, messageCount.get());
            Assertions.assertEquals(TEST_PRODUCER_MSG, lastPayload.get());
        }
    }

    /**
     * Publishes a message to {@code consumer-topic} and verifies that the application exposes it
     * under {@code /nsq/get-messages/testConsumer}.
     *
     * @throws NSQException     if publishing to NSQ fails
     * @throws TimeoutException if publishing to NSQ times out
     */
    @Test
    void nsqConsumerShouldSucceed() throws NSQException, TimeoutException {
        publishAndAwaitMessage(TEST_CONSUMER_MSG, "/nsq/get-messages/testConsumer");
    }

    /**
     * Publishes the requeue test message to {@code consumer-topic} and verifies that the
     * application eventually exposes it under {@code /nsq/get-messages/testRequeue}.
     *
     * <p>NOTE(review): the method name refers to three requeues, but the requeue count is not
     * observable from this test; it presumably depends on the route definition — confirm there.
     *
     * @throws NSQException     if publishing to NSQ fails
     * @throws TimeoutException if publishing to NSQ times out
     */
    @Test
    void nsqConsumerWithExceptionShouldRequeueMessagesThreeTimes() throws NSQException, TimeoutException {
        publishAndAwaitMessage(TEST_REQUEUE_MSG, "/nsq/get-messages/testRequeue");
    }

    /**
     * Publishes {@code message} to {@code consumer-topic} with a native producer, waits until
     * {@code endpoint} answers with HTTP 200, then asserts that the response body is the message.
     *
     * @param message  payload to publish and to expect back from the endpoint
     * @param endpoint REST path polled for the consumed message
     * @throws NSQException     if publishing to NSQ fails
     * @throws TimeoutException if publishing to NSQ times out
     */
    private static void publishAndAwaitMessage(String message, String endpoint) throws NSQException, TimeoutException {
        NSQProducer producer = new NSQProducer();
        producer.addAddress(producerHost, producerPort);
        producer.start();
        try {
            producer.produce("consumer-topic", message.getBytes(NsqRoute.MESSAGE_CHARSET));
            Awaitility.await()
                    .atMost(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                    .until(() -> RestAssured.given().get(endpoint).statusCode() == 200);
            RestAssured.given().get(endpoint).then().body(CoreMatchers.is(message));
        } finally {
            // Release the producer's resources even when the test fails.
            producer.shutdown();
        }
    }
}
