/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.functions;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration tests for the state exposed by Pulsar functions, sources and sinks.
 *
 * <p>Each test deploys a component with {@code pulsar-admin} inside the test container, drives
 * traffic through it where applicable, verifies the state values reported for the component, and
 * finally deletes the component again.
 */
public class PulsarStateTest
extends PulsarStandaloneTestSuite {
    private static final Logger log = LoggerFactory.getLogger(PulsarStateTest.class);
    public static final String WORDCOUNT_PYTHON_CLASS = "wordcount_function.WordCountFunction";
    public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";

    /** Path of the admin CLI inside the container. */
    private static final String PULSAR_ADMIN = "/pulsar/bin/pulsar-admin";
    /** Tenant every component in this test is deployed to. */
    private static final String TENANT = "public";
    /** Namespace every component in this test is deployed to. */
    private static final String NAMESPACE = "default";
    /** Archive that contains the test source and sink connector classes. */
    private static final String TEST_FUNCTIONS_JAR = "/pulsar/examples/java-test-functions.jar";

    /**
     * Deploys the Python word-count function, sends {@code numMessages} messages of the form
     * {@code "hello test message-<i>"} through it, and checks the per-word counters in its state.
     */
    @Test(groups={"python_state", "state", "function", "python_function"})
    public void testPythonWordCountFunction() throws Exception {
        String inputTopicName = "test-wordcount-py-input-" + randomName(8);
        String outputTopicName = "test-wordcount-py-output-" + randomName(8);
        String functionName = "test-wordcount-py-fn-" + randomName(8);
        int numMessages = 10;

        submitExclamationFunction(CommandGenerator.Runtime.PYTHON, inputTopicName, outputTopicName, functionName);
        getFunctionInfoSuccess(functionName);

        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
        getFunctionStatus(functionName, numMessages);

        // "hello" and "test" appear in every message; each "message-<i>" token appears exactly once.
        queryState(functionName, "hello", numMessages);
        queryState(functionName, "test", numMessages);
        for (int i = 0; i < numMessages; i++) {
            queryState(functionName, "message-" + i, 1);
        }

        deleteFunction(functionName);
        getFunctionInfoNotFound(functionName);
    }

    /**
     * Deploys the test state source and verifies the values stored under the {@code "initial"} and
     * {@code "now"} state keys once the source has written at least one record.
     */
    @Test(groups={"java_state", "state", "function", "java_function"})
    public void testSourceState() throws Exception {
        String outputTopicName = "test-state-source-output-" + randomName(8);
        String sourceName = "test-state-source-" + randomName(8);

        submitSourceConnector(sourceName, outputTopicName,
                "org.apache.pulsar.tests.integration.io.TestStateSource", TEST_FUNCTIONS_JAR);
        getSourceInfoSuccess(sourceName);
        getSourceStatus(sourceName);

        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
            // Wait until the single source instance reports that it has written something.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SourceStatus status = admin.sources().getSourceStatus(TENANT, NAMESPACE, sourceName);
                Assert.assertEquals(status.getInstances().size(), 1);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
            });

            FunctionState initialState = admin.functions().getFunctionState(TENANT, NAMESPACE, sourceName, "initial");
            Assert.assertEquals(initialState.getStringValue(), "val1");

            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                FunctionState nowState = admin.functions().getFunctionState(TENANT, NAMESPACE, sourceName, "now");
                Assert.assertTrue(nowState.getStringValue().matches("val1-.*"));
            });
        }

        deleteSource(sourceName);
        getSourceInfoNotFound(sourceName);
    }

    /**
     * Deploys the test state sink, publishes {@code numMessages} messages to its input topic and
     * verifies the values stored under the {@code "initial"} and {@code "now"} state keys.
     */
    @Test(groups={"java_state", "state", "function", "java_function"})
    public void testSinkState() throws Exception {
        String inputTopicName = "test-state-sink-input-" + randomName(8);
        String sinkName = "test-state-sink-" + randomName(8);
        int numMessages = 10;

        submitSinkConnector(sinkName, inputTopicName,
                "org.apache.pulsar.tests.integration.io.TestStateSink", TEST_FUNCTIONS_JAR);
        getSinkInfoSuccess(sinkName);
        getSinkStatus(sinkName);

        // Resources are closed in reverse order: producer, then client, then admin.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();
             PulsarClient client = PulsarClient.builder().serviceUrl(container.getPlainTextServiceUrl()).build();
             Producer<String> producer = client.newProducer(Schema.STRING).topic(inputTopicName).create()) {

            FunctionState initialState = admin.functions().getFunctionState(TENANT, NAMESPACE, sinkName, "initial");
            Assert.assertEquals(initialState.getStringValue(), "val1");

            for (int i = 0; i < numMessages; i++) {
                producer.send("foo");
            }

            // Wait until the single sink instance reports that it has written something.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SinkStatus status = admin.sinks().getSinkStatus(TENANT, NAMESPACE, sinkName);
                Assert.assertEquals(status.getInstances().size(), 1);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0);
            });

            // The "now" value is expected to carry the zero-based index of the last message.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                FunctionState nowState = admin.functions().getFunctionState(TENANT, NAMESPACE, sinkName, "now");
                Assert.assertEquals(nowState.getStringValue(), String.format("val1-%d", numMessages - 1));
            });
        }

        deleteSink(sinkName);
        getSinkInfoNotFound(sinkName);
    }

    /**
     * Creates a source connector through {@code pulsar-admin sources create} and asserts that the
     * CLI reported success.
     *
     * @param sourceName      name of the source to create
     * @param outputTopicName topic the source writes to
     * @param className       fully qualified connector class
     * @param archive         path of the archive containing {@code className}
     */
    private void submitSourceConnector(String sourceName, String outputTopicName, String className, String archive) throws Exception {
        String[] commands = {
            PULSAR_ADMIN, "sources", "create",
            "--name", sourceName,
            "--destinationTopicName", outputTopicName,
            "--archive", archive,
            "--classname", className
        };
        log.info("Run command : {}", StringUtils.join(commands, ' '));
        ContainerExecResult result = container.execCmd(commands);
        Assert.assertTrue(result.getStdout().contains("Created successfully"), result.getStdout());
    }

    /**
     * Creates a sink connector through {@code pulsar-admin sinks create} and asserts that the CLI
     * reported success.
     *
     * @param sinkName       name of the sink to create
     * @param inputTopicName topic the sink reads from
     * @param className      fully qualified connector class
     * @param archive        path of the archive containing {@code className}
     */
    private void submitSinkConnector(String sinkName, String inputTopicName, String className, String archive) throws Exception {
        String[] commands = {
            PULSAR_ADMIN, "sinks", "create",
            "--name", sinkName,
            "--inputs", inputTopicName,
            "--archive", archive,
            "--classname", className
        };
        log.info("Run command : {}", StringUtils.join(commands, ' '));
        ContainerExecResult result = container.execCmd(commands);
        Assert.assertTrue(result.getStdout().contains("Created successfully"), result.getStdout());
    }

    /**
     * Submits the function class selected by {@link #getExclamationClass} for {@code runtime},
     * using a {@code byte[]} input schema.
     */
    private void submitExclamationFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName) throws Exception {
        submitFunction(runtime, inputTopicName, outputTopicName, functionName, getExclamationClass(runtime), Schema.BYTES);
    }

    /**
     * Returns the function class to deploy for the given runtime.
     *
     * <p>Despite the method name, the class returned is the Python word-count function; no other
     * runtime is supported here.
     *
     * @throws IllegalArgumentException if {@code runtime} is not {@code PYTHON}
     */
    protected static String getExclamationClass(CommandGenerator.Runtime runtime) {
        if (CommandGenerator.Runtime.PYTHON == runtime) {
            return WORDCOUNT_PYTHON_CLASS;
        }
        throw new IllegalArgumentException("Unsupported runtime : " + runtime);
    }

    /**
     * Generates and runs the create-function command for the given runtime, asserts that creation
     * succeeded, and then makes sure the function's input subscription exists.
     *
     * @param runtime          runtime to deploy with; {@code JAVA} or {@code PYTHON}
     * @param inputTopicName   topic the function consumes from
     * @param outputTopicName  topic the function publishes to
     * @param functionName     name of the function to create
     * @param functionClass    class name passed to the command generator
     * @param inputTopicSchema schema used when creating the input subscription
     * @throws IllegalArgumentException if {@code runtime} is neither {@code JAVA} nor {@code PYTHON}
     */
    private <T> void submitFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, String functionClass, Schema<T> inputTopicSchema) throws Exception {
        CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
        generator.setSinkTopic(outputTopicName);
        generator.setFunctionName(functionName);

        String command;
        if (CommandGenerator.Runtime.JAVA == runtime) {
            command = generator.generateCreateFunctionCommand();
        } else if (CommandGenerator.Runtime.PYTHON == runtime) {
            generator.setRuntime(runtime);
            command = generator.generateCreateFunctionCommand(WORDCOUNT_PYTHON_FILE);
        } else {
            throw new IllegalArgumentException("Unsupported runtime : " + runtime);
        }

        // The generated command is a single shell line, so it is run through "sh -c".
        ContainerExecResult result = container.execCmd("sh", "-c", command);
        Assert.assertTrue(result.getStdout().contains("Created successfully"), result.getStdout());

        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
    }

    /**
     * Subscribes to {@code inputTopicName} with a shared subscription named
     * {@code subscriptionName} and closes the consumer again straight away, so that the
     * subscription exists before any message is produced.
     */
    private <T> void ensureSubscriptionCreated(String inputTopicName, String subscriptionName, Schema<T> inputTopicSchema) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(container.getPlainTextServiceUrl()).build();
             Consumer<T> ignored = client.newConsumer(inputTopicSchema)
                     .topic(inputTopicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {
            // Nothing to do: subscribing is the only purpose of this consumer.
        }
    }

    /** Asserts that {@code pulsar-admin sinks get} returns a config naming {@code sinkName}. */
    private void getSinkInfoSuccess(String sinkName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sinks", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sinkName);
        Assert.assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
    }

    /** Asserts that {@code pulsar-admin sources get} returns a config naming {@code sourceName}. */
    private void getSourceInfoSuccess(String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        Assert.assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
    }

    /** Asserts that {@code pulsar-admin functions get} returns a config naming {@code functionName}. */
    private void getFunctionInfoSuccess(String functionName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "functions", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", functionName);
        Assert.assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
    }

    /**
     * Asserts that {@code pulsar-admin functions get} fails for {@code functionName} and that the
     * error output says the function does not exist.
     */
    private void getFunctionInfoNotFound(String functionName) throws Exception {
        try {
            container.execCmd(
                    PULSAR_ADMIN, "functions", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", functionName);
            Assert.fail("Command should have exited with non-zero");
        } catch (ContainerExecException e) {
            Assert.assertTrue(e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist"));
        }
    }

    /** Asserts that {@code pulsar-admin sinks status} reports {@code sinkName} as running. */
    private void getSinkStatus(String sinkName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sinks", "status", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sinkName);
        Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
    }

    /** Asserts that {@code pulsar-admin sources status} reports {@code sourceName} as running. */
    private void getSourceStatus(String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "status", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
    }

    /**
     * Asserts that {@code pulsar-admin functions getstatus} reports the function as running and
     * that its output contains {@code "numSuccessfullyProcessed" : <numMessages>}.
     */
    private void getFunctionStatus(String functionName, int numMessages) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "functions", "getstatus", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", functionName);
        String stdout = result.getStdout();
        Assert.assertTrue(stdout.contains("\"running\" : true"));
        Assert.assertTrue(stdout.contains("\"numSuccessfullyProcessed\" : " + numMessages));
    }

    /**
     * Queries the function's state for {@code key} through {@code pulsar-admin functions querystate}
     * and asserts that the output contains {@code "numberValue": <amount>}.
     */
    private void queryState(String functionName, String key, int amount) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "functions", "querystate", "--tenant", TENANT, "--namespace", NAMESPACE,
                "--name", functionName, "--key", key);
        Assert.assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
    }

    /**
     * Publishes {@code numMessages} messages ({@code "hello test message-<i>"}) to
     * {@code inputTopic} and asserts that {@code outputTopic} delivers, in order, the same payloads
     * with a trailing {@code "!"}.
     *
     * <p>The consumer is subscribed before anything is produced. Resources are closed in reverse
     * order: producer, consumer, then client.
     */
    private void publishAndConsumeMessages(String inputTopic, String outputTopic, int numMessages) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(container.getPlainTextServiceUrl()).build();
             Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                     .topic(outputTopic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
             Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(inputTopic).create()) {

            for (int i = 0; i < numMessages; i++) {
                producer.send(("hello test message-" + i).getBytes(StandardCharsets.UTF_8));
            }

            for (int i = 0; i < numMessages; i++) {
                Message<byte[]> msg = consumer.receive();
                // TestNG's argument order is (actual, expected).
                Assert.assertEquals(new String(msg.getValue(), StandardCharsets.UTF_8), "hello test message-" + i + "!");
            }
        }
    }

    /** Deletes the function via {@code pulsar-admin} and asserts success with empty stderr. */
    private void deleteFunction(String functionName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "functions", "delete", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", functionName);
        Assert.assertTrue(result.getStdout().contains("Deleted successfully"));
        result.assertNoStderr();
    }

    /** Deletes the source via {@code pulsar-admin} and asserts success with empty stderr. */
    private void deleteSource(String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "delete", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        // The sources CLI words its confirmation differently from functions and sinks.
        Assert.assertTrue(result.getStdout().contains("Delete source successfully"));
        result.assertNoStderr();
    }

    /** Deletes the sink via {@code pulsar-admin} and asserts success with empty stderr. */
    private void deleteSink(String sinkName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sinks", "delete", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sinkName);
        Assert.assertTrue(result.getStdout().contains("Deleted successfully"));
        result.assertNoStderr();
    }

    /**
     * Asserts that {@code pulsar-admin sources get} fails for {@code sourceName} and that the error
     * output says the source does not exist.
     */
    private void getSourceInfoNotFound(String sourceName) throws Exception {
        try {
            container.execCmd(
                    PULSAR_ADMIN, "sources", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
            Assert.fail("Command should have exited with non-zero");
        } catch (ContainerExecException e) {
            Assert.assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
        }
    }

    /**
     * Asserts that {@code pulsar-admin sinks get} fails for {@code sinkName} and that the error
     * output says the sink does not exist.
     */
    private void getSinkInfoNotFound(String sinkName) throws Exception {
        try {
            container.execCmd(
                    PULSAR_ADMIN, "sinks", "get", "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sinkName);
            Assert.fail("Command should have exited with non-zero");
        } catch (ContainerExecException e) {
            Assert.assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
        }
    }
}

