/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.cli;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PerfToolTest
extends TopicMessagingBase {
    private static final int MESSAGE_COUNT = 50;

    @Test
    public void testProduce() throws Exception {
        String serviceUrl = "pulsar://" + this.pulsarCluster.getProxy().getContainerName() + ":" + 6650;
        String topicName = this.getNonPartitionedTopic("testProduce", true);
        ZKContainer clientToolContainer = this.pulsarCluster.getZooKeeper();
        ContainerExecResult produceResult = this.produceWithPerfTool(clientToolContainer, serviceUrl, topicName, 50);
        PerfToolTest.checkOutputForLogs(produceResult, "PerformanceProducer - Aggregated throughput stats", "PerformanceProducer - Aggregated latency stats");
    }

    @Test
    public void testConsume() throws Exception {
        String serviceUrl = "pulsar://" + this.pulsarCluster.getProxy().getContainerName() + ":" + 6650;
        String topicName = this.getNonPartitionedTopic("testConsume", true);
        ZKContainer clientToolContainer = this.pulsarCluster.getZooKeeper();
        ContainerExecResult consumeResult = this.consumeWithPerfTool(clientToolContainer, serviceUrl, topicName);
        PerfToolTest.checkOutputForLogs(consumeResult, "PerformanceConsumer - Aggregated throughput stats", "PerformanceConsumer - Aggregated latency stats");
    }

    @Test
    public void testRead() throws Exception {
        String serviceUrl = "pulsar://" + this.pulsarCluster.getProxy().getContainerName() + ":" + 6650;
        String topicName = this.getNonPartitionedTopic("testRead", true);
        ZKContainer clientToolContainer = this.pulsarCluster.getZooKeeper();
        ContainerExecResult readResult = this.readWithPerfTool(clientToolContainer, serviceUrl, topicName);
        PerfToolTest.checkOutputForLogs(readResult, "PerformanceReader - Aggregated throughput stats ", "PerformanceReader - Aggregated latency stats");
    }

    private ContainerExecResult produceWithPerfTool(ChaosContainer<?> container, String url, String topic, int messageCount) throws Exception {
        ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(messageCount), topic);
        return PerfToolTest.failOnError("Performance producer", result);
    }

    private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
        CompletableFuture<ContainerExecResult> resultFuture = container.execCmdAsync("bin/pulsar-perf", "consume", "-u", url, "-m", String.valueOf(50), topic);
        this.produceWithPerfTool(container, url, topic, 50);
        ContainerExecResult result = resultFuture.get(5L, TimeUnit.SECONDS);
        return PerfToolTest.failOnError("Performance consumer", result);
    }

    private ContainerExecResult readWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
        CompletableFuture<ContainerExecResult> resultFuture = container.execCmdAsync("bin/pulsar-perf", "read", "-u", url, "-n", String.valueOf(50), topic);
        this.produceWithPerfTool(container, url, topic, 50);
        ContainerExecResult result = resultFuture.get(5L, TimeUnit.SECONDS);
        return PerfToolTest.failOnError("Performance consumer", result);
    }

    private static ContainerExecResult failOnError(String processDesc, ContainerExecResult result) {
        if (result.getExitCode() != 0L) {
            Assert.fail((String)(processDesc + " failed. Command output:\n" + result.getStdout() + "\nError output:\n" + result.getStderr()));
        }
        return result;
    }

    private static void checkOutputForLogs(ContainerExecResult result, String ... logs) {
        String output = result.getStdout();
        for (String log : logs) {
            Assert.assertTrue((boolean)output.contains(log), (String)("command output did not contain log message '" + log + "'.\nFull stdout is:\n" + output));
        }
    }
}

