package org.apache.beam.sdk.io.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqTestUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.qpid.server.SystemLauncher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.class */
public class RabbitMqIOTest implements Serializable {
    private static int port;
    private static String defaultPort;

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @ClassRule
    public static Timeout classTimeout = new Timeout(5, TimeUnit.MINUTES);

    @Rule
    public transient TestPipeline p = TestPipeline.create();
    private static transient SystemLauncher launcher;

    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        defaultPort = System.getProperty("qpid.amqp_port");
        System.setProperty("qpid.amqp_port", "" + port);
        System.setProperty("derby.stream.error.field", "MyApp.DEV_NULL");
        launcher = new SystemLauncher();
        HashMap hashMap = new HashMap();
        URL resource = RabbitMqIOTest.class.getResource("rabbitmq-io-test-config.json");
        hashMap.put("type", "Memory");
        hashMap.put("initialConfigurationLocation", resource.toExternalForm());
        hashMap.put("${QPID_WORK}", temporaryFolder.newFolder().toString());
        launcher.startup(hashMap);
    }

    @AfterClass
    public static void afterClass() {
        launcher.shutdown();
        if (defaultPort != null) {
            System.setProperty("qpid.amqp_port", defaultPort);
        } else {
            System.clearProperty("qpid.amqp_port");
        }
    }

    @Test
    public void testReadQueue() throws Exception {
        PCollection apply = this.p.apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:" + port).withQueue("READ").withMaxNumRecords(10L)).apply(MapElements.into(TypeDescriptors.strings()).via(rabbitMqMessage -> {
            return RabbitMqTestUtils.recordToString(rabbitMqMessage.getBody());
        }));
        List list = (List) RabbitMqTestUtils.generateRecords(10).stream().map(RabbitMqTestUtils::recordToString).collect(Collectors.toList());
        PAssert.that(apply).containsInAnyOrder(list);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("READ", false, false, false, (Map) null);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                channel.basicPublish("", "READ", (AMQP.BasicProperties) null, ((String) it.next()).getBytes(StandardCharsets.UTF_8));
            }
            this.p.run();
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private void doExchangeTest(ExchangeTestPlan exchangeTestPlan, boolean z) throws Exception {
        byte[] bArr = new byte[0];
        String str = "amqp://guest:guest@localhost:" + port;
        RabbitMqIO.Read read = exchangeTestPlan.getRead();
        PAssert.that(this.p.apply(read.withUri(str).withMaxNumRecords(exchangeTestPlan.getNumRecords())).apply(MapElements.into(TypeDescriptors.strings()).via(rabbitMqMessage -> {
            return RabbitMqTestUtils.recordToString(rabbitMqMessage.getBody());
        }))).containsInAnyOrder(exchangeTestPlan.expectedResults());
        String str2 = (String) Optional.ofNullable(read.exchange()).orElseGet(() -> {
            return UUID.randomUUID().toString();
        });
        String str3 = (String) Optional.ofNullable(read.exchangeType()).orElse("fanout");
        if (z) {
            str3 = "fanout".equalsIgnoreCase(str3) ? "direct" : "fanout";
        }
        String str4 = str3;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        linkedBlockingQueue.addAll(RabbitMqTestUtils.generateRecords(exchangeTestPlan.getNumRecordsToPublish()));
        Thread thread = new Thread(() -> {
            Connection connection = null;
            Channel channel = null;
            try {
                try {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setAutomaticRecoveryEnabled(false);
                    connectionFactory.setUri(str);
                    connection = connectionFactory.newConnection();
                    channel = connection.createChannel();
                    channel.exchangeDeclare(str2, str4);
                    channel.addReturnListener((i, str5, str6, str7, basicProperties, bArr2) -> {
                        try {
                            linkedBlockingQueue.put(bArr2);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                    countDownLatch.countDown();
                    while (true) {
                        byte[] bArr3 = (byte[]) linkedBlockingQueue.take();
                        if (bArr3 == bArr) {
                            break;
                        } else {
                            channel.basicPublish(str2, exchangeTestPlan.publishRoutingKeyGen().get(), true, exchangeTestPlan.getPublishProperties(), bArr3);
                        }
                    }
                    if (channel != null) {
                        try {
                            channel.close();
                        } catch (Exception e) {
                        }
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Exception e2) {
                        }
                    }
                } catch (Exception e3) {
                    throw new RuntimeException(e3);
                }
            } catch (Throwable th) {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception e4) {
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception e5) {
                    }
                }
                throw th;
            }
        });
        thread.start();
        countDownLatch.await();
        this.p.run();
        linkedBlockingQueue.put(bArr);
        thread.join();
    }

    private void doExchangeTest(ExchangeTestPlan exchangeTestPlan) throws Exception {
        doExchangeTest(exchangeTestPlan, false);
    }

    @Test
    public void testReadDeclaredFanoutExchange() throws Exception {
        doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withExchange("DeclaredFanoutExchange", "fanout", "ignored"), 10));
    }

    @Test
    public void testReadDeclaredTopicExchangeWithQueueDeclare() throws Exception {
        doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withExchange("DeclaredTopicExchangeWithQueueDeclare", "topic", "#").withQueue("declared-queue").withQueueDeclare(true), 10));
    }

    @Test
    public void testReadDeclaredTopicExchange() throws Exception {
        RabbitMqIO.Read withExchange = RabbitMqIO.read().withExchange("DeclaredTopicExchange", "topic", "user.create.#");
        final Supplier<String> supplier = new Supplier<String>() { // from class: org.apache.beam.sdk.io.rabbitmq.RabbitMqIOTest.1
            private AtomicInteger counter = new AtomicInteger(0);

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.function.Supplier
            public String get() {
                int andIncrement = this.counter.getAndIncrement();
                return andIncrement % 2 == 0 ? "user.create." + andIncrement : "user.delete." + andIncrement;
            }
        };
        doExchangeTest(new ExchangeTestPlan(withExchange, 5, 10) { // from class: org.apache.beam.sdk.io.rabbitmq.RabbitMqIOTest.2
            @Override // org.apache.beam.sdk.io.rabbitmq.ExchangeTestPlan
            public Supplier<String> publishRoutingKeyGen() {
                return supplier;
            }

            @Override // org.apache.beam.sdk.io.rabbitmq.ExchangeTestPlan
            public List<String> expectedResults() {
                return (List) IntStream.range(0, 10).filter(i -> {
                    return i % 2 == 0;
                }).mapToObj(RabbitMqTestUtils::generateRecord).map(RabbitMqTestUtils::recordToString).collect(Collectors.toList());
            }
        });
    }

    @Test
    public void testDeclareIncompatibleExchangeFails() throws Exception {
        try {
            doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withExchange("IncompatibleExchange", "direct", "unused"), 1), true);
            Assert.fail("Expected to have failed to declare an incompatible exchange");
        } catch (Exception e) {
            ShutdownSignalException rootCause = Throwables.getRootCause(e);
            if (!(rootCause instanceof ShutdownSignalException)) {
                Assert.fail("Expected to fail with ShutdownSignalException. Instead failed with " + rootCause);
                return;
            }
            AMQP.Connection.Close reason = rootCause.getReason();
            if (reason instanceof AMQP.Connection.Close) {
                Assert.assertEquals("Expected failure is 530: not-allowed", 530L, reason.getReplyCode());
            } else {
                Assert.fail("Unexpected ShutdownSignalException reason. Expected Connection.Close. Got: " + reason);
            }
        }
    }

    @Test
    public void testUseCorrelationIdSucceedsWhenIdsPresent() throws Exception {
        doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withExchange("CorrelationIdSuccess", "fanout").withUseCorrelationId(true), 1, 1, new AMQP.BasicProperties().builder().correlationId("123").build()));
    }

    @Test(expected = Pipeline.PipelineExecutionException.class)
    public void testUseCorrelationIdFailsWhenIdsMissing() throws Exception {
        doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withExchange("CorrelationIdFailure", "fanout").withUseCorrelationId(true), 1, 1, null));
    }

    @Test(expected = Pipeline.PipelineExecutionException.class)
    public void testQueueDeclareWithoutQueueNameFails() throws Exception {
        doExchangeTest(new ExchangeTestPlan(RabbitMqIO.read().withQueueDeclare(true), 1));
    }

    @Test
    public void testWriteQueue() throws Exception {
        this.p.apply(Create.of((List) RabbitMqTestUtils.generateRecords(1000).stream().map(RabbitMqMessage::new).collect(Collectors.toList()))).apply(RabbitMqIO.write().withUri("amqp://guest:guest@localhost:" + port).withQueue("TEST"));
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("TEST", true, false, false, (Map) null);
            RabbitMqTestUtils.TestConsumer testConsumer = new RabbitMqTestUtils.TestConsumer(channel);
            channel.basicConsume("TEST", true, testConsumer);
            this.p.run();
            while (testConsumer.getReceived().size() < 1000) {
                Thread.sleep(500L);
            }
            Assert.assertEquals(1000L, testConsumer.getReceived().size());
            for (int i = 0; i < 1000; i++) {
                Assert.assertTrue(testConsumer.getReceived().contains("Test " + i));
            }
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    @Test
    public void testWriteExchange() throws Exception {
        this.p.apply(Create.of((List) RabbitMqTestUtils.generateRecords(1000).stream().map(RabbitMqMessage::new).collect(Collectors.toList()))).apply(RabbitMqIO.write().withUri("amqp://guest:guest@localhost:" + port).withExchange("WRITE", "fanout"));
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("WRITE", "fanout");
            String queue = channel.queueDeclare().getQueue();
            channel.queueBind(queue, "WRITE", "");
            RabbitMqTestUtils.TestConsumer testConsumer = new RabbitMqTestUtils.TestConsumer(channel);
            channel.basicConsume(queue, true, testConsumer);
            this.p.run();
            while (testConsumer.getReceived().size() < 1000) {
                Thread.sleep(500L);
            }
            Assert.assertEquals(1000L, testConsumer.getReceived().size());
            for (int i = 0; i < 1000; i++) {
                Assert.assertTrue(testConsumer.getReceived().contains("Test " + i));
            }
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -736709031:
                if (implMethodName.equals("lambda$testReadQueue$2f9baff8$1")) {
                    z = true;
                    break;
                }
                break;
            case 326179606:
                if (implMethodName.equals("lambda$doExchangeTest$26545d3c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/rabbitmq/RabbitMqMessage;)Ljava/lang/String;")) {
                    return rabbitMqMessage -> {
                        return RabbitMqTestUtils.recordToString(rabbitMqMessage.getBody());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/rabbitmq/RabbitMqMessage;)Ljava/lang/String;")) {
                    return rabbitMqMessage2 -> {
                        return RabbitMqTestUtils.recordToString(rabbitMqMessage2.getBody());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
