/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.functions;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.pojo.Users;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public abstract class PulsarFunctionsTest
extends PulsarFunctionsTestBase {
    // SLF4J logger used by the test methods of this class.
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionsTest.class);
    // Jackson mapper owned by each test instance.
    // NOTE(review): not referenced in the portion of the class reviewed here — presumably used by later tests; confirm.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Creates the test for the given function runtime type.
     *
     * @param functionRuntimeType runtime type forwarded unchanged to the {@code PulsarFunctionsTestBase} constructor
     */
    public PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
        super(functionRuntimeType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Publishes {@code numMessages} keyed string messages to the given topic.
     *
     * <p>Message {@code i} is sent with key {@code "key-" + i} and value {@code "value-" + i}.
     * A dedicated client and producer are created for the call and closed before it returns
     * (producer first, then client), including when sending fails.
     *
     * @param inputTopicName topic to publish to
     * @param numMessages number of messages to send; zero or a negative value sends nothing
     * @return the key/value pairs that were sent, in send order
     * @throws Exception if the client or producer cannot be created, or a send fails
     */
    protected Map<String, String> produceMessagesToInputTopic(String inputTopicName, int numMessages) throws Exception {
        // try-with-resources closes in reverse declaration order and keeps the primary
        // exception if close() itself fails (the close failure is attached as suppressed).
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                     .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(inputTopicName)
                     .create()) {
            // LinkedHashMap so callers can rely on the send order when iterating.
            Map<String, String> kvs = new LinkedHashMap<>();
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                String value = "value-" + i;
                kvs.put(key, value);
                producer.newMessage().key(key).value(value).send();
            }
            return kvs;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the exclamation function in local-run mode on a worker container and verifies that
     * every message published to the input topic shows up on the output topic with a trailing
     * {@code "!"}.
     *
     * <p>The test is a no-op when the function runtime type is {@code THREAD}. For the Java
     * runtime the topics are accessed with {@code Schema.STRING}; for every other runtime they
     * are accessed with {@code Schema.BYTES} and a batching producer.
     *
     * @param runtime language runtime of the function to launch
     * @throws Exception if topic setup, command execution or message exchange fails
     */
    public void testFunctionLocalRun(CommandGenerator.Runtime runtime) throws Exception {
        if (this.functionRuntimeType == FunctionRuntimeType.THREAD) {
            return;
        }
        final String inputTopicName = "persistent://public/default/test-function-local-run-" + runtime + "-input-" + PulsarFunctionsTest.randomName(8);
        final String outputTopicName = "test-function-local-run-" + runtime + "-output-" + PulsarFunctionsTest.randomName(8);
        final int numMessages = 10;

        CommandGenerator commandGenerator = new CommandGenerator();
        commandGenerator.setAdminUrl("pulsar://pulsar-broker-0:6650");
        commandGenerator.setSourceTopic(inputTopicName);
        commandGenerator.setSinkTopic(outputTopicName);
        commandGenerator.setFunctionName("localRunTest-" + PulsarFunctionsTest.randomName(8));
        commandGenerator.setRuntime(runtime);

        String cmd = "";
        switch (runtime) {
            case JAVA: {
                commandGenerator.setFunctionClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
                cmd = commandGenerator.generateLocalRunCommand(null);
                break;
            }
            case PYTHON: {
                commandGenerator.setFunctionClassName("exclamation_function.ExclamationFunction");
                cmd = commandGenerator.generateLocalRunCommand("exclamation_function.py");
                break;
            }
            case GO: {
                commandGenerator.setFunctionClassName(null);
                cmd = commandGenerator.generateLocalRunCommand("exclamationFunc");
                break;
            }
            default: {
                // Any other runtime leaves the command empty, exactly as before.
                break;
            }
        }
        log.info("cmd: {}", cmd);
        // Fire-and-forget: the local-run process keeps running while the test drives traffic.
        this.pulsarCluster.getAnyWorker().execCmdAsync(cmd.split(" "));

        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);

            // Wait until the function has subscribed to the input topic before publishing.
            PulsarFunctionsTest.retryStrategically(test -> {
                try {
                    return admin.topics().getStats(inputTopicName).getSubscriptions().size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 30, 200L);
            Assert.assertEquals(admin.topics().getStats(inputTopicName).getSubscriptions().size(), 1);

            HashSet<String> expectedMessages = new HashSet<>();
            for (int i = 0; i < numMessages; ++i) {
                expectedMessages.add("message-" + i + "!");
            }

            if (CommandGenerator.Runtime.JAVA == runtime) {
                try (PulsarClient client = PulsarClient.builder()
                             .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                             .build();
                     Consumer<String> consumer = client.newConsumer(Schema.STRING)
                             .topic(outputTopicName)
                             .subscriptionType(SubscriptionType.Exclusive)
                             .subscriptionName("test-sub")
                             .subscribe();
                     Producer<String> producer = client.newProducer(Schema.STRING)
                             .topic(inputTopicName)
                             .create()) {
                    for (int i = 0; i < numMessages; ++i) {
                        producer.send("message-" + i);
                    }
                    for (int i = 0; i < numMessages; ++i) {
                        Message<String> msg = consumer.receive(120, TimeUnit.SECONDS);
                        // receive(timeout) returns null on timeout; fail with a readable message instead of an NPE.
                        Assert.assertNotNull(msg, "Timed out waiting for output message " + i + " on " + outputTopicName);
                        String msgValue = msg.getValue();
                        log.info("Received: {}", msgValue);
                        Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
                    }
                    Assert.assertEquals(expectedMessages.size(), 0);
                }
            } else {
                try (PulsarClient client = PulsarClient.builder()
                             .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                             .build();
                     Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                             .topic(outputTopicName)
                             .subscriptionType(SubscriptionType.Exclusive)
                             .subscriptionName("test-sub")
                             .subscribe();
                     Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                             .topic(inputTopicName)
                             .enableBatching(true)
                             .batcherBuilder(BatcherBuilder.DEFAULT)
                             .create()) {
                    for (int i = 0; i < numMessages; ++i) {
                        producer.newMessage().value(("message-" + i).getBytes(StandardCharsets.UTF_8)).send();
                    }
                    for (int i = 0; i < numMessages; ++i) {
                        Message<byte[]> msg = consumer.receive(120, TimeUnit.SECONDS);
                        // receive(timeout) returns null on timeout; fail with a readable message instead of an NPE.
                        Assert.assertNotNull(msg, "Timed out waiting for output message " + i + " on " + outputTopicName);
                        String msgValue = new String(msg.getValue(), StandardCharsets.UTF_8);
                        log.info("Received: {}", msgValue);
                        Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
                    }
                    Assert.assertEquals(expectedMessages.size(), 0);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deploys {@code WindowDurationFunction} with a count-based window and checks the results it
     * writes to the output topic against {@code expectedResults}.
     *
     * <p>100 messages containing the decimal numbers 0..99 are published unbatched. Each output
     * message is split on {@code ':'} and its first part must contain the expected result at the
     * same position. Reading stops at the first 30-second read timeout. One missing trailing
     * result is tolerated; any result beyond {@code expectedResults.length} fails the test.
     *
     * @param type window flavour used in generated names; {@code "sliding"} additionally
     *             configures a sliding interval of 5
     * @param expectedResults expected leading part of each output message, in order
     * @throws Exception if deployment, status decoding or message exchange fails
     */
    public void testWindowFunction(String type, String[] expectedResults) throws Exception {
        final int numMessages = 100;
        final int windowLengthCount = 10;
        final int slidingIntervalCount = 5;
        String functionName = "test-" + type + "-window-fn-" + PulsarFunctionsTest.randomName(8);
        String inputTopicName = "test-" + type + "-count-window-" + this.functionRuntimeType + "-input-" + PulsarFunctionsTest.randomName(8);
        String outputTopicName = "test-" + type + "-count-window-" + this.functionRuntimeType + "-output-" + PulsarFunctionsTest.randomName(8);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);
        }

        CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName, "org.apache.pulsar.functions.api.examples.WindowDurationFunction");
        generator.setFunctionName(functionName);
        generator.setSinkTopic(outputTopicName);
        generator.setWindowLengthCount(windowLengthCount);
        if (type.equals("sliding")) {
            generator.setSlidingIntervalCount(slidingIntervalCount);
        }
        String[] commands = new String[]{"sh", "-c", generator.generateCreateFunctionCommand()};
        ContainerExecResult containerExecResult = this.pulsarCluster.getAnyWorker().execCmd(commands);
        Assert.assertTrue(containerExecResult.getStdout().contains("\"Created successfully\""));
        this.getFunctionInfoSuccess(functionName);

        // A freshly created function must report one idle, healthy instance.
        containerExecResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "status", "--tenant", "public", "--namespace", "default", "--name", functionName);
        FunctionStatus functionStatus = FunctionStatusUtil.decode(containerExecResult.getStdout());
        Assert.assertEquals(functionStatus.getNumInstances(), 1);
        Assert.assertEquals(functionStatus.getNumRunning(), 1);
        Assert.assertEquals(functionStatus.getInstances().size(), 1);
        FunctionStatus.FunctionInstanceStatus instance = (FunctionStatus.FunctionInstanceStatus) functionStatus.getInstances().get(0);
        Assert.assertEquals(instance.getInstanceId(), 0);
        Assert.assertEquals(instance.getStatus().isRunning(), true);
        Assert.assertEquals(instance.getStatus().getNumReceived(), 0L);
        Assert.assertEquals(instance.getStatus().getNumSuccessfullyProcessed(), 0L);
        Assert.assertEquals(instance.getStatus().getLatestUserExceptions().size(), 0);
        Assert.assertEquals(instance.getStatus().getLatestSystemExceptions().size(), 0);

        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                     .build();
             Reader<byte[]> reader = client.newReader()
                     .startMessageId(MessageId.earliest)
                     .topic(outputTopicName)
                     .create();
             Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                     .topic(inputTopicName)
                     .enableBatching(false)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.send(String.format("%d", i).getBytes(StandardCharsets.UTF_8));
            }

            int received = 0;
            while (true) {
                Message<byte[]> msg = reader.readNext(30, TimeUnit.SECONDS);
                if (msg == null) {
                    // Read timed out: the function has no further output.
                    break;
                }
                // Guard before indexing so a surplus result fails with the intended message
                // rather than an ArrayIndexOutOfBoundsException.
                if (received >= expectedResults.length) {
                    Assertions.fail("More results than expected");
                }
                String msgStr = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("[testWindowFunction] i: {} RECV: {}", received, msgStr);
                String result = msgStr.split(":")[0];
                Assertions.assertThat(result).contains(expectedResults[received]);
                ++received;
            }

            this.getFunctionStatus(functionName, numMessages, true);
            // Tolerate one missing trailing result.
            Assertions.assertThat(received).isGreaterThanOrEqualTo(expectedResults.length - 1);
            this.deleteFunction(functionName);
            this.getFunctionInfoNotFound(functionName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deploys an exception-throwing function and verifies both the messages it emits and the
     * counters it reports through {@code pulsar-admin functions status} and {@code stats}.
     *
     * <p>The test is a no-op when the function runtime type is {@code THREAD}. 20 messages are
     * published and 20 outputs of the form {@code "message-<i>!"} are expected. Afterwards the
     * function must report 22 received, 20 successfully processed, 2 user exceptions, no system
     * exceptions and no restarts. Finally the function is deleted and subscription cleanup on the
     * input topic is checked.
     *
     * @param runtime language runtime of the function; {@code PYTHON} deploys
     *                {@code exception_function.py}, any other value deploys the Java
     *                {@code ExceptionFunction}
     * @throws Exception if deployment, message exchange or status decoding fails
     */
    protected void testFunctionNegAck(CommandGenerator.Runtime runtime) throws Exception {
        if (this.functionRuntimeType == FunctionRuntimeType.THREAD) {
            return;
        }
        Schema<?> schema = CommandGenerator.Runtime.JAVA == runtime ? Schema.STRING : Schema.BYTES;
        String inputTopicName = "persistent://public/default/test-neg-ack-" + runtime + "-input-" + PulsarFunctionsTest.randomName(8);
        String outputTopicName = "test-neg-ack-" + runtime + "-output-" + PulsarFunctionsTest.randomName(8);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);
        }
        String functionName = "test-neg-ack-fn-" + PulsarFunctionsTest.randomName(8);
        final int numMessages = 20;
        // NOTE(review): counters are fixed expectations; presumably two deliveries fail inside the
        // function and are redelivered, hence 22 received for 20 processed — confirm against the function.
        final long expectedReceived = 22L;
        final long expectedProcessed = 20L;
        final long expectedUserExceptions = 2L;

        if (runtime == CommandGenerator.Runtime.PYTHON) {
            this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, "exception_function.py", "exception_function", schema);
        } else {
            this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, null, "org.apache.pulsar.tests.integration.functions.ExceptionFunction", schema);
        }
        this.getFunctionInfoSuccess(functionName);
        this.getFunctionStatsEmpty(functionName);

        HashSet<String> expectedMessages = new HashSet<>();
        for (int i = 0; i < numMessages; ++i) {
            expectedMessages.add("message-" + i + "!");
        }

        if (CommandGenerator.Runtime.JAVA == runtime) {
            try (PulsarClient client = PulsarClient.builder()
                         .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                         .build();
                 Consumer<String> consumer = client.newConsumer(Schema.STRING)
                         .topic(outputTopicName)
                         .subscriptionType(SubscriptionType.Exclusive)
                         .subscriptionName("test-sub")
                         .subscribe();
                 Producer<String> producer = client.newProducer(Schema.STRING)
                         .topic(inputTopicName)
                         .create()) {
                for (int i = 0; i < numMessages; ++i) {
                    producer.send("message-" + i);
                }
                for (int i = 0; i < numMessages; ++i) {
                    Message<String> msg = consumer.receive(120, TimeUnit.SECONDS);
                    // receive(timeout) returns null on timeout; fail with a readable message instead of an NPE.
                    Assert.assertNotNull(msg, "Timed out waiting for output message " + i + " on " + outputTopicName);
                    String msgValue = msg.getValue();
                    log.info("Received: {}", msgValue);
                    Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
                }
                Assert.assertEquals(expectedMessages.size(), 0);
            }
        } else {
            try (PulsarClient client = PulsarClient.builder()
                         .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                         .build();
                 Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                         .topic(outputTopicName)
                         .subscriptionType(SubscriptionType.Exclusive)
                         .subscriptionName("test-sub")
                         .subscribe();
                 Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                         .topic(inputTopicName)
                         .create()) {
                for (int i = 0; i < numMessages; ++i) {
                    producer.newMessage().value(("message-" + i).getBytes(StandardCharsets.UTF_8)).send();
                }
                for (int i = 0; i < numMessages; ++i) {
                    Message<byte[]> msg = consumer.receive(120, TimeUnit.SECONDS);
                    // receive(timeout) returns null on timeout; fail with a readable message instead of an NPE.
                    Assert.assertNotNull(msg, "Timed out waiting for output message " + i + " on " + outputTopicName);
                    String msgValue = new String(msg.getValue(), StandardCharsets.UTF_8);
                    log.info("Received: {}", msgValue);
                    Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
                }
                Assert.assertEquals(expectedMessages.size(), 0);
            }
        }

        // Per-function status as reported by the admin CLI.
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "status", "--tenant", "public", "--namespace", "default", "--name", functionName);
        FunctionStatus functionStatus = FunctionStatusUtil.decode(result.getStdout());
        Assert.assertEquals(functionStatus.getNumInstances(), 1);
        Assert.assertEquals(functionStatus.getNumRunning(), 1);
        Assert.assertEquals(functionStatus.getInstances().size(), 1);
        FunctionStatus.FunctionInstanceStatus instance = (FunctionStatus.FunctionInstanceStatus) functionStatus.getInstances().get(0);
        Assert.assertEquals(instance.getInstanceId(), 0);
        Assert.assertTrue(instance.getStatus().getAverageLatency() > 0.0);
        Assert.assertEquals(instance.getStatus().isRunning(), true);
        Assert.assertTrue(instance.getStatus().getLastInvocationTime() > 0L);
        Assert.assertEquals(instance.getStatus().getNumReceived(), expectedReceived);
        Assert.assertEquals(instance.getStatus().getNumSuccessfullyProcessed(), expectedProcessed);
        Assert.assertEquals(instance.getStatus().getNumRestarts(), 0L);
        Assert.assertEquals(instance.getStatus().getLatestUserExceptions().size(), 2);
        Assert.assertEquals(instance.getStatus().getLatestSystemExceptions().size(), 0);

        // Aggregated and per-instance stats as reported by the admin CLI.
        result = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "stats", "--tenant", "public", "--namespace", "default", "--name", functionName);
        log.info("FUNCTION STATS: {}", result.getStdout());
        FunctionStatsImpl functionStats = FunctionStatsImpl.decode(result.getStdout());
        Assert.assertEquals(functionStats.getReceivedTotal(), expectedReceived);
        Assert.assertEquals(functionStats.getProcessedSuccessfullyTotal(), expectedProcessed);
        Assert.assertEquals(functionStats.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.getUserExceptionsTotal(), expectedUserExceptions);
        Assert.assertTrue(functionStats.avgProcessLatency > 0.0);
        Assert.assertTrue(functionStats.getLastInvocation() > 0L);
        Assert.assertEquals(functionStats.instances.size(), 1);
        FunctionInstanceStatsImpl instanceStats = (FunctionInstanceStatsImpl) functionStats.instances.get(0);
        Assert.assertEquals(instanceStats.getInstanceId(), 0);
        Assert.assertEquals(instanceStats.getMetrics().getReceivedTotal(), expectedReceived);
        Assert.assertEquals(instanceStats.getMetrics().getProcessedSuccessfullyTotal(), expectedProcessed);
        Assert.assertEquals(instanceStats.getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(instanceStats.getMetrics().getUserExceptionsTotal(), expectedUserExceptions);
        Assert.assertTrue(instanceStats.getMetrics().getAvgProcessLatency() > 0.0);

        this.deleteFunction(functionName);
        this.getFunctionInfoNotFound(functionName);
        this.checkSubscriptionsCleanup(inputTopicName);
    }

    /**
     * Runs the publish-function scenario of {@link #testPublishFunction} for the Go runtime.
     *
     * @throws Exception propagated unchanged from {@code testPublishFunction}
     */
    public void testGoPublishFunction() throws Exception {
        final CommandGenerator.Runtime goRuntime = CommandGenerator.Runtime.GO;
        testPublishFunction(goRuntime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deploys a function configured with a {@code publish-topic} user config pointing at the
     * output topic and verifies the messages it publishes there.
     *
     * <p>The test is a no-op when the function runtime type is {@code THREAD}. For the Java
     * runtime the exchange is delegated to {@code publishAndConsumeMessages}. For the other
     * runtimes 10 keyed byte messages carrying a {@code count} property are published, and each
     * output must arrive in order with the same key and {@code count}, an {@code input_topic}
     * property equal to the input topic, a positive event time and the value
     * {@code "message-<i>!"}. Afterwards status and stats are checked, the function is deleted
     * and subscription cleanup on the input topic is checked.
     *
     * @param runtime language runtime of the function to deploy
     * @throws Exception if deployment, message exchange or cleanup fails
     */
    protected void testPublishFunction(CommandGenerator.Runtime runtime) throws Exception {
        if (this.functionRuntimeType == FunctionRuntimeType.THREAD) {
            return;
        }
        Schema<?> schema = CommandGenerator.Runtime.JAVA == runtime ? Schema.STRING : Schema.BYTES;
        String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + PulsarFunctionsTest.randomName(8);
        String outputTopicName = "test-publish-" + runtime + "-output-" + PulsarFunctionsTest.randomName(8);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);
        }
        String functionName = "test-publish-fn-" + PulsarFunctionsTest.randomName(8);
        final int numMessages = 10;

        switch (runtime) {
            case JAVA: {
                this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, null, "org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish", schema, Collections.singletonMap("publish-topic", outputTopicName), null, null, null);
                break;
            }
            case PYTHON: {
                this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, "typed_message_builder_publish.py", "typed_message_builder_publish.TypedMessageBuilderPublish", schema, Collections.singletonMap("publish-topic", outputTopicName), null, null, null);
                break;
            }
            case GO: {
                this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, "exclamationFunc", null, schema, Collections.singletonMap("publish-topic", outputTopicName), null, null, null);
                break;
            }
            default: {
                // No function is submitted for any other runtime, exactly as before.
                break;
            }
        }
        this.getFunctionInfoSuccess(functionName);
        this.getFunctionStatsEmpty(functionName);

        if (CommandGenerator.Runtime.JAVA == runtime) {
            this.publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
        } else {
            try (PulsarClient client = PulsarClient.builder()
                         .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                         .build();
                 Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                         .topic(outputTopicName)
                         .subscriptionType(SubscriptionType.Exclusive)
                         .subscriptionName("test-sub")
                         .subscribe();
                 Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                         .topic(inputTopicName)
                         .create()) {
                for (int i = 0; i < numMessages; ++i) {
                    producer.newMessage()
                            .key(String.valueOf(i))
                            .property("count", String.valueOf(i))
                            .value(("message-" + i).getBytes(StandardCharsets.UTF_8))
                            .send();
                }
                HashSet<String> expectedMessages = new HashSet<>();
                for (int i = 0; i < numMessages; ++i) {
                    expectedMessages.add("message-" + i + "!");
                }
                for (int i = 0; i < numMessages; ++i) {
                    Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
                    // receive(timeout) returns null on timeout; fail with a readable message instead of an NPE.
                    Assert.assertNotNull(msg, "Timed out waiting for output message " + i + " on " + outputTopicName);
                    String msgValue = new String(msg.getValue(), StandardCharsets.UTF_8);
                    log.info("Received: {}", msgValue);
                    // Key and "count" must match the loop index, i.e. outputs arrive in publish order.
                    Assert.assertEquals(msg.getKey(), String.valueOf(i));
                    Assert.assertEquals(msg.getProperties().get("count"), String.valueOf(i));
                    Assert.assertEquals(msg.getProperties().get("input_topic"), inputTopicName);
                    Assert.assertTrue(msg.getEventTime() > 0L);
                    Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
                }
            }
        }

        this.getFunctionStatus(functionName, numMessages, true);
        this.getFunctionStats(functionName, numMessages);
        this.deleteFunction(functionName);
        this.getFunctionInfoNotFound(functionName);
        this.checkSubscriptionsCleanup(inputTopicName);
    }

    /**
     * Runs the exclamation function end to end for the given runtime: creates the topics,
     * submits the function, publishes and consumes messages, verifies status and stats,
     * scales the function to two instances, then deletes it and checks cleanup.
     *
     * <p>Resources are managed with try-with-resources so that a failure while closing a
     * client or consumer is recorded as a suppressed exception instead of replacing the
     * exception that actually failed the test.
     *
     * @param runtime        function runtime under test
     * @param isTopicPattern when {@code true}, the function is submitted against the
     *                       {@code <inputTopic>.*} pattern instead of a single topic
     * @param pyZip          forwarded to the function submission (selects the zip packaging
     *                       for the Python runtime)
     * @param withExtraDeps  forwarded to the function submission (selects the Python variant
     *                       with extra dependencies)
     * @throws Exception if any step of the scenario fails
     */
    protected void testExclamationFunction(CommandGenerator.Runtime runtime, boolean isTopicPattern, boolean pyZip, boolean withExtraDeps) throws Exception {
        // This combination is not exercised: Python functions are skipped for the THREAD runtime type.
        if (this.functionRuntimeType == FunctionRuntimeType.THREAD && runtime == CommandGenerator.Runtime.PYTHON) {
            return;
        }
        final int numMessages = 10;
        Schema<?> schema = CommandGenerator.Runtime.JAVA == runtime ? Schema.STRING : Schema.BYTES;
        String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + PulsarFunctionsTest.randomName(8);
        String outputTopicName = "test-exclamation-" + runtime + "-output-" + PulsarFunctionsTest.randomName(8);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);
        }
        if (isTopicPattern) {
            // Subscribe briefly to "<input>1" and "<input>2" (the topics the publish helpers
            // write to in pattern mode) before switching the input name to the pattern.
            // NOTE(review): presumably this is what creates those topics - confirm.
            try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
                 Consumer<?> consumer1 = client.newConsumer(schema).topic(inputTopicName + "1").subscriptionType(SubscriptionType.Exclusive).subscriptionName("test-sub").subscribe();
                 Consumer<?> consumer2 = client.newConsumer(schema).topic(inputTopicName + "2").subscriptionType(SubscriptionType.Exclusive).subscriptionName("test-sub").subscribe()) {
                inputTopicName = inputTopicName + ".*";
            }
        }
        String functionName = "test-exclamation-fn-" + PulsarFunctionsTest.randomName(8);
        this.submitExclamationFunction(runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema);
        this.getFunctionInfoSuccess(functionName);
        this.getFunctionStatsEmpty(functionName);
        if (CommandGenerator.Runtime.JAVA == runtime) {
            this.publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
        } else {
            this.publishAndConsumeMessagesBytes(inputTopicName, outputTopicName, numMessages);
        }
        this.getFunctionStatus(functionName, numMessages, true);
        this.getFunctionStats(functionName, numMessages);
        // Scale to two instances; the expected message count passed to the status check is 0.
        this.updateFunctionParallelism(functionName, 2);
        this.getFunctionStatus(functionName, 0, true, 2);
        this.deleteFunction(functionName);
        this.getFunctionInfoNotFound(functionName);
        this.checkSubscriptionsCleanup(inputTopicName);
    }

    /**
     * Submits the exclamation function, resolving the function class for the given
     * runtime and packaging flags via {@code getExclamationClass}.
     */
    private void submitExclamationFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, boolean pyZip, boolean withExtraDeps, Schema<?> schema) throws Exception {
        final String exclamationClass = PulsarFunctionsTest.getExclamationClass(runtime, pyZip, withExtraDeps);
        // isPublishFunction is always false for the exclamation function.
        this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, false, exclamationClass, schema);
    }

    /**
     * Picks the function file for the runtime and delegates to the file-based overload.
     *
     * <p>Only the Python runtime gets a file name; every other runtime passes {@code null}.
     */
    private <T> void submitFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, boolean pyZip, boolean withExtraDeps, boolean isPublishFunction, String functionClass, Schema<T> inputTopicSchema) throws Exception {
        String functionFile = null;
        if (CommandGenerator.Runtime.PYTHON == runtime) {
            // Precedence: publish function, then zip packaging, then extra deps, then the plain script.
            if (isPublishFunction) {
                functionFile = "typed_message_builder_publish.py";
            } else if (pyZip) {
                functionFile = "exclamation.zip";
            } else if (withExtraDeps) {
                functionFile = "exclamation_with_extra_deps.py";
            } else {
                functionFile = "exclamation_function.py";
            }
        }
        this.submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema);
    }

    /**
     * Convenience overload: submits the function with no user config, no custom schema
     * inputs, no output schema type and no explicit subscription initial position.
     */
    private <T> void submitFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, String functionFile, String functionClass, Schema<T> inputTopicSchema) throws Exception {
        this.submitFunction(
                runtime,
                inputTopicName,
                outputTopicName,
                functionName,
                functionFile,
                functionClass,
                inputTopicSchema,
                null,   // userConfigs
                null,   // customSchemaInputs
                null,   // outputSchemaType
                null);  // subscriptionInitialPosition
    }

    /**
     * Builds the create-function command, runs it through {@code sh -c} on a worker
     * container and asserts that the output reports a successful creation.
     *
     * <p>An input topic name ending in {@code ".*"} selects the topic-pattern generator.
     * The optional settings (user config, custom schema inputs, output schema type,
     * subscription initial position) are applied only when non-null. When the input topic
     * name is non-empty, a consumer is opened on the subscription
     * {@code public/default/<functionName>} afterwards.
     *
     * @throws IllegalArgumentException if the runtime is not JAVA, PYTHON or GO
     */
    private <T> void submitFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, String functionFile, String functionClass, Schema<T> inputTopicSchema, Map<String, String> userConfigs, String customSchemaInputs, String outputSchemaType, SubscriptionInitialPosition subscriptionInitialPosition) throws Exception {
        log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
        final CommandGenerator cmdGen;
        if (inputTopicName.endsWith(".*")) {
            log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
            cmdGen = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
        } else {
            log.info("----- CREATING REGULAR FUNCTION --- ");
            cmdGen = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
        }
        cmdGen.setSinkTopic(outputTopicName);
        cmdGen.setFunctionName(functionName);
        // Optional settings: leave the generator untouched for anything not supplied.
        if (userConfigs != null) {
            cmdGen.setUserConfig(userConfigs);
        }
        if (customSchemaInputs != null) {
            cmdGen.setCustomSchemaInputs(customSchemaInputs);
        }
        if (outputSchemaType != null) {
            cmdGen.setSchemaType(outputSchemaType);
        }
        if (subscriptionInitialPosition != null) {
            cmdGen.setSubscriptionInitialPosition(subscriptionInitialPosition);
        }
        final String createCommand;
        switch (runtime) {
            case JAVA: {
                createCommand = cmdGen.generateCreateFunctionCommand();
                break;
            }
            case PYTHON:
            case GO: {
                // Non-Java runtimes need the runtime set and a function file passed in.
                cmdGen.setRuntime(runtime);
                createCommand = cmdGen.generateCreateFunctionCommand(functionFile);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported runtime : " + runtime);
            }
        }
        log.info("---------- Function command: {}", createCommand);
        ContainerExecResult execResult = this.pulsarCluster.getAnyWorker().execCmd("sh", "-c", createCommand);
        Assert.assertTrue(execResult.getStdout().contains("\"Created successfully\""));
        if (StringUtils.isNotEmpty(inputTopicName)) {
            final String subscription = String.format("public/default/%s", functionName);
            this.ensureSubscriptionCreated(inputTopicName, subscription, inputTopicSchema);
        }
    }

    /**
     * Runs the update-function command that sets the given parallelism and asserts that
     * the worker reports a successful update.
     */
    private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
        CommandGenerator cmdGen = new CommandGenerator();
        cmdGen.setFunctionName(functionName);
        cmdGen.setParallelism(parallelism);
        final String updateCommand = cmdGen.generateUpdateFunctionCommand();
        log.info("---------- Function command: {}", updateCommand);
        ContainerExecResult execResult = this.pulsarCluster.getAnyWorker().execCmd("sh", "-c", updateCommand);
        String stdout = execResult.getStdout();
        Assert.assertTrue(stdout.contains("\"Updated successfully\""));
    }

    /**
     * Submits a function with an explicit output SerDe class and optional user config,
     * then asserts that the worker reports a successful creation.
     *
     * <p>An input topic name ending in {@code ".*"} selects the topic-pattern generator.
     * Only the JAVA and PYTHON runtimes are accepted by this overload.
     *
     * @throws IllegalArgumentException if the runtime is neither JAVA nor PYTHON
     */
    protected <T> void submitFunction(CommandGenerator.Runtime runtime, String inputTopicName, String outputTopicName, String functionName, String functionFile, String functionClass, String outputSerdeClassName, Map<String, String> userConfigs) throws Exception {
        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
        final CommandGenerator cmdGen;
        if (inputTopicName.endsWith(".*")) {
            log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
            cmdGen = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
        } else {
            log.info("----- CREATING REGULAR FUNCTION --- ");
            cmdGen = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
        }
        cmdGen.setSinkTopic(outputTopicName);
        cmdGen.setFunctionName(functionName);
        cmdGen.setOutputSerDe(outputSerdeClassName);
        if (userConfigs != null) {
            cmdGen.setUserConfig(userConfigs);
        }
        final String createCommand;
        if (CommandGenerator.Runtime.JAVA == runtime) {
            createCommand = cmdGen.generateCreateFunctionCommand();
        } else if (CommandGenerator.Runtime.PYTHON == runtime) {
            // Python needs the runtime set and the function file passed to the generator.
            cmdGen.setRuntime(runtime);
            createCommand = cmdGen.generateCreateFunctionCommand(functionFile);
        } else {
            throw new IllegalArgumentException("Unsupported runtime : " + runtime);
        }
        log.info("---------- Function command: {}", createCommand);
        ContainerExecResult execResult = this.pulsarCluster.getAnyWorker().execCmd("sh", "-c", createCommand);
        Assert.assertTrue(execResult.getStdout().contains("\"Created successfully\""));
    }

    /**
     * Opens a Shared consumer on the input topic under the given subscription name and
     * closes it again immediately; nothing is consumed.
     */
    private <T> void ensureSubscriptionCreated(String inputTopicName, String subscriptionName, Schema<T> inputTopicSchema) throws Exception {
        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
             Consumer<T> ignored = pulsarClient.newConsumer(inputTopicSchema)
                     .topic(inputTopicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {
            // Intentionally empty: subscribing is the only effect needed here.
        }
    }

    /**
     * Runs {@code pulsar-admin functions get} for the function and asserts that the
     * output contains its name field.
     */
    protected void getFunctionInfoSuccess(String functionName) throws Exception {
        final String expectedNameField = "\"name\": \"" + functionName + "\"";
        ContainerExecResult getResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "get", "--tenant", "public", "--namespace", "default", "--name", functionName);
        String functionJson = getResult.getStdout();
        log.info("FUNCTION STATE: {}", functionJson);
        Assert.assertTrue(functionJson.contains(expectedNameField));
    }

    /**
     * Fetches the function stats and asserts they describe a function that has not
     * processed anything yet: all counters are zero, latencies and last invocation are
     * null, and there is exactly one instance (id 0) with the same empty metrics.
     */
    protected void getFunctionStatsEmpty(String functionName) throws Exception {
        ContainerExecResult statsResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "stats", "--tenant", "public", "--namespace", "default", "--name", functionName);
        String statsJson = statsResult.getStdout();
        log.info("FUNCTION STATS: {}", statsJson);
        FunctionStatsImpl stats = FunctionStatsImpl.decode(statsJson);

        // Function-level totals.
        Assert.assertEquals((long) stats.getReceivedTotal(), 0L);
        Assert.assertEquals((long) stats.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals((long) stats.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) stats.getUserExceptionsTotal(), 0L);
        Assert.assertEquals((Object) stats.avgProcessLatency, null);

        // Function-level one-minute window.
        Assert.assertEquals((long) stats.oneMin.getReceivedTotal(), 0L);
        Assert.assertEquals((long) stats.oneMin.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals((long) stats.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) stats.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertEquals((Object) stats.oneMin.getAvgProcessLatency(), null);
        Assert.assertEquals((Object) stats.getAvgProcessLatency(), (Object) stats.oneMin.getAvgProcessLatency());
        Assert.assertEquals((Object) stats.getLastInvocation(), null);

        // Exactly one instance is expected; only look it up once the size has been verified.
        Assert.assertEquals((int) stats.instances.size(), 1);
        FunctionInstanceStatsImpl firstInstance = (FunctionInstanceStatsImpl) stats.instances.get(0);
        Assert.assertEquals((int) firstInstance.getInstanceId(), 0);

        // Instance-level totals.
        Assert.assertEquals((long) firstInstance.getMetrics().getReceivedTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertEquals((Object) firstInstance.getMetrics().getAvgProcessLatency(), null);

        // Instance-level one-minute window.
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getReceivedTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getUserExceptionsTotal(), 0L);
        Assert.assertEquals((Object) firstInstance.getMetrics().getOneMin().getAvgProcessLatency(), null);
    }

    /**
     * Fetches the function stats and asserts they reflect {@code numMessages} messages
     * received and successfully processed with no exceptions, positive latencies and a
     * positive last-invocation time, on exactly one instance (id 0).
     */
    private void getFunctionStats(String functionName, int numMessages) throws Exception {
        ContainerExecResult statsResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "stats", "--tenant", "public", "--namespace", "default", "--name", functionName);
        String statsJson = statsResult.getStdout();
        log.info("FUNCTION STATS: {}", statsJson);
        FunctionStatsImpl stats = FunctionStatsImpl.decode(statsJson);
        final long expectedCount = numMessages;

        // Function-level totals.
        Assert.assertEquals((long) stats.getReceivedTotal(), expectedCount);
        Assert.assertEquals((long) stats.getProcessedSuccessfullyTotal(), expectedCount);
        Assert.assertEquals((long) stats.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) stats.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(stats.avgProcessLatency > 0.0);

        // Function-level one-minute window.
        Assert.assertEquals((long) stats.oneMin.getReceivedTotal(), expectedCount);
        Assert.assertEquals((long) stats.oneMin.getProcessedSuccessfullyTotal(), expectedCount);
        Assert.assertEquals((long) stats.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) stats.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(stats.oneMin.getAvgProcessLatency() > 0.0);
        Assert.assertEquals((Object) stats.getAvgProcessLatency(), (Object) stats.oneMin.getAvgProcessLatency());
        Assert.assertTrue(stats.getLastInvocation() > 0L);

        // Exactly one instance is expected; only look it up once the size has been verified.
        Assert.assertEquals((int) stats.instances.size(), 1);
        FunctionInstanceStatsImpl firstInstance = (FunctionInstanceStatsImpl) stats.instances.get(0);
        Assert.assertEquals((int) firstInstance.getInstanceId(), 0);

        // Instance-level totals.
        Assert.assertEquals((long) firstInstance.getMetrics().getReceivedTotal(), expectedCount);
        Assert.assertEquals((long) firstInstance.getMetrics().getProcessedSuccessfullyTotal(), expectedCount);
        Assert.assertEquals((long) firstInstance.getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertTrue(firstInstance.getMetrics().getAvgProcessLatency() > 0.0);

        // Instance-level one-minute window.
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getReceivedTotal(), expectedCount);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getProcessedSuccessfullyTotal(), expectedCount);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals((long) firstInstance.getMetrics().getOneMin().getUserExceptionsTotal(), 0L);
        Assert.assertTrue(firstInstance.getMetrics().getOneMin().getAvgProcessLatency() > 0.0);
    }

    /**
     * Retries {@code pulsar-admin functions get} (5 attempts, 100 ms base delay passed to
     * {@code retryStrategically}) until the command fails with a "doesn't exist" error
     * for the function.
     */
    private void getFunctionInfoNotFound(String functionName) throws Exception {
        final String notFoundMarker = "Reason: Function " + functionName + " doesn't exist";
        PulsarFunctionsTest.retryStrategically(unused -> {
            try {
                this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "get", "--tenant", "public", "--namespace", "default", "--name", functionName);
                // The command succeeded, so the function still exists.
                return false;
            }
            catch (ContainerExecException execFailure) {
                // Only the specific "doesn't exist" failure counts as success.
                return execFailure.getResult().getStderr().contains(notFoundMarker);
            }
            catch (Exception ignored) {
                // Any other failure is treated as "not yet"; the retry loop tries again.
                return false;
            }
        }, 5, 100L, true);
    }

    /**
     * Asserts that the topic has no subscriptions left, based on
     * {@code pulsar-admin topics stats} run on a broker container.
     *
     * @param topic topic whose stats are inspected
     * @throws Exception if the stats output cannot be read or parsed
     */
    private void checkSubscriptionsCleanup(String topic) throws Exception {
        try {
            ContainerExecResult result = this.pulsarCluster.getAnyBroker().execCmd("/pulsar/bin/pulsar-admin", "topics", "stats", topic);
            TopicStats topicStats = (TopicStats)ObjectMapperFactory.getThreadLocal().readValue(result.getStdout(), TopicStats.class);
            Assert.assertEquals((int)topicStats.getSubscriptions().size(), (int)0);
        }
        catch (ContainerExecException e) {
            // Reaching this catch means the stats command itself failed. The previous message
            // claimed the opposite ("should have exited with non-zero"); report the real
            // expectation and keep the cause for diagnosis.
            Assert.fail("Command 'pulsar-admin topics stats " + topic + "' should have exited with zero", e);
        }
    }

    /**
     * Asserts that the topic has no publishers left, based on
     * {@code pulsar-admin topics stats} run on a broker container.
     *
     * @param topic topic whose stats are inspected
     * @throws Exception if the stats output cannot be read or parsed
     */
    private void checkPublisherCleanup(String topic) throws Exception {
        try {
            ContainerExecResult result = this.pulsarCluster.getAnyBroker().execCmd("/pulsar/bin/pulsar-admin", "topics", "stats", topic);
            TopicStats topicStats = (TopicStats)ObjectMapperFactory.getThreadLocal().readValue(result.getStdout(), TopicStats.class);
            Assert.assertEquals((int)topicStats.getPublishers().size(), (int)0);
        }
        catch (ContainerExecException e) {
            // Reaching this catch means the stats command itself failed. The previous message
            // claimed the opposite ("should have exited with non-zero"); report the real
            // expectation and keep the cause for diagnosis.
            Assert.fail("Command 'pulsar-admin topics stats " + topic + "' should have exited with zero", e);
        }
    }

    /**
     * Checks the function status assuming a single running instance.
     */
    private void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
        final int singleInstance = 1;
        this.getFunctionStatus(functionName, numMessages, checkRestarts, singleInstance);
    }

    /**
     * Polls the function status once per second for up to 15 seconds until
     * {@code doGetFunctionStatus} passes; exceptions thrown while polling are ignored.
     */
    private void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism) throws Exception {
        Awaitility.await()
                .pollInterval(Duration.ofSeconds(1L))
                .atMost(Duration.ofSeconds(15L))
                .ignoreExceptions()
                .untilAsserted(() -> this.doGetFunctionStatus(functionName, numMessages, checkRestarts, parallelism));
    }

    /**
     * Performs one status check: {@code parallelism} instances must exist and be running
     * with no recorded exceptions, and the received / successfully-processed counts summed
     * over all instances must equal {@code numMessages}.
     *
     * <p>When {@code numMessages} is positive, at least one instance must also report a
     * positive average latency and a positive last-invocation time. Restart counts are
     * only checked when {@code checkRestarts} is set.
     */
    private void doGetFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism) throws Exception {
        ContainerExecResult statusResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "status", "--tenant", "public", "--namespace", "default", "--name", functionName);
        FunctionStatus status = FunctionStatusUtil.decode(statusResult.getStdout());
        Assert.assertEquals((int) status.getNumInstances(), parallelism);
        Assert.assertEquals((int) status.getNumRunning(), parallelism);
        Assert.assertEquals((int) status.getInstances().size(), parallelism);

        boolean sawPositiveLatency = false;
        boolean sawInvocation = false;
        int receivedSum = 0;
        int succeededSum = 0;
        for (int idx = 0; idx < parallelism; ++idx) {
            FunctionStatus.FunctionInstanceStatus instance = (FunctionStatus.FunctionInstanceStatus) status.getInstances().get(idx);
            Assert.assertEquals((boolean) instance.getStatus().isRunning(), true);
            Assert.assertTrue(instance.getInstanceId() >= 0);
            Assert.assertTrue(instance.getInstanceId() < parallelism);
            // Once a flag is set it stays set and the getter is no longer consulted.
            if (!sawPositiveLatency) {
                sawPositiveLatency = instance.getStatus().getAverageLatency() > 0.0;
            }
            if (!sawInvocation) {
                sawInvocation = instance.getStatus().getLastInvocationTime() > 0L;
            }
            // Compound assignment narrows the long sum back to int.
            receivedSum += instance.getStatus().getNumReceived();
            succeededSum += instance.getStatus().getNumSuccessfullyProcessed();
            if (checkRestarts) {
                Assert.assertEquals((long) instance.getStatus().getNumRestarts(), 0L);
            }
            Assert.assertEquals((int) instance.getStatus().getLatestUserExceptions().size(), 0);
            Assert.assertEquals((int) instance.getStatus().getLatestSystemExceptions().size(), 0);
        }
        if (numMessages > 0) {
            Assert.assertTrue(sawPositiveLatency);
            Assert.assertTrue(sawInvocation);
        }
        Assert.assertEquals(receivedSum, numMessages);
        Assert.assertEquals(succeededSum, numMessages);
    }

    /**
     * Publishes {@code numMessages} string messages ({@code "message-<i>"}) to the input
     * topic and asserts that each message received on the output topic is one of the
     * expected {@code "message-<i>!"} values, each seen at most once.
     *
     * <p>When {@code inputTopic} ends with {@code ".*"}, the first half of the messages goes
     * to {@code <prefix>1} and the rest to {@code <prefix>2}, where {@code <prefix>} is the
     * input topic without the trailing {@code ".*"}.
     *
     * <p>All Pulsar resources are held in try-with-resources, so a failure while closing
     * them is attached as a suppressed exception rather than hiding the test failure.
     *
     * @param inputTopic  topic (or {@code ".*"} pattern) the function reads from
     * @param outputTopic topic the function writes to
     * @param numMessages number of messages to publish and expect
     * @throws Exception on any client failure
     */
    private void publishAndConsumeMessages(String inputTopic, String outputTopic, int numMessages) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(outputTopic).subscriptionType(SubscriptionType.Exclusive).subscriptionName("test-sub").subscribe()) {
            if (inputTopic.endsWith(".*")) {
                String topicPrefix = inputTopic.substring(0, inputTopic.length() - 2);
                try (Producer<String> producer1 = client.newProducer(Schema.STRING).topic(topicPrefix + "1").create();
                     Producer<String> producer2 = client.newProducer(Schema.STRING).topic(topicPrefix + "2").create()) {
                    for (int i = 0; i < numMessages / 2; ++i) {
                        producer1.send("message-" + i);
                    }
                    for (int i = numMessages / 2; i < numMessages; ++i) {
                        producer2.send("message-" + i);
                    }
                }
            } else {
                try (Producer<String> producer = client.newProducer(Schema.STRING).topic(inputTopic).create()) {
                    for (int i = 0; i < numMessages; ++i) {
                        producer.send("message-" + i);
                    }
                }
            }
            HashSet<String> expectedMessages = new HashSet<String>();
            for (int i = 0; i < numMessages; ++i) {
                expectedMessages.add("message-" + i + "!");
            }
            for (int i = 0; i < numMessages; ++i) {
                Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
                // A timed-out receive yields null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + (i + 1) + " of " + numMessages + " on " + outputTopic);
                String msgValue = msg.getValue();
                log.info("Received: {}", msgValue);
                // remove() returns false for values that are unexpected or already consumed.
                Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
            }
        }
    }

    /**
     * Byte-array variant of the publish/consume check: publishes {@code numMessages}
     * UTF-8 encoded {@code "message-<i>"} payloads to the input topic and asserts that each
     * message received on the output topic decodes to one of the expected
     * {@code "message-<i>!"} values, each seen at most once.
     *
     * <p>When {@code inputTopic} ends with {@code ".*"}, the first half of the messages goes
     * to {@code <prefix>1} and the rest to {@code <prefix>2}, where {@code <prefix>} is the
     * input topic without the trailing {@code ".*"}.
     *
     * <p>All Pulsar resources are held in try-with-resources, so a failure while closing
     * them is attached as a suppressed exception rather than hiding the test failure.
     *
     * @param inputTopic  topic (or {@code ".*"} pattern) the function reads from
     * @param outputTopic topic the function writes to
     * @param numMessages number of messages to publish and expect
     * @throws Exception on any client failure
     */
    private void publishAndConsumeMessagesBytes(String inputTopic, String outputTopic, int numMessages) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
             Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES).topic(outputTopic).subscriptionType(SubscriptionType.Exclusive).subscriptionName("test-sub").subscribe()) {
            if (inputTopic.endsWith(".*")) {
                String topicPrefix = inputTopic.substring(0, inputTopic.length() - 2);
                try (Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic(topicPrefix + "1").create();
                     Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic(topicPrefix + "2").create()) {
                    for (int i = 0; i < numMessages / 2; ++i) {
                        producer1.send(("message-" + i).getBytes(StandardCharsets.UTF_8));
                    }
                    for (int i = numMessages / 2; i < numMessages; ++i) {
                        producer2.send(("message-" + i).getBytes(StandardCharsets.UTF_8));
                    }
                }
            } else {
                try (Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(inputTopic).create()) {
                    for (int i = 0; i < numMessages; ++i) {
                        producer.send(("message-" + i).getBytes(StandardCharsets.UTF_8));
                    }
                }
            }
            HashSet<String> expectedMessages = new HashSet<String>();
            for (int i = 0; i < numMessages; ++i) {
                expectedMessages.add("message-" + i + "!");
            }
            for (int i = 0; i < numMessages; ++i) {
                Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
                // A timed-out receive yields null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + (i + 1) + " of " + numMessages + " on " + outputTopic);
                String msgValue = new String(msg.getValue(), StandardCharsets.UTF_8);
                log.info("Received: {}", msgValue);
                // remove() returns false for values that are unexpected or already consumed.
                Assert.assertTrue(expectedMessages.remove(msgValue), "Unexpected or duplicate message: " + msgValue);
            }
        }
    }

    /**
     * Deletes the function through {@code pulsar-admin functions delete} and asserts the
     * command reported success and wrote nothing to stderr.
     */
    private void deleteFunction(String functionName) throws Exception {
        ContainerExecResult deleteResult = this.pulsarCluster.getAnyWorker().execCmd("/pulsar/bin/pulsar-admin", "functions", "delete", "--tenant", "public", "--namespace", "default", "--name", functionName);
        String stdout = deleteResult.getStdout();
        Assert.assertTrue(stdout.contains("Deleted successfully"));
        deleteResult.assertNoStderr();
    }

    /**
     * Submits the Java {@code AutoSchemaFunction} with an Avro input schema for
     * {@code CustomObject}, pushes ten Avro messages through it, checks the function
     * status (without checking restarts) and deletes the function.
     */
    @Test(groups={"function"})
    public void testAutoSchemaFunction() throws Exception {
        final int numMessages = 10;
        final String inputTopicName = "test-autoschema-input-" + PulsarFunctionsTest.randomName(8);
        final String outputTopicName = "test-autoshcema-output-" + PulsarFunctionsTest.randomName(8);
        final String functionName = "test-autoschema-fn-" + PulsarFunctionsTest.randomName(8);
        final String functionClass = AutoSchemaFunction.class.getName();

        // pyZip, withExtraDeps and isPublishFunction are all false for this Java function.
        this.submitFunction(CommandGenerator.Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false, false, functionClass, Schema.AVRO(CustomObject.class));
        this.getFunctionInfoSuccess(functionName);
        this.publishAndConsumeAvroMessages(inputTopicName, outputTopicName, numMessages);
        this.getFunctionStatus(functionName, numMessages, false);
        this.deleteFunction(functionName);
        this.getFunctionInfoNotFound(functionName);
    }

    /**
     * Publishes {@code numMessages} Avro-encoded {@code CustomObject} records (built from the
     * indices {@code 0..numMessages-1}) to {@code inputTopic}, then reads {@code numMessages}
     * string messages from {@code outputTopic} and asserts that the i-th one equals
     * {@code "value-" + i}.
     *
     * <p>The consumer subscribes (exclusive, subscription {@code "test-sub"}) before anything is
     * published. Producer, consumer and client are closed in that order when the method exits,
     * whether normally or through a failed assertion; a failure while closing is attached to the
     * primary exception as suppressed instead of replacing it.
     *
     * @param inputTopic topic the records are produced to
     * @param outputTopic topic the resulting strings are consumed from
     * @param numMessages number of records to publish and of messages to expect
     * @throws Exception on any client failure
     */
    private void publishAndConsumeAvroMessages(String inputTopic, String outputTopic, int numMessages) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(outputTopic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
             Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class))
                     .topic(inputTopic)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.send(new CustomObject((long) i));
            }
            for (int i = 0; i < numMessages; ++i) {
                Message<String> msg = consumer.receive();
                // TestNG's assertEquals takes (actual, expected); keeping that order makes the
                // failure message read correctly.
                Assert.assertEquals(msg.getValue(), "value-" + i);
            }
        }
    }

    /**
     * End-to-end check of {@code AvroSchemaTestFunction} on the Java runtime.
     *
     * <p>Steps: create a producer on the input topic and an exclusive consumer on the output
     * topic (both with the Avro {@code AvroTestObject} schema), submit the function, send
     * {@code numMessages} objects, and verify that every message received on the output topic
     * equals one of the results obtained by invoking {@code AvroSchemaTestFunction.process}
     * locally on the inputs. Each expected result must be seen exactly once. The function is
     * deleted at the end. Topic schemas are logged asynchronously along the way for diagnostics
     * only; they do not influence the test result.
     */
    @Test(groups={"function"})
    public void testAvroSchemaFunction() throws Exception {
        log.info("testAvroSchemaFunction start ...");
        String inputTopic = "test-avroschema-input-" + PulsarFunctionsTest.randomName(8);
        String outputTopic = "test-avroschema-output-" + PulsarFunctionsTest.randomName(8);
        String functionName = "test-avroschema-fn-" + PulsarFunctionsTest.randomName(8);
        int numMessages = 10;
        // Nested (rather than a single multi-resource header) so the progress log lines stay
        // interleaved with the creation of each resource.
        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build()) {
            log.info("pulsar client init - input: {}, output: {}", inputTopic, outputTopic);
            try (Producer<AvroTestObject> producer = pulsarClient.newProducer(Schema.AVRO(AvroTestObject.class))
                    .topic(inputTopic)
                    .create()) {
                log.info("pulsar producer init - {}", inputTopic);
                try (Consumer<AvroTestObject> consumer = pulsarClient.newConsumer(Schema.AVRO(AvroTestObject.class))
                        .subscriptionType(SubscriptionType.Exclusive)
                        .subscriptionName("test-avro-schema")
                        .topic(outputTopic)
                        .subscribe()) {
                    log.info("pulsar consumer init - {}", outputTopic);
                    this.logTopicSchemaAsync(pulsarClient, inputTopic, "input");
                    this.logTopicSchemaAsync(pulsarClient, outputTopic, "output");

                    this.submitFunction(CommandGenerator.Runtime.JAVA, inputTopic, outputTopic, functionName, null, AvroSchemaTestFunction.class.getName(), Schema.AVRO(AvroTestObject.class));
                    log.info("pulsar submitFunction");
                    this.getFunctionInfoSuccess(functionName);

                    // A local instance of the function computes the expected outputs.
                    AvroSchemaTestFunction function = new AvroSchemaTestFunction();
                    HashSet<AvroTestObject> expectedSet = new HashSet<>();
                    log.info("test-avro-schema producer connected: {}", producer.isConnected());
                    for (int i = 0; i < numMessages; ++i) {
                        AvroTestObject inputObject = new AvroTestObject();
                        inputObject.setBaseValue(i);
                        MessageId messageId = producer.send(inputObject);
                        log.info("test-avro-schema messageId: {}", messageId);
                        expectedSet.add(function.process(inputObject, null));
                        log.info("test-avro-schema expectedSet size: {}", expectedSet.size());
                    }
                    this.getFunctionStatus(functionName, numMessages, false);
                    log.info("test-avro-schema producer send message finish");
                    this.logTopicSchemaAsync(pulsarClient, outputTopic, "output");

                    log.info("test-avro-schema consumer connected: {}", consumer.isConnected());
                    for (int i = 0; i < numMessages; ++i) {
                        log.info("test-avro-schema consumer receive [{}] start", i);
                        Message<AvroTestObject> message = consumer.receive();
                        log.info("test-avro-schema consumer receive [{}] over", i);
                        AvroTestObject outputObject = message.getValue();
                        // remove() returns false for an unknown or already-consumed result.
                        Assert.assertTrue(expectedSet.remove(outputObject), "Unexpected output object: " + outputObject);
                        consumer.acknowledge(message);
                    }
                    log.info("test-avro-schema consumer receive message finish");
                    Assert.assertEquals(expectedSet.size(), 0);

                    this.deleteFunction(functionName);
                    this.getFunctionInfoNotFound(functionName);
                }
            }
        }
    }

    /**
     * Requests the schema of {@code topic} and logs the outcome when the lookup completes.
     * The call does not wait for the result, and a failed lookup is only logged.
     *
     * @param pulsarClient client used for the lookup; cast to {@code PulsarClientImpl}
     * @param topic topic whose schema is requested
     * @param label word used in the log lines to identify the topic ("input" or "output")
     */
    private void logTopicSchemaAsync(PulsarClient pulsarClient, String topic, String label) {
        ((PulsarClientImpl) pulsarClient).getSchema(topic).whenComplete((schemaInfo, throwable) -> {
            // On failure schemaInfo is null, so the throwable has to be checked first.
            if (throwable != null) {
                log.error("get {} schemaInfo error", label, throwable);
                return;
            }
            if (schemaInfo.isPresent()) {
                log.info("{}SchemaInfo: {}", label, schemaInfo.get());
            } else {
                log.error("{} schema is not present!", label);
            }
        });
    }

    /**
     * Exercises the logging function: creates the input and log topics, submits the function,
     * publishes messages and expects each of them, suffixed with {@code "-log"}, on the log topic,
     * then checks status and stats, deletes the function and verifies cleanup.
     *
     * <p>Returns immediately without testing anything when the function runtime type is
     * {@code THREAD} and {@code runtime} is {@code PYTHON} or {@code GO}.
     *
     * @param runtime language runtime the test is parameterised with; it selects the schema
     *     ({@code STRING} for Java, {@code BYTES} otherwise) and is part of the topic names
     * @throws Exception on any admin, client or container failure
     */
    protected void testLoggingFunction(CommandGenerator.Runtime runtime) throws Exception {
        boolean pythonOrGo = runtime == CommandGenerator.Runtime.PYTHON || runtime == CommandGenerator.Runtime.GO;
        if (this.functionRuntimeType == FunctionRuntimeType.THREAD && pythonOrGo) {
            return;
        }

        final int numMessages = 10;
        Schema<?> schema;
        if (CommandGenerator.Runtime.JAVA == runtime) {
            schema = Schema.STRING;
        } else {
            schema = Schema.BYTES;
        }

        String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + PulsarFunctionsTest.randomName(8);
        String logTopicName = "test-log-" + runtime + "-log-topic-" + PulsarFunctionsTest.randomName(8);
        // Both topics are created explicitly before the function is submitted.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(logTopicName);
        }

        String functionName = "test-logging-fn-" + PulsarFunctionsTest.randomName(8);
        this.submitJavaLoggingFunction(inputTopicName, logTopicName, functionName, schema);
        this.getFunctionInfoSuccess(functionName);
        this.getFunctionStatsEmpty(functionName);

        this.publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");
        this.getFunctionStatus(functionName, numMessages, true);
        this.getFunctionStats(functionName, numMessages);

        this.deleteFunction(functionName);
        this.getFunctionInfoNotFound(functionName);
        this.checkSubscriptionsCleanup(inputTopicName);
        this.checkPublisherCleanup(logTopicName);
    }

    /**
     * Submits the example {@code LoggingFunction} through a shell command on a cluster worker and
     * waits for its input subscription via {@code ensureSubscriptionCreated}.
     *
     * <p>An input name ending in {@code ".*"} is treated as a topic pattern and uses the
     * topic-pattern command generator; any other name uses the default generator. The test fails
     * unless the command's stdout contains {@code "Created successfully"} (including the quotes).
     *
     * @param inputTopicName input topic, or topic pattern when it ends with {@code ".*"}
     * @param logTopicName topic configured as the function's log topic
     * @param functionName name given to the function
     * @param schema schema handed to {@code ensureSubscriptionCreated}
     * @throws Exception on any container or client failure
     */
    private void submitJavaLoggingFunction(String inputTopicName, String logTopicName, String functionName, Schema<?> schema) throws Exception {
        final String loggingFunctionClass = "org.apache.pulsar.functions.api.examples.LoggingFunction";

        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
        final boolean topicPattern = inputTopicName.endsWith(".*");
        log.info(topicPattern ? "----- CREATING TOPIC PATTERN FUNCTION --- " : "----- CREATING REGULAR FUNCTION --- ");
        CommandGenerator generator = topicPattern
                ? CommandGenerator.createTopicPatternGenerator(inputTopicName, loggingFunctionClass)
                : CommandGenerator.createDefaultGenerator(inputTopicName, loggingFunctionClass);
        generator.setLogTopic(logTopicName);
        generator.setFunctionName(functionName);

        String command = generator.generateCreateFunctionCommand();
        log.info("---------- Function command: {}", command);
        // The generated command is a single shell line, hence "sh -c".
        ContainerExecResult execResult = this.pulsarCluster.getAnyWorker().execCmd("sh", "-c", command);
        String stdout = execResult.getStdout();
        Assert.assertTrue(stdout.contains("\"Created successfully\""));

        String subscriptionName = "public/default/" + functionName;
        this.ensureSubscriptionCreated(inputTopicName, subscriptionName, schema);
    }

    /**
     * Publishes the strings {@code "message-0"} to {@code "message-(numMessages-1)"} to
     * {@code inputTopic} and expects, in any order, each of them with {@code messagePostfix}
     * appended exactly once on {@code outputTopic}.
     *
     * <p>The consumer subscribes (exclusive, subscription {@code "test-sub"}) before publishing
     * starts. Every receive waits at most 30 seconds; a timeout fails the test with a descriptive
     * message. Producer, consumer and client are each closed exactly once when the method exits.
     *
     * @param inputTopic topic the messages are produced to
     * @param outputTopic topic the resulting messages are consumed from
     * @param numMessages number of messages to publish and to expect
     * @param messagePostfix suffix each output message is expected to carry
     * @throws Exception on any client failure
     */
    private void publishAndConsumeMessages(String inputTopic, String outputTopic, int numMessages, String messagePostfix) throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(outputTopic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(inputTopic)
                     .create()) {
            HashSet<String> expectedMessages = new HashSet<>();
            for (int i = 0; i < numMessages; ++i) {
                producer.send("message-" + i);
                expectedMessages.add("message-" + i + messagePostfix);
            }
            for (int i = 0; i < numMessages; ++i) {
                Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
                // A timed receive yields null on timeout; fail clearly instead of with an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + (i + 1) + " of " + numMessages + " on topic '" + outputTopic + "'");
                String logMsg = new String(msg.getValue(), StandardCharsets.UTF_8);
                log.info("Received message: '{}'", logMsg);
                // remove() returns false for an unknown or duplicate message.
                Assert.assertTrue(expectedMessages.remove(logMsg), "Message '" + logMsg + "' not expected");
            }
        }
    }

    /**
     * End-to-end check of {@code MergeTopicFunction}: several input topics with different schemas
     * are merged into a single output topic.
     *
     * <p>Steps: create a fresh namespace with schema compatibility {@code ALWAYS_COMPATIBLE},
     * publish 10 messages to each input topic via {@code prepareDataForMergeFunction}, submit
     * the function with the collected input specs, then consume from the output topic until a
     * receive times out after 30 seconds. Each message is validated against the schema of the
     * topic named in its {@code baseTopic} property and counted down; at the end every
     * per-topic counter must be zero. The function is deleted afterwards.
     *
     * @throws Exception on any admin, client or container failure
     */
    protected void testMergeFunction() throws Exception {
        log.info("start merge function test ...");
        String ns = "public/ns-merge-" + PulsarFunctionsTest.randomName(8);
        try (PulsarAdmin pulsarAdmin = this.getPulsarAdmin()) {
            pulsarAdmin.namespaces().createNamespace(ns);
            pulsarAdmin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
            SchemaCompatibilityStrategy strategy = pulsarAdmin.namespaces().getSchemaCompatibilityStrategy(ns);
            log.info("namespace {} SchemaCompatibilityStrategy is {}", ns, strategy);
            try (PulsarClient pulsarClient = this.getPulsarClient()) {
                ObjectNode inputSpecNode = this.objectMapper.createObjectNode();
                Map<String, AtomicInteger> topicMsgCntMap = new ConcurrentHashMap<>();
                int messagePerTopic = 10;
                this.prepareDataForMergeFunction(ns, pulsarClient, inputSpecNode, messagePerTopic, topicMsgCntMap);
                String outputTopic = ns + "/test-merge-output";
                try (Consumer<org.apache.pulsar.client.api.schema.GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                        .subscriptionName("test-merge-fn")
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .topic(outputTopic)
                        .subscribe()) {
                    String functionName = "test-merge-fn-" + PulsarFunctionsTest.randomName(8);
                    this.submitFunction(CommandGenerator.Runtime.JAVA, "", outputTopic, functionName, null, MergeTopicFunction.class.getName(), null, null, inputSpecNode.toString(), SchemaType.AUTO_PUBLISH.name().toUpperCase(), SubscriptionInitialPosition.Earliest);
                    this.getFunctionInfoSuccess(functionName);
                    this.getFunctionStatus(functionName, topicMsgCntMap.size() * messagePerTopic, true);

                    // Drain the output topic; a null result means the receive timed out.
                    Message<org.apache.pulsar.client.api.schema.GenericRecord> message;
                    while ((message = consumer.receive(30, TimeUnit.SECONDS)) != null) {
                        String baseTopic = message.getProperty("baseTopic");
                        Assert.assertNotNull(baseTopic, "Received a message without the baseTopic property");
                        org.apache.pulsar.client.api.schema.GenericRecord genericRecord = message.getValue();
                        log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
                                baseTopic, genericRecord.getSchemaType(), genericRecord.getNativeObject().getClass(), genericRecord.getNativeObject());
                        AtomicInteger remaining = topicMsgCntMap.get(baseTopic);
                        Assert.assertNotNull(remaining, "Received a message for unknown baseTopic '" + baseTopic + "'");
                        this.checkSchemaForAutoSchema(message, baseTopic);
                        remaining.decrementAndGet();
                        consumer.acknowledge(message);
                    }

                    for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
                        Assert.assertEquals(entry.getValue().get(), 0, "topic " + entry.getKey() + " left message cnt is not 0.");
                    }
                    this.deleteFunction(functionName);
                    this.getFunctionInfoNotFound(functionName);
                    log.info("finish merge function test.");
                }
            }
        }
    }

    /**
     * Fills the merge-function input topics: for each of seven schemas (bytes, string, two JSON,
     * one Avro, and key/value in SEPARATED and INLINE encoding) it delegates to
     * {@code generateDataByDifferentSchema} with a fixed payload.
     *
     * <p>The payloads and topic base names used here are the ones {@code checkSchemaForAutoSchema}
     * asserts against, so the two methods must be kept in sync.
     *
     * @param ns namespace the topics live in
     * @param pulsarClient client used to create the producers
     * @param inputSpecNode passed through to {@code generateDataByDifferentSchema}
     * @param messagePerTopic number of messages to publish to every topic
     * @param topicMsgCntMap passed through to {@code generateDataByDifferentSchema}
     * @throws PulsarClientException if producing to any topic fails
     */
    private void prepareDataForMergeFunction(String ns, PulsarClient pulsarClient, ObjectNode inputSpecNode, int messagePerTopic, Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
        // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
        this.generateDataByDifferentSchema(ns, "merge-schema-bytes", pulsarClient, Schema.BYTES,
                "bytes schema test".getBytes(StandardCharsets.UTF_8), messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-string", pulsarClient, Schema.STRING,
                "string schema test", messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-json-userv1", pulsarClient, Schema.JSON(Users.UserV1.class),
                new Users.UserV1("ran", 33), messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-json-userv2", pulsarClient, Schema.JSON(Users.UserV2.class),
                new Users.UserV2("tang", 18, "123123123"), messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-avro-userv2", pulsarClient, Schema.AVRO(Users.UserV2.class),
                new Users.UserV2("tang", 20, "456456456"), messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-k-int-v-json-userv1-separate", pulsarClient,
                Schema.KeyValue(Schema.INT32, Schema.JSON(Users.UserV1.class), KeyValueEncodingType.SEPARATED),
                new KeyValue<>(100, new Users.UserV1("ran", 40)), messagePerTopic, inputSpecNode, topicMsgCntMap);
        this.generateDataByDifferentSchema(ns, "merge-schema-k-json-userv2-v-json-userv1-inline", pulsarClient,
                Schema.KeyValue(Schema.JSON(Users.UserV2.class), Schema.JSON(Users.UserV1.class), KeyValueEncodingType.INLINE),
                new KeyValue<>(new Users.UserV2("tang", 20, "789789789"), new Users.UserV1("ran", 40)), messagePerTopic, inputSpecNode, topicMsgCntMap);
    }

    /**
     * Publishes {@code messageCnt} copies of {@code data} to the topic {@code ns + "/" + baseTopic}
     * using {@code schema}, tagging every message with the property {@code baseTopic}.
     *
     * <p>After publishing, the topic is registered in {@code inputSpecNode} with a serialized
     * config whose {@code schemaType} is {@code AUTO_CONSUME}, and the number of published
     * messages is recorded in {@code topicMsgCntMap} under {@code baseTopic}. The producer is
     * closed even when a send fails.
     *
     * @param ns namespace the topic lives in
     * @param baseTopic topic name without namespace; also the value of the message property
     * @param pulsarClient client used to create the producer
     * @param schema schema the producer is created with
     * @param data payload sent as the value of every message
     * @param messageCnt number of messages to publish
     * @param inputSpecNode receives an entry keyed by the full topic name
     * @param topicMsgCntMap receives an entry keyed by {@code baseTopic}
     * @throws PulsarClientException if creating the producer, sending or closing fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // schema and data are deliberately untyped
    private void generateDataByDifferentSchema(String ns, String baseTopic, PulsarClient pulsarClient, Schema schema, Object data, int messageCnt, ObjectNode inputSpecNode, Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
        String topic = ns + "/" + baseTopic;
        try (Producer producer = pulsarClient.newProducer(schema).topic(topic).create()) {
            for (int i = 0; i < messageCnt; ++i) {
                producer.newMessage().value(data).property("baseTopic", baseTopic).send();
            }
        }
        ObjectNode confNode = this.objectMapper.createObjectNode();
        confNode.put("schemaType", SchemaType.AUTO_CONSUME.name().toUpperCase());
        // The per-topic config is stored as a JSON string, not as a nested object.
        inputSpecNode.put(topic, confNode.toString());
        topicMsgCntMap.put(baseTopic, new AtomicInteger(messageCnt));
        log.info("[merge-fn] generate {} messages for schema {}", messageCnt, schema.getSchemaInfo());
    }

    /**
     * Verifies one message read from the merged output topic against the payload that was
     * published to the input topic identified by {@code baseTopic}.
     *
     * <p>The message must carry a reader schema. Its native object is then cast to the
     * representation used for that topic (bytes, string, {@code JsonNode}, Avro record or
     * {@code KeyValue}) and compared field by field with the fixed values published for that
     * topic. An unrecognised {@code baseTopic} fails the test rather than passing unverified.
     *
     * @param message message received from the output topic
     * @param baseTopic value of the message's {@code baseTopic} property
     */
    private void checkSchemaForAutoSchema(Message<org.apache.pulsar.client.api.schema.GenericRecord> message, String baseTopic) {
        Assert.assertTrue(message.getReaderSchema().isPresent(), "Failed to get reader schema for auto consume multiple schema topic.");
        Object nativeObject = message.getValue().getNativeObject();
        switch (baseTopic) {
            case "merge-schema-bytes": {
                // Decode explicitly as UTF-8 instead of relying on the platform charset.
                Assert.assertEquals(new String((byte[]) nativeObject, StandardCharsets.UTF_8), "bytes schema test");
                break;
            }
            case "merge-schema-string": {
                Assert.assertEquals((String) nativeObject, "string schema test");
                break;
            }
            case "merge-schema-json-userv1": {
                JsonNode user = (JsonNode) nativeObject;
                Assert.assertEquals(user.get("name").textValue(), "ran");
                Assert.assertEquals(user.get("age").intValue(), 33);
                break;
            }
            case "merge-schema-json-userv2": {
                JsonNode user = (JsonNode) nativeObject;
                Assert.assertEquals(user.get("name").textValue(), "tang");
                Assert.assertEquals(user.get("age").intValue(), 18);
                Assert.assertEquals(user.get("phone").textValue(), "123123123");
                break;
            }
            case "merge-schema-avro-userv2": {
                GenericRecord avroRecord = (GenericRecord) nativeObject;
                // toString() is applied before comparing name and phone with String literals.
                Assert.assertEquals(avroRecord.get("name").toString(), "tang");
                Assert.assertEquals((Object) avroRecord.get("age"), (Object) 20);
                Assert.assertEquals(avroRecord.get("phone").toString(), "456456456");
                break;
            }
            case "merge-schema-k-int-v-json-userv1-separate": {
                KeyValue<?, ?> kv = (KeyValue<?, ?>) nativeObject;
                Assert.assertEquals((Object) kv.getKey(), (Object) 100);
                JsonNode valueNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
                Assert.assertEquals(valueNode.get("name").textValue(), "ran");
                Assert.assertEquals(valueNode.get("age").intValue(), 40);
                break;
            }
            case "merge-schema-k-json-userv2-v-json-userv1-inline": {
                KeyValue<?, ?> kv = (KeyValue<?, ?>) nativeObject;
                JsonNode keyNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
                Assert.assertEquals(keyNode.get("name").textValue(), "tang");
                Assert.assertEquals(keyNode.get("age").intValue(), 20);
                Assert.assertEquals(keyNode.get("phone").textValue(), "789789789");
                JsonNode valueNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
                Assert.assertEquals(valueNode.get("name").textValue(), "ran");
                Assert.assertEquals(valueNode.get("age").intValue(), 40);
                break;
            }
            default: {
                Assert.fail("Unexpected baseTopic '" + baseTopic + "': no schema check is defined for it.");
            }
        }
    }

    /**
     * Builds a new {@code PulsarClient} pointed at the cluster's plain-text service URL.
     * Each call builds a new client.
     *
     * @return the newly built client
     * @throws PulsarClientException if the client cannot be built
     */
    private PulsarClient getPulsarClient() throws PulsarClientException {
        String plainTextServiceUrl = this.pulsarCluster.getPlainTextServiceUrl();
        return PulsarClient.builder()
                .serviceUrl(plainTextServiceUrl)
                .build();
    }

    /**
     * Builds a new {@code PulsarAdmin} pointed at the cluster's HTTP service URL.
     * Each call builds a new admin client.
     *
     * @return the newly built admin client
     * @throws PulsarClientException if the admin client cannot be built
     */
    private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
        String httpServiceUrl = this.pulsarCluster.getHttpServiceUrl();
        return PulsarAdmin.builder()
                .serviceHttpUrl(httpServiceUrl)
                .build();
    }
}

