/*
 * Decompiled with CFR 0.152.
 */
package net.leanix.dropkit.amqp;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.leanix.dropkit.amqp.ConnectionHolder;
import net.leanix.dropkit.amqp.ConsumerRegistry;
import net.leanix.dropkit.amqp.QueueConsumer;
import net.leanix.dropkit.amqp.QueueConsumerFactory;
import net.leanix.dropkit.amqp.QueueProducer;
import net.leanix.dropkit.amqp.testsupport.RabbitMQEnvironment;
import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

@Ignore(value="Currently deactivated due to problems when running on ci machine and long running time.")
public class QueueProducerTest {
    private static final String QUEUE_NAME = "queueName";
    @ClassRule
    public static RabbitMQEnvironment rabbitMqEnvironment = new RabbitMQEnvironment();
    private QueueProducer queueProducer;
    private ConnectionHolder connectionHolder;
    private ConsumerRegistry consumerRegistry;
    private CountDownLatch queueConsumerIsCalled = new CountDownLatch(2);
    String testPayload;

    @Before
    public void setUp() throws IOException {
        this.testPayload = FileUtils.readFileToString((File)new File("src/test/resources/payloadtest.txt"));
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(QueueProducerTest.rabbitMqEnvironment.getRabbitMQFacade().host);
        connectionFactory.setPort(QueueProducerTest.rabbitMqEnvironment.getRabbitMQFacade().port);
        this.connectionHolder = new ConnectionHolder(connectionFactory);
        this.consumerRegistry = new ConsumerRegistry(this.connectionHolder, new QueueConsumerFactory(){

            public QueueConsumer createConsumer(String queueName, Channel channel, ConsumerRegistry registry) {
                return new QueueConsumer(queueName, channel, registry){

                    public void simpleHandle(String body) throws IOException {
                        QueueProducerTest.this.queueConsumerIsCalled.countDown();
                    }
                };
            }
        });
        this.queueProducer = new QueueProducer(this.connectionHolder, this.consumerRegistry);
    }

    @After
    public void tearDown() {
        this.consumerRegistry.removeOldConsumers(0L);
        this.connectionHolder.closeConnection();
    }

    @Test
    public void test_submit_oneSubmitOnly() throws IOException, InterruptedException {
        String payload = FileUtils.readFileToString((File)new File("src/test/resources/payloadtest.txt"));
        UUID submit = this.queueProducer.submit(payload, QUEUE_NAME);
        this.queueConsumerIsCalled.countDown();
        this.queueConsumerIsCalled.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat((Comparable)submit).isNotNull();
        this.consumerRegistry.removeOldConsumers(0L);
    }

    @Test
    public void test_submit_twoSubmitsWithSameQueueNameOnSameConsumerRegistry() throws IOException, InterruptedException {
        String payload = FileUtils.readFileToString((File)new File("src/test/resources/payloadtest.txt"));
        UUID submit = this.queueProducer.submit(payload, QUEUE_NAME);
        UUID submit2 = this.queueProducer.submit(payload, QUEUE_NAME);
        this.queueConsumerIsCalled.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat((Comparable)submit).isNotNull();
        Assertions.assertThat((Comparable)submit2).isNotNull();
        Assertions.assertThat((Comparable)submit).isNotEqualTo((Object)submit2);
        this.consumerRegistry.removeOldConsumers(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void test_submit_twoSubmitsWithSameQueueNameOnDifferentConsumerRegistries() throws IOException, InterruptedException {
        String payload = FileUtils.readFileToString((File)new File("src/test/resources/payloadtest.txt"));
        ConsumerRegistry consumerRegistryBlue = new ConsumerRegistry(this.connectionHolder, new QueueConsumerFactory(){

            public QueueConsumer createConsumer(String queueName, Channel channel, ConsumerRegistry registry) {
                return new QueueConsumer(queueName, channel, registry){

                    public void simpleHandle(String body) throws IOException {
                        QueueProducerTest.this.queueConsumerIsCalled.countDown();
                    }
                };
            }
        });
        QueueProducer queueProducerBlue = new QueueProducer(this.connectionHolder, consumerRegistryBlue);
        ConsumerRegistry consumerRegistryGreen = new ConsumerRegistry(this.connectionHolder, new QueueConsumerFactory(){

            public QueueConsumer createConsumer(String queueName, Channel channel, ConsumerRegistry registry) {
                return new QueueConsumer(queueName, channel, registry){

                    public void simpleHandle(String body) throws IOException {
                        QueueProducerTest.this.queueConsumerIsCalled.countDown();
                    }
                };
            }
        });
        QueueProducer queueProducerGreen = new QueueProducer(this.connectionHolder, consumerRegistryGreen);
        UUID submit = queueProducerBlue.submit(payload, QUEUE_NAME);
        Assertions.assertThat((Comparable)submit).isNotNull();
        try {
            queueProducerGreen.submit(payload, QUEUE_NAME);
        }
        catch (Exception e) {
            System.out.println("got expected expected exception");
            return;
        }
        finally {
            this.consumerRegistry.removeOldConsumers(0L);
        }
        throw new RuntimeException();
    }

    @Test
    public void test_submit_WithDifferentThreads_sameProducer() throws IOException, InterruptedException, ExecutionException {
        ListeningScheduledExecutorService executor = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newScheduledThreadPool(20));
        ArrayList<ListenableFuture> allFutures = new ArrayList<ListenableFuture>();
        int NUMBER_OF_MESSAGES = 2000;
        for (int i = 0; i < 2000; ++i) {
            ListenableFuture submitFuture = executor.submit((Callable)new Callable<UUID>(){

                @Override
                public UUID call() throws Exception {
                    return QueueProducerTest.this.queueProducer.submit(QueueProducerTest.this.testPayload, QueueProducerTest.QUEUE_NAME);
                }
            });
            allFutures.add(submitFuture);
        }
        ListenableFuture resultFuture = Futures.allAsList(allFutures);
        List generatedUUIDs = (List)resultFuture.get();
        System.out.println("Created " + generatedUUIDs.size() + " uuids.");
        this.consumerRegistry.removeOldConsumers(0L);
        Assertions.assertThat((int)generatedUUIDs.size()).isEqualTo(2000);
    }

    @Test
    public void test_submit_WithDifferentThreads_differentProducer() throws IOException, InterruptedException, ExecutionException {
        ListeningScheduledExecutorService executor = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newScheduledThreadPool(20));
        ArrayList<ListenableFuture> allFutures = new ArrayList<ListenableFuture>();
        int NUMBER_OF_MESSAGES = 5000;
        for (int i = 0; i < 5000; ++i) {
            final QueueProducer myQueueProducer = new QueueProducer(this.connectionHolder, this.consumerRegistry);
            ListenableFuture submitFuture = executor.submit((Callable)new Callable<UUID>(){

                @Override
                public UUID call() throws Exception {
                    return myQueueProducer.submit(QueueProducerTest.this.testPayload, QueueProducerTest.QUEUE_NAME);
                }
            });
            allFutures.add(submitFuture);
        }
        ListenableFuture resultFuture = Futures.allAsList(allFutures);
        List generatedUUIDs = (List)resultFuture.get();
        System.out.println("Created " + generatedUUIDs.size() + " uuids.");
        this.consumerRegistry.removeOldConsumers(0L);
        Assertions.assertThat((int)generatedUUIDs.size()).isEqualTo(5000);
    }
}

