/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.examples.pojo.Users;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testng.Assert;
import org.testng.annotations.Test;

public class SinkWithTransformFunctionTest
extends PulsarStandaloneTestSuite {
    private static final Logger log = LoggerFactory.getLogger(SinkWithTransformFunctionTest.class);

    @Override
    public void setUpCluster() throws Exception {
        this.incrementSetupNumber();
        this.network = Network.newNetwork();
        String clusterName = PulsarClusterTestBase.randomName(8);
        this.container = (StandaloneContainer)((StandaloneContainer)((StandaloneContainer)new StandaloneContainer(clusterName, PulsarContainer.DEFAULT_IMAGE_NAME).withNetwork(this.network)).withNetworkAliases(new String[]{"standalone-" + clusterName})).withEnv("PF_stateStorageServiceUrl", "bk://localhost:4181");
        this.container.start();
        log.info("Pulsar cluster {} is up running:", (Object)clusterName);
        log.info("\tBinary Service Url : {}", (Object)this.container.getPlainTextServiceUrl());
        log.info("\tHttp Service Url : {}", (Object)this.container.getHttpServiceUrl());
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default");
        Assert.assertEquals((long)0L, (long)result.getExitCode());
        log.info("public/default namespace policies are {}", (Object)result.getStdout());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"sink"})
    public void testSinkWithTransformFunction() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
        try {
            int numRecords = 10;
            String sinkName = "sink-with-function";
            String topicName = "sink-with-function";
            String logTopicName = "log-sink-with-function";
            String packageName = "function://public/default/sink-with-function-function@1.0";
            this.submitPackage(packageName, "package-function", "/pulsar/examples/java-test-functions.jar");
            this.submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestLoggingSink", "/pulsar/examples/java-test-functions.jar", "{\"log-topic\": \"" + logTopicName + "\"}", packageName, "org.apache.pulsar.functions.api.examples.RecordFunction");
            this.getSinkInfoSuccess(sinkName);
            this.getSinkStatus(sinkName);
            Producer producer = client.newProducer(Schema.STRING).topic(topicName).create();
            try {
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{logTopicName}).subscriptionName("sub").subscribe();
                try {
                    int i;
                    for (i = 0; i < 10; ++i) {
                        producer.send((Object)(i + "-test"));
                    }
                    try {
                        log.info("waiting for sink {}", (Object)sinkName);
                        for (i = 0; i < 10; ++i) {
                            Message receive = consumer.receive(5, TimeUnit.SECONDS);
                            Assert.assertNotNull((Object)receive);
                            Assert.assertEquals((String)((String)receive.getValue()), (String)("STRING - " + i + "-test!"));
                        }
                    }
                    finally {
                        this.dumpFunctionLogs(sinkName);
                    }
                    this.deleteSink(sinkName);
                    this.getSinkInfoNotFound(sinkName);
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"sink"})
    public void testGenericObjectSinkWithTransformFunction() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
        try {
            int numRecords = 10;
            String sinkName = "sink-with-genericobject-function";
            String topicName = "sink-with-genericobject-function";
            String logTopicName = "log-sink-with-genericobject-function";
            String packageName = "function://public/default/sink-with-genericobject-function-function@1.0";
            this.submitPackage(packageName, "package-function", "/pulsar/examples/java-test-functions.jar");
            this.submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestLoggingSink", "/pulsar/examples/java-test-functions.jar", "{\"log-topic\": \"" + logTopicName + "\"}", packageName, "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction");
            this.getSinkInfoSuccess(sinkName);
            this.getSinkStatus(sinkName);
            try {
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{logTopicName}).subscriptionName("sub").subscribe();
                try {
                    Producer producer1 = client.newProducer(Schema.AVRO(Users.UserV1.class)).topic(topicName).create();
                    try {
                        int i;
                        for (i = 0; i < 10; ++i) {
                            producer1.send((Object)new Users.UserV1("foo" + i, Integer.valueOf(i)));
                        }
                        for (i = 0; i < 10; ++i) {
                            Message receive = consumer.receive(5, TimeUnit.SECONDS);
                            Assert.assertNotNull((Object)receive);
                            Assert.assertEquals((String)((String)receive.getValue()), (String)("AVRO - {\"name\": \"foo" + i + "\"}"));
                        }
                        Producer producer2 = client.newProducer(Schema.AVRO(Users.UserV2.class)).topic(topicName).create();
                        try {
                            Message receive;
                            int i2;
                            for (i2 = 0; i2 < 10; ++i2) {
                                producer2.send((Object)new Users.UserV2("foo" + i2, Integer.valueOf(i2), "bar" + i2));
                            }
                            for (i2 = 0; i2 < 10; ++i2) {
                                receive = consumer.receive(5, TimeUnit.SECONDS);
                                Assert.assertNotNull((Object)receive);
                                Assert.assertEquals((String)((String)receive.getValue()), (String)("AVRO - {\"name\": \"foo" + i2 + "\", \"phone\": \"bar" + i2 + "\"}"));
                            }
                            for (i2 = 0; i2 < 10; ++i2) {
                                producer1.send((Object)new Users.UserV1("foo" + i2, Integer.valueOf(i2)));
                            }
                            for (i2 = 0; i2 < 10; ++i2) {
                                receive = consumer.receive(5, TimeUnit.SECONDS);
                                Assert.assertNotNull((Object)receive);
                                Assert.assertEquals((String)((String)receive.getValue()), (String)("AVRO - {\"name\": \"foo" + i2 + "\"}"));
                            }
                        }
                        finally {
                            if (Collections.singletonList(producer2).get(0) != null) {
                                producer2.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(producer1).get(0) != null) {
                            producer1.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                this.dumpFunctionLogs(sinkName);
            }
            this.deleteSink(sinkName);
            this.getSinkInfoNotFound(sinkName);
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"sink"})
    public void testKeyValueSinkWithTransformFunction() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
        try {
            int numRecords = 10;
            String sinkName = "sink-with-kv-function";
            String topicName = "sink-with-kv-function";
            String logTopicName = "log-sink-with-kv-function";
            String packageName = "function://public/default/sink-with-kv-function-function@1.0";
            this.submitPackage(packageName, "package-function", "/pulsar/examples/java-test-functions.jar");
            this.submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestLoggingSink", "/pulsar/examples/java-test-functions.jar", "{\"log-topic\": \"" + logTopicName + "\"}", packageName, "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction");
            this.getSinkInfoSuccess(sinkName);
            this.getSinkStatus(sinkName);
            try {
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{logTopicName}).subscriptionName("sub").subscribe();
                try {
                    Producer producer = client.newProducer(Schema.KeyValue((Schema)Schema.AVRO(Users.UserV1.class), (Schema)Schema.AVRO(Users.UserV1.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED)).topic(topicName).create();
                    try {
                        int i;
                        for (i = 0; i < 10; ++i) {
                            producer.send((Object)new KeyValue((Object)new Users.UserV1("foo" + i, Integer.valueOf(i)), (Object)new Users.UserV1("bar" + i, Integer.valueOf(i + 100))));
                        }
                        for (i = 0; i < 10; ++i) {
                            Message receive = consumer.receive(5, TimeUnit.SECONDS);
                            Assert.assertNotNull((Object)receive);
                            Assert.assertEquals((String)((String)receive.getValue()), (String)("KEY_VALUE - (key = {\"age\": " + i + ", \"name\": \"foo" + i + "\"}, value = {\"name\": \"bar" + i + "\"})"));
                        }
                    }
                    finally {
                        if (Collections.singletonList(producer).get(0) != null) {
                            producer.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                this.dumpFunctionLogs(sinkName);
            }
            this.deleteSink(sinkName);
            this.getSinkInfoNotFound(sinkName);
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    private void submitPackage(String packageName, String description, String packagePath) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "packages", "upload", packageName, "--description", description, "--path", packagePath};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.container.execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("successfully"), (String)result.getStdout());
    }

    private void submitSinkConnector(String sinkName, String inputTopicName, String className, String archive, String configs, String transformFunction, String transformFunctionClassName) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "sinks", "create", "--name", sinkName, "-i", inputTopicName, "--archive", archive, "--classname", className, "--sink-config", configs, "--transform-function", transformFunction, "--transform-function-classname", transformFunctionClassName};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.container.execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("Created successfully"), (String)result.getStdout());
    }

    private void getSinkInfoSuccess(String sinkName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sinks", "get", "--tenant", "public", "--namespace", "default", "--name", sinkName);
        Assert.assertTrue((boolean)result.getStdout().contains("\"name\": \"" + sinkName + "\""));
    }

    private void getSinkStatus(String sinkName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sinks", "status", "--tenant", "public", "--namespace", "default", "--name", sinkName);
        log.info(result.getStdout());
        log.info(result.getStderr());
        Assert.assertTrue((boolean)result.getStdout().contains("\"running\" : true"));
    }

    private void deleteSink(String sinkName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sinks", "delete", "--tenant", "public", "--namespace", "default", "--name", sinkName);
        Assert.assertTrue((boolean)result.getStdout().contains("successfully"));
        result.assertNoStderr();
    }

    private void getSinkInfoNotFound(String sinkName) throws Exception {
        try {
            this.container.execCmd("/pulsar/bin/pulsar-admin", "sinks", "get", "--tenant", "public", "--namespace", "default", "--name", sinkName);
            Assert.fail((String)"Command should have exited with non-zero");
        }
        catch (ContainerExecException e) {
            Assert.assertTrue((boolean)e.getResult().getStderr().contains(sinkName + " doesn't exist"));
        }
    }
}

