/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.tests.integration.containers.RabbitMQContainer;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;

public class RabbitMQSinkTester
extends SinkTester<RabbitMQContainer> {
    private final String exchangeName = "test-sink-exchange";
    private final String queueName = "test-sink-queue";
    private final String keyName = "test-key";

    public RabbitMQSinkTester(String networkAlias) {
        super(networkAlias, SinkTester.SinkType.RABBITMQ);
        this.sinkConfig.put("connectionName", "test-sink-connection");
        this.sinkConfig.put("host", networkAlias);
        this.sinkConfig.put("port", RabbitMQContainer.PORTS[0]);
        this.sinkConfig.put("queueName", "test-sink-queue");
        this.sinkConfig.put("exchangeName", "test-sink-exchange");
        this.sinkConfig.put("routingKey", "test-key");
    }

    @Override
    protected RabbitMQContainer createSinkService(PulsarCluster cluster) {
        return new RabbitMQContainer(cluster.getClusterName(), this.networkAlias);
    }

    @Override
    public void prepareSink() throws Exception {
    }

    static ConnectionFactory createConnectionFactory(RabbitMQContainer container) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(container.getHost());
        connectionFactory.setPort(container.getMappedPort(RabbitMQContainer.PORTS[0]).intValue());
        return connectionFactory;
    }

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        ConnectionFactory connectionFactory = RabbitMQSinkTester.createConnectionFactory((RabbitMQContainer)this.serviceContainer);
        try (Connection connection = connectionFactory.newConnection("rabbitmq-sink-tester");
             final Channel channel = connection.createChannel();){
            final LinkedBlockingQueue records = new LinkedBlockingQueue();
            channel.queueDeclare("test-sink-queue", true, false, false, null);
            channel.basicConsume("test-sink-queue", (Consumer)new DefaultConsumer(channel){

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    records.add(new Record(envelope.getRoutingKey(), body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
            for (String value : kvs.values()) {
                try {
                    Record record = (Record)records.take();
                    Assert.assertEquals((String)record.key, (String)"test-key");
                    Assert.assertEquals((String)new String(record.body), (String)value);
                }
                catch (InterruptedException e) {
                    break;
                }
            }
        }
        catch (IOException | TimeoutException e) {
            Assert.fail((String)"RabbitMQ Sink test failed", (Throwable)e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    private static class Record {
        private final String key;
        private final byte[] body;

        public Record(String key, byte[] body) {
            this.key = key;
            this.body = body;
        }

        public String getKey() {
            return this.key;
        }

        public byte[] getBody() {
            return this.body;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Record)) {
                return false;
            }
            Record other = (Record)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$key = this.getKey();
            String other$key = other.getKey();
            if (this$key == null ? other$key != null : !this$key.equals(other$key)) {
                return false;
            }
            return Arrays.equals(this.getBody(), other.getBody());
        }

        protected boolean canEqual(Object other) {
            return other instanceof Record;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $key = this.getKey();
            result = result * 59 + ($key == null ? 43 : $key.hashCode());
            result = result * 59 + Arrays.hashCode(this.getBody());
            return result;
        }

        public String toString() {
            return "RabbitMQSinkTester.Record(key=" + this.getKey() + ", body=" + Arrays.toString(this.getBody()) + ")";
        }
    }
}

