/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.pulsar.tests.integration.containers.RabbitMQContainer;
import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;

/**
 * {@link SourceTester} for the {@code rabbitmq} source connector.
 *
 * <p>The constructor fills {@code sourceConfig} with the connection name, host, port and
 * queue name the source should use, and {@link #produceSourceMessages(int)} publishes test
 * messages to RabbitMQ through an exchange bound to that same queue.
 */
public class RabbitMQSourceTester extends SourceTester<RabbitMQContainer> {
    /** Exchange the test messages are published to. */
    private static final String EXCHANGE_NAME = "test-src-exchange";
    /** Queue bound to {@link #EXCHANGE_NAME}; also handed to the source via its config. */
    private static final String QUEUE_NAME = "test-src-queue";

    /** Container supplied through {@link #setServiceContainer(RabbitMQContainer)}. */
    private RabbitMQContainer serviceContainer;

    /**
     * Creates the tester and populates the source configuration.
     *
     * @param networkAlias value stored under the {@code host} key of the source config
     */
    public RabbitMQSourceTester(String networkAlias) {
        super("rabbitmq");
        this.sourceConfig.put("connectionName", "test-source-connection");
        this.sourceConfig.put("host", networkAlias);
        this.sourceConfig.put("port", RabbitMQContainer.PORTS[0]);
        this.sourceConfig.put("queueName", QUEUE_NAME);
    }

    /**
     * Stores the RabbitMQ container later used to build the client connection.
     *
     * @param serviceContainer container to publish test messages to
     */
    @Override
    public void setServiceContainer(RabbitMQContainer serviceContainer) {
        this.serviceContainer = serviceContainer;
    }

    /** Intentionally empty: this tester performs no work for this hook. */
    @Override
    public void prepareSource() throws Exception {
    }

    /** Intentionally empty: this tester performs no work for this hook. */
    @Override
    public void prepareInsertEvent() throws Exception {
    }

    /** Intentionally empty: this tester performs no work for this hook. */
    @Override
    public void prepareDeleteEvent() throws Exception {
    }

    /** Intentionally empty: this tester performs no work for this hook. */
    @Override
    public void prepareUpdateEvent() throws Exception {
    }

    /**
     * Declares the test queue and a topic exchange, binds them, and publishes
     * {@code numMessages} messages of the form {@code rb-key-i -> rb-value-i}.
     *
     * @param numMessages number of messages to publish
     * @return the published routing-key/value pairs in publication order
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    @Override
    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
        ConnectionFactory connectionFactory = RabbitMQSinkTester.createConnectionFactory(this.serviceContainer);
        // Resources close in reverse order: the channel first, then the connection.
        try (Connection connection = connectionFactory.newConnection("rabbitmq-source-tester");
                Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // "#" is the catch-all topic pattern, so every routing key used below reaches the queue.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");

            // LinkedHashMap keeps the entries in the order they were published.
            Map<String, String> values = new LinkedHashMap<>();
            for (int i = 0; i < numMessages; ++i) {
                String key = "rb-key-" + i;
                String value = "rb-value-" + i;
                values.put(key, value);
                // Explicit charset so the payload bytes do not depend on the JVM default.
                channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes(StandardCharsets.UTF_8));
            }
            return values;
        }
    }
}

