/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.plugins;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestBrokerInterceptors
extends TopicMessagingBase {
    private static final String PREFIX = "PULSAR_PREFIX_";

    @Override
    public void setupCluster() throws Exception {
        this.brokerEnvs.put("PULSAR_PREFIX_disableBrokerInterceptors", "false");
        this.brokerEnvs.put("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/examples");
        this.brokerEnvs.put("PULSAR_PREFIX_brokerInterceptors", "loggingInterceptor");
        super.setupCluster();
    }

    @Override
    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
        specBuilder.numBrokers(1);
        return specBuilder;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ServiceUrls")
    public void test(Supplier<String> serviceUrlSupplier) throws Exception {
        String serviceUrl = serviceUrlSupplier.get();
        String topicName = this.getNonPartitionedTopic("interceptorTest-topic", true);
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        try {
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("producer").create();
            try {
                int messagesToSend = 20;
                for (int i = 0; i < messagesToSend; ++i) {
                    String messageValue = producer.getProducerName() + "-" + i;
                    MessageId messageId = producer.newMessage().value((Object)messageValue).send();
                    Assert.assertNotNull((Object)messageId);
                }
                try (Consumer<String> consumer = this.createConsumer(client, topicName);){
                    for (int i = 0; i < messagesToSend; ++i) {
                        consumer.receive(3, TimeUnit.SECONDS);
                    }
                }
                String log = this.pulsarCluster.getAnyBroker().execCmd("cat", "/var/log/pulsar/broker.log").getStdout();
                for (String line : new String[]{"initialize: OK", "onConnectionCreated", "producerCreated", "consumerCreated", "messageProduced", "beforeSendMessage: OK"}) {
                    Assert.assertTrue((boolean)log.contains("LoggingBrokerInterceptor - " + line), (String)("Log did not contain line '" + line + "'"));
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    private Consumer<String> createConsumer(PulsarClient client, String topicName) throws Exception {
        ConsumerBuilder builder = client.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName(TestBrokerInterceptors.randomName(8)).subscriptionType(SubscriptionType.Exclusive).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        return builder.subscribe();
    }
}

