package org.apache.beam.sdk.io.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.repackaged.beam_sdks_java_io_rabbitmq.com.google.common.base.Ascii;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.class */
public class RabbitMqIOTest implements Serializable {
    private static int port;

    @Rule
    public transient TestPipeline p = TestPipeline.create();
    private static transient Broker broker;
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMqIOTest.class);

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest$TestConsumer.class */
    private static class TestConsumer extends DefaultConsumer {
        private final List<String> received;

        public TestConsumer(Channel channel, List<String> list) {
            super(channel);
            this.received = list;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            this.received.add(new String(bArr, "UTF-8"));
        }
    }

    @BeforeClass
    public static void startBroker() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            port = serverSocket.getLocalPort();
            if (0 != 0) {
                try {
                    serverSocket.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            } else {
                serverSocket.close();
            }
            System.setProperty("derby.stream.error.field", "MyApp.DEV_NULL");
            broker = new Broker();
            BrokerOptions brokerOptions = new BrokerOptions();
            brokerOptions.setConfigProperty("qpid.amqp_port", String.valueOf(port));
            brokerOptions.setConfigProperty("qpid.work_dir", temporaryFolder.newFolder().toString());
            brokerOptions.setConfigProperty("qpid.home_dir", "src/test/qpid");
            broker.startup(brokerOptions);
        } catch (Throwable th3) {
            if (0 != 0) {
                try {
                    serverSocket.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                serverSocket.close();
            }
            throw th3;
        }
    }

    @AfterClass
    public static void stopBroker() {
        broker.shutdown();
    }

    @Test
    public void testReadQueue() throws Exception {
        PCollection apply = this.p.apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:" + port).withQueue("READ").withMaxNumRecords(10L)).apply(MapElements.into(TypeDescriptors.strings()).via(rabbitMqMessage -> {
            return new String(rabbitMqMessage.getBody(), StandardCharsets.UTF_8);
        }));
        List list = (List) generateRecords(10).stream().map(bArr -> {
            return new String(bArr, StandardCharsets.UTF_8);
        }).collect(Collectors.toList());
        PAssert.that(apply).containsInAnyOrder(list);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("READ", false, false, false, (Map) null);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                channel.basicPublish("", "READ", (AMQP.BasicProperties) null, ((String) it.next()).getBytes(StandardCharsets.UTF_8));
            }
            this.p.run();
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testReadExchange() throws Exception {
        PAssert.that(this.p.apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:" + port).withExchange("READEXCHANGE", "fanout", "test").withMaxNumRecords(10L)).apply(MapElements.into(TypeDescriptors.strings()).via(rabbitMqMessage -> {
            return new String(rabbitMqMessage.getBody(), StandardCharsets.UTF_8);
        }))).containsInAnyOrder((List) generateRecords(10).stream().map(bArr -> {
            return new String(bArr, StandardCharsets.UTF_8);
        }).collect(Collectors.toList()));
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("READEXCHANGE", "fanout");
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(5000L);
                } catch (Exception e) {
                    LOG.error(e.getMessage(), e);
                }
                for (int i = 0; i < 10; i++) {
                    try {
                        channel.basicPublish("READEXCHANGE", "test", (AMQP.BasicProperties) null, ("Test " + i).getBytes(StandardCharsets.UTF_8));
                    } catch (Exception e2) {
                        LOG.error(e2.getMessage(), e2);
                    }
                }
            });
            thread.start();
            this.p.run();
            thread.join();
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    @Test
    public void testWriteQueue() throws Exception {
        this.p.apply(Create.of((List) generateRecords(1000).stream().map(bArr -> {
            return new RabbitMqMessage(bArr);
        }).collect(Collectors.toList()))).apply(RabbitMqIO.write().withUri("amqp://guest:guest@localhost:" + port).withQueue("TEST"));
        ArrayList arrayList = new ArrayList();
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("TEST", true, false, false, (Map) null);
            channel.basicConsume("TEST", true, new TestConsumer(channel, arrayList));
            this.p.run();
            while (arrayList.size() < 1000) {
                Thread.sleep(500L);
            }
            Assert.assertEquals(1000L, arrayList.size());
            for (int i = 0; i < 1000; i++) {
                Assert.assertTrue(arrayList.contains("Test " + i));
            }
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    @Test
    public void testWriteExchange() throws Exception {
        this.p.apply(Create.of((List) generateRecords(1000).stream().map(bArr -> {
            return new RabbitMqMessage(bArr);
        }).collect(Collectors.toList()))).apply(RabbitMqIO.write().withUri("amqp://guest:guest@localhost:" + port).withExchange("WRITE", "fanout"));
        ArrayList arrayList = new ArrayList();
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:" + port);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("WRITE", "fanout");
            String queue = channel.queueDeclare().getQueue();
            channel.queueBind(queue, "WRITE", "");
            channel.basicConsume(queue, true, new TestConsumer(channel, arrayList));
            this.p.run();
            while (arrayList.size() < 1000) {
                Thread.sleep(500L);
            }
            Assert.assertEquals(1000L, arrayList.size());
            for (int i = 0; i < 1000; i++) {
                Assert.assertTrue(arrayList.contains("Test " + i));
            }
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private static List<byte[]> generateRecords(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return ("Test " + i2).getBytes(StandardCharsets.UTF_8);
        }).collect(Collectors.toList());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -736709031:
                if (implMethodName.equals("lambda$testReadQueue$2f9baff8$1")) {
                    z = true;
                    break;
                }
                break;
            case -478507875:
                if (implMethodName.equals("lambda$testReadExchange$2f9baff8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/rabbitmq/RabbitMqMessage;)Ljava/lang/String;")) {
                    return rabbitMqMessage -> {
                        return new String(rabbitMqMessage.getBody(), StandardCharsets.UTF_8);
                    };
                }
                break;
            case Ascii.SOH /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/rabbitmq/RabbitMqMessage;)Ljava/lang/String;")) {
                    return rabbitMqMessage2 -> {
                        return new String(rabbitMqMessage2.getBody(), StandardCharsets.UTF_8);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
