package org.apache.pulsar.io;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.net.HttpHeaders;
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javassist.bytecode.Opcode;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/io/PulsarFunctionE2ETest.class */
public class PulsarFunctionE2ETest {
    // Local BookKeeper/ZooKeeper ensemble; created and started in setup(), stopped in shutdown().
    LocalBookkeeperEnsemble bkEnsemble;
    // Broker configuration (wrapped in a Mockito spy) assembled in setup().
    ServiceConfiguration config;
    // Functions worker configuration; assigned by createPulsarFunctionWorker().
    WorkerConfig workerConfig;
    // URL of the broker's TLS web service ("https://127.0.0.1:<brokerWebServiceTlsPort>").
    URL urlTls;
    // Broker under test; started in setup(), closed in shutdown().
    PulsarService pulsar;
    // Admin client (Mockito spy) authenticated with the TLS client certificate.
    PulsarAdmin admin;
    // Client built in setup() from the worker configuration's service URL and auth settings.
    PulsarClient pulsarClient;
    // Obtained from admin.brokerStats() in setup().
    BrokerStats brokerStatsClient;
    // Functions worker handed to the PulsarService constructor.
    WorkerService functionsWorkerService;
    // "http://localhost:<brokerWebServicePort>", set in setup().
    String primaryHost;
    // Worker id in the form "c-<cluster>-fw-<host>-<workerPort>", set by createPulsarFunctionWorker().
    String workerId;
    // Thread that creates and starts the embedded HTTP file server.
    private Thread fileServerThread;
    // Embedded HTTP server serving the test archives; assigned on fileServerThread.
    private HttpServer fileServer;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarFunctionE2ETest.class);
    // Static, so every instance of this test class uses the same file server port.
    private static final int fileServerPort = PortManager.nextFreePort();
    // Tenant used by all tests in this class.
    final String tenant = "external-repl-prop";
    // Namespace passed to WorkerConfig.setPulsarFunctionsNamespace().
    String pulsarFunctionsNamespace = "external-repl-prop/use/pulsar-function-admin";
    // Ports below are picked once per test-class instance via PortManager.nextFreePort().
    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private final int brokerWebServicePort = PortManager.nextFreePort();
    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
    private final int brokerServicePort = PortManager.nextFreePort();
    private final int brokerServiceTlsPort = PortManager.nextFreePort();
    private final int workerServicePort = PortManager.nextFreePort();
    // TLS fixture files, relative to the module's working directory.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/io/PulsarFunctionE2ETest$Metric.class */
    /**
     * One metric sample: a set of tags plus a numeric value. The tests obtain
     * instances from {@code parseMetrics(...)} keyed by metric name.
     */
    public static class Metric {
        /** Tag name to tag value; the {@link TreeMap} keeps tags ordered by name. */
        final Map<String, String> tags = new TreeMap<>();
        /** Numeric value of the sample. */
        double value;

        Metric() {
        }

        @Override
        public String toString() {
            return "PulsarFunctionE2ETest.Metric(tags=" + this.tags + ", value=" + this.value + ")";
        }
    }

    /**
     * TestNG data provider named {@code "validRoleName"}.
     *
     * @return two rows, each holding a single {@link Boolean}: {@code TRUE}, then {@code FALSE}
     */
    @DataProvider(name = "validRoleName")
    public Object[][] validRoleName() {
        // Must be a two-dimensional array: an Object[] is not assignable to the declared Object[][].
        return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /**
     * Builds the environment each test method runs against:
     * <ol>
     *   <li>deletes leftover {@code function*} entries from {@code java.io.tmpdir};</li>
     *   <li>starts a local BookKeeper ensemble;</li>
     *   <li>configures and starts a TLS-authenticated broker with an embedded functions worker;</li>
     *   <li>creates the admin and client handles and registers the cluster and tenant;</li>
     *   <li>starts an HTTP file server exposing the test archives from the classpath.</li>
     * </ol>
     *
     * @param method the test method about to run; only its name is logged
     * @throws Exception if any component fails to start or be configured
     */
    @BeforeMethod
    void setup(Method method) throws Exception {
        // Clear leftovers from earlier runs; the function test asserts that none remain afterwards.
        File[] staleFiles = new File(System.getProperty("java.io.tmpdir"))
                .listFiles((dir, name) -> name.startsWith("function"));
        // listFiles() returns null when the directory cannot be listed.
        if (staleFiles != null) {
            for (File staleFile : staleFiles) {
                staleFile.delete();
            }
        }

        log.info("--- Setting up method {} ---", method.getName());

        this.bkEnsemble = new LocalBookkeeperEnsemble(3, this.ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
        this.bkEnsemble.start();

        String brokerServiceUrl = "https://127.0.0.1:" + this.brokerWebServiceTlsPort;

        // Broker configuration: TLS authentication plus authorization enabled.
        this.config = Mockito.spy(new ServiceConfiguration());
        this.config.setClusterName("use");
        this.config.setSuperUserRoles(Sets.newHashSet("superUser"));
        this.config.setWebServicePort(Optional.of(this.brokerWebServicePort));
        this.config.setWebServicePortTls(Optional.of(this.brokerWebServiceTlsPort));
        this.config.setZookeeperServers("127.0.0.1:" + this.ZOOKEEPER_PORT);
        this.config.setBrokerServicePort(Optional.of(this.brokerServicePort));
        this.config.setBrokerServicePortTls(Optional.of(this.brokerServiceTlsPort));
        this.config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        this.config.setTlsAllowInsecureConnection(true);
        this.config.setAdvertisedAddress("localhost");

        HashSet<String> authenticationProviders = new HashSet<>();
        authenticationProviders.add(AuthenticationProviderTls.class.getName());
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthenticationProviders(authenticationProviders);
        this.config.setAuthorizationEnabled(true);
        this.config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());

        this.config.setTlsCertificateFilePath(this.TLS_SERVER_CERT_FILE_PATH);
        this.config.setTlsKeyFilePath(this.TLS_SERVER_KEY_FILE_PATH);
        this.config.setTlsTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH);

        this.config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.config.setBrokerClientAuthenticationParameters(
                "tlsCertFile:" + this.TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + this.TLS_CLIENT_KEY_FILE_PATH);
        this.config.setBrokerClientTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH);
        this.config.setBrokerClientTlsEnabled(true);

        // The worker must exist before the broker because it is passed to the PulsarService constructor.
        this.functionsWorkerService = createPulsarFunctionWorker(this.config);
        this.urlTls = new URL(brokerServiceUrl);
        this.pulsar = new PulsarService(this.config, Optional.of(this.functionsWorkerService));
        this.pulsar.start();

        // Admin client authenticated with the TLS client certificate.
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", this.TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", this.TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authenticationTls = new AuthenticationTls();
        authenticationTls.configure(authParams);

        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(brokerServiceUrl)
                .tlsTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authenticationTls)
                .build());
        this.brokerStatsClient = this.admin.brokerStats();
        this.primaryHost = String.format("http://%s:%d", "localhost", this.brokerWebServicePort);

        this.admin.clusters().updateCluster(this.config.getClusterName(), new ClusterData(this.urlTls.toString()));

        // The Pulsar client reuses the worker's service URL and, when configured, its TLS/auth settings.
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
        if (StringUtils.isNotBlank(this.workerConfig.getClientAuthenticationPlugin())
                && StringUtils.isNotBlank(this.workerConfig.getClientAuthenticationParameters())) {
            clientBuilder.enableTls(this.workerConfig.isUseTls());
            clientBuilder.allowTlsInsecureConnection(this.workerConfig.isTlsAllowInsecureConnection());
            clientBuilder.authentication(this.workerConfig.getClientAuthenticationPlugin(),
                    this.workerConfig.getClientAuthenticationParameters());
        }
        this.pulsarClient = clientBuilder.build();

        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.getAdminRoles().add("superUser");
        tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
        this.admin.tenants().updateTenant(this.tenant, tenantInfo);

        // Simple web server used to test submitting functions/connectors by URL.
        this.fileServerThread = new Thread(() -> {
            try {
                this.fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
                // Each archive is served from the test classpath under "/<archive name>".
                for (String resourceName
                        : new String[] {"pulsar-io-data-generator.nar", "pulsar-functions-api-examples.jar"}) {
                    this.fileServer.createContext("/" + resourceName, httpExchange -> {
                        try {
                            httpExchange.getResponseHeaders().add(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
                            File resourceFile =
                                    new File(getClass().getClassLoader().getResource(resourceName).getFile());
                            byte[] payload = new byte[(int) resourceFile.length()];
                            // A single read() may return fewer bytes than requested, so loop until
                            // the buffer is full; try-with-resources closes the file handle.
                            try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(resourceFile))) {
                                int offset = 0;
                                while (offset < payload.length) {
                                    int count = in.read(payload, offset, payload.length - offset);
                                    if (count < 0) {
                                        throw new IOException("Unexpected end of file while reading " + resourceFile);
                                    }
                                    offset += count;
                                }
                            }
                            httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, resourceFile.length());
                            try (OutputStream responseBody = httpExchange.getResponseBody()) {
                                responseBody.write(payload, 0, payload.length);
                            }
                        } catch (Exception downloadError) {
                            log.error("Error when downloading: {}", downloadError, downloadError);
                        }
                    });
                }
                // A null executor makes the server use its default executor.
                this.fileServer.setExecutor(null);
                log.info("Starting file server...");
                this.fileServer.start();
            } catch (Exception e) {
                log.error("Failed to start file server: ", e);
                // fileServer is still null when HttpServer.create() itself failed.
                if (this.fileServer != null) {
                    this.fileServer.stop(0);
                }
            }
        });
        this.fileServerThread.start();
    }

    /**
     * Tears down everything created by {@code setup}: the HTTP file server and its
     * thread, the client and admin handles, the functions worker, the broker and
     * finally the BookKeeper ensemble.
     *
     * @throws Exception if closing any component fails
     */
    @AfterMethod
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        // fileServer is assigned on the file-server thread and remains null when
        // HttpServer.create() failed, so guard against a NullPointerException here.
        if (this.fileServer != null) {
            this.fileServer.stop(0);
        }
        this.fileServerThread.interrupt();
        this.pulsarClient.close();
        this.admin.close();
        this.functionsWorkerService.stop();
        this.pulsar.close();
        this.bkEnsemble.stop();
    }

    /**
     * Creates the functions worker that is embedded in the broker under test.
     * As side effects it sets the {@code pulsar.functions.java.instance.jar}
     * system property and assigns {@code workerConfig} and {@code workerId}.
     *
     * @param serviceConfiguration broker configuration supplying the TLS ports,
     *        cluster name and advertised address
     * @return a new {@link WorkerService} built from the populated worker configuration
     */
    private WorkerService createPulsarFunctionWorker(ServiceConfiguration serviceConfiguration) {
        final String instanceJarPath =
                FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
        System.setProperty("pulsar.functions.java.instance.jar", instanceJarPath);

        final WorkerConfig cfg = new WorkerConfig();
        this.workerConfig = cfg;
        final String clusterName = serviceConfiguration.getClusterName();

        // Scheduling and runtime.
        cfg.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        cfg.setSchedulerClassName(RoundRobinScheduler.class.getName());
        cfg.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));

        // Broker endpoints the worker talks to.
        cfg.setPulsarServiceUrl("pulsar://127.0.0.1:" + serviceConfiguration.getBrokerServicePortTls().get());
        cfg.setPulsarWebServiceUrl("https://127.0.0.1:" + serviceConfiguration.getWebServicePortTls().get());

        // Check frequencies, replicas and internal topic names.
        cfg.setFailureCheckFreqMs(100L);
        cfg.setNumFunctionPackageReplicas(1);
        cfg.setClusterCoordinationTopicName("coordinate");
        cfg.setFunctionAssignmentTopicName("assignment");
        cfg.setFunctionMetadataTopicName("metadata");
        cfg.setInstanceLivenessCheckFreqMs(100L);
        cfg.setWorkerPort(this.workerServicePort);
        cfg.setPulsarFunctionsCluster(clusterName);

        // Worker identity: "c-<cluster>-fw-<host>-<port>".
        final String hostname =
                ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getAdvertisedAddress());
        this.workerId = String.format("c-%s-fw-%s-%s", clusterName, hostname, cfg.getWorkerPort());
        cfg.setWorkerHostname(hostname);
        cfg.setWorkerId(this.workerId);

        // TLS client authentication towards the broker.
        cfg.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
        cfg.setClientAuthenticationParameters(
                "tlsCertFile:" + this.TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + this.TLS_CLIENT_KEY_FILE_PATH);
        cfg.setUseTls(true);
        cfg.setTlsAllowInsecureConnection(true);
        cfg.setTlsTrustCertsFilePath(this.TLS_TRUST_CERT_FILE_PATH);

        cfg.setAuthenticationEnabled(true);
        cfg.setAuthorizationEnabled(true);

        return new WorkerService(cfg);
    }

    /**
     * Builds a {@link FunctionConfig} for the {@code ExclamationFunction} example
     * running on the Java runtime with parallelism 1.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param str4 last segment of the input topics pattern; combined into
     *             {@code persistent://<tenant>/<namespace>/<str4>}
     * @param str5 output topic
     * @param str6 subscription name
     * @return the populated function configuration
     */
    protected static FunctionConfig createFunctionConfig(String str, String str2, String str3, String str4, String str5, String str6) {
        final String topicsPattern = "persistent://" + str + "/" + str2 + "/" + str4;

        final FunctionConfig fnConfig = new FunctionConfig();

        // Identity.
        fnConfig.setTenant(str);
        fnConfig.setNamespace(str2);
        fnConfig.setName(str3);

        // What runs and how.
        fnConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
        fnConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        fnConfig.setParallelism(1);
        fnConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);

        // Input and output wiring.
        fnConfig.setTopicsPattern(topicsPattern);
        fnConfig.setSubName(str6);
        fnConfig.setAutoAck(true);
        fnConfig.setCleanupSubscription(true);
        fnConfig.setOutput(str5);

        return fnConfig;
    }

    /**
     * Builds a {@link SourceConfig} with parallelism 1 and at-least-once processing.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 source name
     * @param str4 topic the source writes to
     * @return the populated source configuration
     */
    private static SourceConfig createSourceConfig(String str, String str2, String str3, String str4) {
        final SourceConfig srcConfig = new SourceConfig();

        // Identity.
        srcConfig.setTenant(str);
        srcConfig.setNamespace(str2);
        srcConfig.setName(str3);

        // Destination and execution settings.
        srcConfig.setTopicName(str4);
        srcConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        srcConfig.setParallelism(1);

        return srcConfig;
    }

    /**
     * Builds a {@link SinkConfig} with parallelism 1, at-least-once processing and
     * subscription cleanup enabled.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 sink name
     * @param str4 input topic, registered with a default {@link ConsumerConfig}
     * @param str5 subscription name used on the input topic
     * @return the populated sink configuration
     */
    private static SinkConfig createSinkConfig(String str, String str2, String str3, String str4, String str5) {
        final SinkConfig snkConfig = new SinkConfig();

        // Identity.
        snkConfig.setTenant(str);
        snkConfig.setNamespace(str2);
        snkConfig.setName(str3);

        // Input wiring.
        final ConsumerConfig defaultConsumerConfig = ConsumerConfig.builder().build();
        snkConfig.setInputSpecs(Collections.singletonMap(str4, defaultConsumerConfig));
        snkConfig.setSourceSubscriptionName(str5);
        snkConfig.setCleanupSubscription(true);

        // Execution settings.
        snkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        snkConfig.setParallelism(1);

        return snkConfig;
    }

    /**
     * End-to-end scenario for a function deployed from a package URL:
     * create the function, update it (parallelism 2, new output topic), verify the
     * output-topic publishers and the input-topic subscription, publish messages,
     * check that one arrives on the output topic with its property intact, delete
     * the function and verify that its subscription and temporary files are gone.
     *
     * @param str package URL handed to {@code createFunctionWithUrl}/{@code updateFunctionWithUrl}
     * @throws Exception if any admin or client call fails
     */
    private void testE2EPulsarFunction(String str) throws Exception {
        final String namespacePortion = "io";
        final String replNamespace = this.tenant + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
        final String propertyKey = TypedMessageBuilder.CONF_KEY;
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        // Value the test expects in the "id" entry of each function publisher's metadata.
        final String expectedProducerId = this.tenant + "/" + namespacePortion + "/" + functionName;
        final int totalMsgs = 5;

        this.admin.namespaces().createNamespace(replNamespace);
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, Sets.newHashSet(Lists.newArrayList("use")));

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
        Consumer<String> consumer =
                this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();

        FunctionConfig functionConfig =
                createFunctionConfig(this.tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        this.admin.functions().createFunctionWithUrl(functionConfig, str);

        // Update: two instances, writing to the second output topic.
        functionConfig.setParallelism(2);
        functionConfig.setOutput(sinkTopic2);
        this.admin.functions().updateFunctionWithUrl(functionConfig, str);

        // Wait until both function instances have producers on the new output topic.
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                TopicStats current = this.admin.topics().getStats(sinkTopic2);
                if (current.publishers.size() != 2) {
                    return false;
                }
                PublisherStats first = current.publishers.get(0);
                return first.metadata != null && expectedProducerId.equals(first.metadata.get("id"));
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);

        TopicStats sinkStats = this.admin.topics().getStats(sinkTopic2);
        Assert.assertEquals(sinkStats.publishers.size(), 2);
        PublisherStats firstPublisher = sinkStats.publishers.get(0);
        Assert.assertTrue(firstPublisher.metadata != null);
        Assert.assertTrue(firstPublisher.metadata.containsKey("id"));
        Assert.assertEquals(firstPublisher.metadata.get("id"), expectedProducerId);

        // Wait for the function's subscription on the input topic.
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

        for (int i = 0; i < totalMsgs; i++) {
            producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
        }

        // Wait for the function to acknowledge everything it consumed.
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                SubscriptionStats functionSub =
                        this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                // A missing subscription means "not ready yet", not a NullPointerException.
                return functionSub != null && functionSub.unackedMessages == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);

        // TestNG's argument order is (actual, expected).
        Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getProperty(propertyKey), propertyValue);

        SubscriptionStats subStats =
                this.admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next();
        // Compare Long with Long: a Long never equals an Integer, which would make
        // this assertion pass unconditionally.
        Assert.assertNotEquals(Long.valueOf(subStats.unackedMessages), Long.valueOf(totalMsgs));

        // Deleting the function must also remove its subscription (cleanupSubscription is enabled).
        this.admin.functions().deleteFunction(this.tenant, namespacePortion, functionName);
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);

        // No "function*" temporary files may be left behind in java.io.tmpdir.
        File[] leftovers = new File(System.getProperty("java.io.tmpdir"))
                .listFiles((dir, name) -> name.startsWith("function"));
        Assert.assertNotNull(leftovers, "Unable to list java.io.tmpdir");
        Assert.assertEquals(leftovers.length, 0, "Temporary files left over: " + Arrays.asList(leftovers));
    }

    /**
     * Runs the function end-to-end scenario with the example jar referenced by a
     * {@code Utils.FILE}-scheme URL pointing at the classpath resource.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 20000)
    public void testE2EPulsarFunctionWithFile() throws Exception {
        final String jarPath =
                getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        final String packageUrl = Utils.FILE + ":" + jarPath;
        testE2EPulsarFunction(packageUrl);
    }

    /**
     * Runs the function end-to-end scenario with the example jar downloaded over
     * HTTP from the file server listening on {@code fileServerPort}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = 40000)
    public void testE2EPulsarFunctionWithUrl() throws Exception {
        final String packageUrl =
                String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
        testE2EPulsarFunction(packageUrl);
    }

    private void testPulsarSinkStats(String str) throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        SinkConfig createSinkConfig = createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(1000).build()));
        this.admin.sink().createSinkWithUrl(createSinkConfig, str);
        createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(523).build()));
        this.admin.sink().updateSinkWithUrl(createSinkConfig, str);
        MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
            try {
                TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                if (stats.subscriptions.containsKey("test-sub") && ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.size() == 1) {
                    if (((ConsumerStats) ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.get(0)).availablePermits == 523) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
        Assert.assertEquals(stats.subscriptions.size(), 1);
        Assert.assertTrue(stats.subscriptions.containsKey("test-sub"));
        Assert.assertEquals(((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.size(), 1);
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.get(0)).availablePermits, 523);
        String prometheusMetrics = getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", prometheusMetrics);
        Map<String, Metric> parseMetrics = parseMetrics(prometheusMetrics);
        Metric metric = parseMetrics.get("pulsar_sink_received_total");
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric2 = parseMetrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals(metric2.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric2.tags.get("instance_id"), "0");
        Assert.assertEquals(metric2.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric2.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric2.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric2.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric3 = parseMetrics.get("pulsar_sink_written_total");
        Assert.assertEquals(metric3.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric3.tags.get("instance_id"), "0");
        Assert.assertEquals(metric3.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric3.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric3.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric3.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric4 = parseMetrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals(metric4.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric4.tags.get("instance_id"), "0");
        Assert.assertEquals(metric4.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric4.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric4.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric4.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric5 = parseMetrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals(metric5.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric5.tags.get("instance_id"), "0");
        Assert.assertEquals(metric5.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric5.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric5.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric5.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric6 = parseMetrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals(metric6.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric6.tags.get("instance_id"), "0");
        Assert.assertEquals(metric6.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric6.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric6.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric6.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric7 = parseMetrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals(metric7.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric7.tags.get("instance_id"), "0");
        Assert.assertEquals(metric7.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric7.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric7.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric7.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric8 = parseMetrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals(metric8.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric8.tags.get("instance_id"), "0");
        Assert.assertEquals(metric8.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric8.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric8.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric8.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric9 = parseMetrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals(metric9.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric9.tags.get("instance_id"), "0");
        Assert.assertEquals(metric9.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric9.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric9.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric9.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().property(TypedMessageBuilder.CONF_KEY, "value").value("my-message-" + i2).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(r7 -> {
            try {
                SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.get("test-sub");
                if (subscriptionStats.unackedMessages == 0) {
                    if (subscriptionStats.msgThroughputOut == i) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        String prometheusMetrics2 = getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheusMetrics: {}", prometheusMetrics2);
        Map<String, Metric> parseMetrics2 = parseMetrics(prometheusMetrics2);
        Metric metric10 = parseMetrics2.get("pulsar_sink_received_total");
        Assert.assertEquals(metric10.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric10.tags.get("instance_id"), "0");
        Assert.assertEquals(metric10.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric10.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric10.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric10.value), Double.valueOf(10));
        Metric metric11 = parseMetrics2.get("pulsar_sink_received_total_1min");
        Assert.assertEquals(metric11.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric11.tags.get("instance_id"), "0");
        Assert.assertEquals(metric11.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric11.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric11.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric11.value), Double.valueOf(10));
        Metric metric12 = parseMetrics2.get("pulsar_sink_written_total");
        Assert.assertEquals(metric12.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric12.tags.get("instance_id"), "0");
        Assert.assertEquals(metric12.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric12.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric12.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric12.value), Double.valueOf(10));
        Metric metric13 = parseMetrics2.get("pulsar_sink_written_total_1min");
        Assert.assertEquals(metric13.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric13.tags.get("instance_id"), "0");
        Assert.assertEquals(metric13.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric13.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric13.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric13.value), Double.valueOf(10));
        Metric metric14 = parseMetrics2.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals(metric14.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric14.tags.get("instance_id"), "0");
        Assert.assertEquals(metric14.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric14.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric14.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric14.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric15 = parseMetrics2.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals(metric15.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric15.tags.get("instance_id"), "0");
        Assert.assertEquals(metric15.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric15.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric15.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric15.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric16 = parseMetrics2.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals(metric16.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric16.tags.get("instance_id"), "0");
        Assert.assertEquals(metric16.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric16.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric16.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric16.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric17 = parseMetrics2.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals(metric17.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric17.tags.get("instance_id"), "0");
        Assert.assertEquals(metric17.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric17.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric17.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric17.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric18 = parseMetrics2.get("pulsar_sink_last_invocation");
        Assert.assertEquals(metric18.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric18.tags.get("instance_id"), "0");
        Assert.assertEquals(metric18.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric18.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric18.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric18.value > CMAESOptimizer.DEFAULT_STOPFITNESS);
        this.admin.sink().deleteSink("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(r42 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.size(), 0);
        File[] listFiles = new File(System.getProperty("java.io.tmpdir")).listFiles(new FilenameFilter() { // from class: org.apache.pulsar.io.PulsarFunctionE2ETest.3
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str2) {
                return str2.startsWith("function");
            }
        });
        Assert.assertEquals(listFiles.length, 0, "Temporary files left over: " + Arrays.asList(listFiles));
    }

    /**
     * Runs the sink stats scenario against the {@code pulsar-io-data-generator.nar}
     * resource located through this class's class loader, addressed with the
     * {@code Utils.FILE} scheme prefix.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut = 20000)
    public void testPulsarSinkStatsWithFile() throws Exception {
        URL narResource = getClass().getClassLoader().getResource("pulsar-io-data-generator.nar");
        String narLocation = Utils.FILE + ":" + narResource.getFile();
        testPulsarSinkStats(narLocation);
    }

    /**
     * Runs the sink stats scenario against the {@code pulsar-io-data-generator.nar}
     * package addressed by an HTTP URL on 127.0.0.1 at {@code fileServerPort}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut = 40000)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        String narUrl = String.format(
                "http://127.0.0.1:%d/pulsar-io-data-generator.nar",
                Integer.valueOf(fileServerPort));
        testPulsarSinkStats(narUrl);
    }

    /**
     * Exercises a Pulsar source end to end and verifies the metrics it exposes.
     *
     * <p>The scenario creates the {@code external-repl-prop/io} namespace, creates the
     * {@code PulsarSource-test} source from the given package, retargets it to the
     * {@code output2} topic through an update, checks the publisher metadata on that topic,
     * waits until more than four entries are stored, and then validates every
     * {@code pulsar_source_*} metric returned by {@code getPrometheusMetrics}. Finally it
     * checks that no file whose name starts with {@code "function"} remains in the
     * {@code java.io.tmpdir} directory.
     *
     * @param str package URL handed to the create/update source admin calls
     * @throws Exception if an admin call, the metrics fetch, or an assertion fails
     */
    private void testPulsarSourceStats(String str) throws Exception {
        final String initialTopic = "persistent://external-repl-prop/io/output";
        final String updatedTopic = "persistent://external-repl-prop/io/output2";
        final String expectedPublisherId = String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarSource-test");

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));
        SourceConfig sourceConfig = createSourceConfig("external-repl-prop", "io", "PulsarSource-test", initialTopic);
        this.admin.source().createSourceWithUrl(sourceConfig, str);

        // Poll until exactly one publisher is attached to the initial output topic.
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                return this.admin.topics().getStats(initialTopic).publishers.size() == 1;
            } catch (PulsarAdminException e) {
                // Admin call failed: report "not ready" to the retry helper.
                return false;
            }
        }, 10, 150L);

        // Retarget the source to the second topic and poll until its publisher shows up
        // there carrying the expected "id" metadata entry.
        sourceConfig.setTopicName(updatedTopic);
        this.admin.source().updateSourceWithUrl(sourceConfig, str);
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                TopicStats polled = this.admin.topics().getStats(updatedTopic);
                if (polled.publishers.size() != 1) {
                    return false;
                }
                PublisherStats candidate = (PublisherStats) polled.publishers.get(0);
                // Constant-first equals: a null "id" value means "not ready", not an NPE.
                return candidate.metadata != null
                        && candidate.metadata.containsKey("id")
                        && expectedPublisherId.equals(candidate.metadata.get("id"));
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);

        TopicStats stats = this.admin.topics().getStats(updatedTopic);
        Assert.assertEquals(stats.publishers.size(), 1);
        PublisherStats publisher = (PublisherStats) stats.publishers.get(0);
        Assert.assertNotNull(publisher.metadata);
        Assert.assertTrue(publisher.metadata.containsKey("id"));
        Assert.assertEquals((String) publisher.metadata.get("id"), expectedPublisherId);

        // Poll until the source has stored more than four entries in the updated topic.
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                return this.admin.topics().getStats(updatedTopic).publishers.size() == 1
                        && this.admin.topics().getInternalStats(updatedTopic).numberOfEntries > 4;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats(updatedTopic).publishers.size(), 1);

        String prometheusMetrics = getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheusMetrics: {}", prometheusMetrics);
        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);

        // Throughput counters must have advanced.
        assertSourceMetricPositive(metrics, "pulsar_source_received_total");
        assertSourceMetricPositive(metrics, "pulsar_source_received_total_1min");
        assertSourceMetricPositive(metrics, "pulsar_source_written_total");
        assertSourceMetricPositive(metrics, "pulsar_source_written_total_1min");
        // No source or system exceptions are expected in this scenario.
        assertSourceMetricZero(metrics, "pulsar_source_source_exceptions_total");
        assertSourceMetricZero(metrics, "pulsar_source_source_exceptions_total_1min");
        assertSourceMetricZero(metrics, "pulsar_source_system_exceptions_total");
        assertSourceMetricZero(metrics, "pulsar_source_system_exceptions_total_1min");
        assertSourceMetricPositive(metrics, "pulsar_source_last_invocation");

        // No file named "function*" may be left behind in the temp directory.
        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
        File[] leftovers = tmpDir.listFiles((dir, name) -> name.startsWith("function"));
        // File.listFiles returns null when the path is not a directory or cannot be read.
        Assert.assertNotNull(leftovers, "Unable to list temporary directory: " + tmpDir);
        Assert.assertEquals(leftovers.length, 0, "Temporary files left over: " + Arrays.asList(leftovers));
    }

    /**
     * Fetches the named metric and asserts the tags identifying the
     * {@code PulsarSource-test} source (cluster, instance id, name, namespace, fqfn).
     *
     * @param metrics parsed metrics keyed by metric name
     * @param metricName name of the metric to look up
     * @return the metric, guaranteed non-null
     */
    private Metric lookupSourceMetric(Map<String, Metric> metrics, String metricName) {
        Metric metric = metrics.get(metricName);
        Assert.assertNotNull(metric, "Missing Prometheus metric: " + metricName);
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        return metric;
    }

    /** Asserts the named source metric carries the expected tags and a value greater than zero. */
    private void assertSourceMetricPositive(Map<String, Metric> metrics, String metricName) {
        Metric metric = lookupSourceMetric(metrics, metricName);
        Assert.assertTrue(metric.value > 0.0, metricName + " should be > 0 but was " + metric.value);
    }

    /** Asserts the named source metric carries the expected tags and a value of exactly zero. */
    private void assertSourceMetricZero(Map<String, Metric> metrics, String metricName) {
        Metric metric = lookupSourceMetric(metrics, metricName);
        Assert.assertEquals(Double.valueOf(metric.value), Double.valueOf(0.0), metricName + " should be 0");
    }

    /**
     * Runs the source stats scenario against the {@code pulsar-io-data-generator.nar}
     * resource located through this class's class loader, addressed with the
     * {@code Utils.FILE} scheme prefix.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut = 20000)
    public void testPulsarSourceStatsWithFile() throws Exception {
        URL narResource = getClass().getClassLoader().getResource("pulsar-io-data-generator.nar");
        String narLocation = Utils.FILE + ":" + narResource.getFile();
        testPulsarSourceStats(narLocation);
    }

    /**
     * Runs the source stats scenario against the {@code pulsar-io-data-generator.nar}
     * package addressed by an HTTP URL on 127.0.0.1 at {@code fileServerPort}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeOut = 40000)
    public void testPulsarSourceStatsWithUrl() throws Exception {
        String narUrl = String.format(
                "http://127.0.0.1:%d/pulsar-io-data-generator.nar",
                Integer.valueOf(fileServerPort));
        testPulsarSourceStats(narUrl);
    }

    @Test(timeOut = 20000)
    public void testPulsarFunctionStats() throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic1").create();
        String str = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        FunctionConfig createFunctionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
        this.admin.functions().createFunctionWithUrl(createFunctionConfig, str);
        this.admin.functions().updateFunctionWithUrl(createFunctionConfig, str);
        MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").subscriptions.size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").subscriptions.size(), 1);
        FunctionRuntimeManager functionRuntimeManager = this.functionsWorkerService.getFunctionRuntimeManager();
        FunctionStats functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", (URI) null);
        Assert.assertEquals(functionStats, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(functionStats.getReceivedTotal(), 0L);
        Assert.assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(functionStats.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.getUserExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.avgProcessLatency, (Object) null);
        Assert.assertEquals(functionStats.oneMin.getReceivedTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getAvgProcessLatency(), (Object) null);
        Assert.assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
        Assert.assertEquals(functionStats.getLastInvocation(), (Object) null);
        Assert.assertEquals(functionStats.instances.size(), 1);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getInstanceId(), 0);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getReceivedTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().avgProcessLatency, (Object) null);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getReceivedTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getAvgProcessLatency(), (Object) null);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), ((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().oneMin.getAvgProcessLatency());
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());
        String prometheusMetrics = getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", prometheusMetrics);
        Map<String, Metric> parseMetrics = parseMetrics(prometheusMetrics);
        Metric metric = parseMetrics.get("pulsar_function_received_total");
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric2 = parseMetrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals(metric2.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric2.tags.get("instance_id"), "0");
        Assert.assertEquals(metric2.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric2.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric2.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric2.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric3 = parseMetrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals(metric3.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric3.tags.get("instance_id"), "0");
        Assert.assertEquals(metric3.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric3.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric3.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric3.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric4 = parseMetrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals(metric4.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric4.tags.get("instance_id"), "0");
        Assert.assertEquals(metric4.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric4.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric4.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric4.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric5 = parseMetrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals(metric5.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric5.tags.get("instance_id"), "0");
        Assert.assertEquals(metric5.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric5.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric5.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric5.value), Double.valueOf(Double.NaN));
        Metric metric6 = parseMetrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals(metric6.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric6.tags.get("instance_id"), "0");
        Assert.assertEquals(metric6.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric6.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric6.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric6.value), Double.valueOf(Double.NaN));
        Metric metric7 = parseMetrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals(metric7.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric7.tags.get("instance_id"), "0");
        Assert.assertEquals(metric7.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric7.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric7.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric7.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric8 = parseMetrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals(metric8.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric8.tags.get("instance_id"), "0");
        Assert.assertEquals(metric8.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric8.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric8.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric8.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric9 = parseMetrics.get("pulsar_function_last_invocation");
        Assert.assertEquals(metric9.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric9.tags.get("instance_id"), "0");
        Assert.assertEquals(metric9.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric9.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric9.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric9.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric10 = parseMetrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals(metric10.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric10.tags.get("instance_id"), "0");
        Assert.assertEquals(metric10.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric10.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric10.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric10.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric11 = parseMetrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals(metric11.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric11.tags.get("instance_id"), "0");
        Assert.assertEquals(metric11.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric11.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric11.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric11.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, (URI) null);
        Assert.assertEquals(functionInstanceStats, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0));
        Assert.assertEquals(functionInstanceStats, ((FunctionStats.FunctionInstanceStats) functionStats.instances.get(0)).getMetrics());
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().property(TypedMessageBuilder.CONF_KEY, "value").value("my-message-" + i2).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(r7 -> {
            try {
                SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").subscriptions.get("test-sub");
                if (subscriptionStats.unackedMessages == 0) {
                    if (subscriptionStats.msgThroughputOut == i) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        FunctionStats functionStats2 = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", (URI) null);
        Assert.assertEquals(functionStats2, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(functionStats2.getReceivedTotal(), 10);
        Assert.assertEquals(functionStats2.getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(functionStats2.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats2.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(functionStats2.avgProcessLatency.doubleValue() > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Assert.assertEquals(functionStats2.oneMin.getReceivedTotal(), 10);
        Assert.assertEquals(functionStats2.oneMin.getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(functionStats2.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats2.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(functionStats2.oneMin.getAvgProcessLatency().doubleValue() > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Assert.assertEquals(functionStats2.getAvgProcessLatency(), functionStats2.oneMin.getAvgProcessLatency());
        Assert.assertTrue(functionStats2.getLastInvocation().longValue() > 0);
        Assert.assertEquals(functionStats2.instances.size(), 1);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getInstanceId(), 0);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getReceivedTotal(), 10);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertTrue(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().avgProcessLatency.doubleValue() > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getReceivedTotal(), 10);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getAvgProcessLatency().doubleValue() > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getAvgProcessLatency(), ((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().oneMin.getAvgProcessLatency());
        Assert.assertEquals(((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics().getAvgProcessLatency(), functionStats2.getAvgProcessLatency());
        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStats2 = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, (URI) null);
        Assert.assertEquals(functionInstanceStats2, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0));
        Assert.assertEquals(functionInstanceStats2, ((FunctionStats.FunctionInstanceStats) functionStats2.instances.get(0)).getMetrics());
        String prometheusMetrics2 = getPrometheusMetrics(this.brokerWebServicePort);
        log.info("prometheus metrics: {}", prometheusMetrics2);
        Map<String, Metric> parseMetrics2 = parseMetrics(prometheusMetrics2);
        Metric metric12 = parseMetrics2.get("pulsar_function_received_total");
        Assert.assertEquals(metric12.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric12.tags.get("instance_id"), "0");
        Assert.assertEquals(metric12.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric12.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric12.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric12.value), Double.valueOf(10));
        Metric metric13 = parseMetrics2.get("pulsar_function_received_total_1min");
        Assert.assertEquals(metric13.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric13.tags.get("instance_id"), "0");
        Assert.assertEquals(metric13.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric13.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric13.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric13.value), Double.valueOf(10));
        Metric metric14 = parseMetrics2.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals(metric14.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric14.tags.get("instance_id"), "0");
        Assert.assertEquals(metric14.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric14.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric14.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric14.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric15 = parseMetrics2.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals(metric15.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric15.tags.get("instance_id"), "0");
        Assert.assertEquals(metric15.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric15.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric15.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric15.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric16 = parseMetrics2.get("pulsar_function_process_latency_ms");
        Assert.assertEquals(metric16.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric16.tags.get("instance_id"), "0");
        Assert.assertEquals(metric16.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric16.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric16.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric16.value > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Metric metric17 = parseMetrics2.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals(metric17.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric17.tags.get("instance_id"), "0");
        Assert.assertEquals(metric17.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric17.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric17.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric17.value > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Metric metric18 = parseMetrics2.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals(metric18.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric18.tags.get("instance_id"), "0");
        Assert.assertEquals(metric18.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric18.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric18.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric18.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric19 = parseMetrics2.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals(metric19.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric19.tags.get("instance_id"), "0");
        Assert.assertEquals(metric19.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric19.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric19.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric19.value), Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS));
        Metric metric20 = parseMetrics2.get("pulsar_function_last_invocation");
        Assert.assertEquals(metric20.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric20.tags.get("instance_id"), "0");
        Assert.assertEquals(metric20.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric20.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric20.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric20.value > CMAESOptimizer.DEFAULT_STOPFITNESS);
        Metric metric21 = parseMetrics2.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals(metric21.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric21.tags.get("instance_id"), "0");
        Assert.assertEquals(metric21.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric21.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric21.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric21.value), Double.valueOf(10));
        Metric metric22 = parseMetrics2.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals(metric22.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric22.tags.get("instance_id"), "0");
        Assert.assertEquals(metric22.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric22.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric22.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(Double.valueOf(metric22.value), Double.valueOf(10));
        this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(r42 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").subscriptions.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").subscriptions.size(), 0);
        File[] listFiles = new File(System.getProperty("java.io.tmpdir")).listFiles(new FilenameFilter() { // from class: org.apache.pulsar.io.PulsarFunctionE2ETest.5
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str2) {
                return str2.startsWith("function");
            }
        });
        Assert.assertEquals(listFiles.length, 0, "Temporary files left over: " + Arrays.asList(listFiles));
    }

    /**
     * Checks that the function status endpoint reflects the work done by a single instance.
     *
     * <p>The test registers the {@code PulsarSink-test} function (create followed by an update with
     * the same config), publishes ten messages to its input topic, and asserts that the reported
     * status shows one instance with ten received and ten successfully processed messages and a
     * worker id equal to {@code this.workerId}. It then deletes the function and expects the
     * input topic to end up with no subscriptions.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testPulsarFunctionStatus() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespace = "io";
        final String functionName = "PulsarSink-test";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String subscriptionName = "test-sub";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // try-with-resources: the producer was previously never closed, so it stayed open
        // after the test finished (and on every assertion failure).
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFileUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
            FunctionConfig functionConfig = createFunctionConfig(tenant, namespace, functionName, "my.*", "persistent://external-repl-prop/io/output", subscriptionName);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFileUrl);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFileUrl);

            // Poll (via retryStrategically) until the function's subscription shows up on the topic.
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property(TypedMessageBuilder.CONF_KEY, "value").value("my-message-" + i).send();
            }

            // Poll until nothing is left unacknowledged on the function's subscription.
            // NOTE(review): msgThroughputOut is compared with the message count here, exactly as
            // before; confirm that this is the intended field.
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    SubscriptionStats stats = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                    return stats.unackedMessages == 0 && stats.msgThroughputOut == totalMsgs;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus(tenant, namespace, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status = ((FunctionStatus.FunctionInstanceStatus) functionStatus.getInstances().get(0)).getStatus();
            double numReceived = status.getNumReceived();
            double numSuccessfullyProcessed = status.getNumSuccessfullyProcessed();
            String reportedWorkerId = status.getWorkerId();
            Assert.assertEquals((int) numReceived, totalMsgs);
            Assert.assertEquals((int) numSuccessfullyProcessed, totalMsgs);
            Assert.assertEquals(reportedWorkerId, this.workerId);

            this.admin.functions().deleteFunction(tenant, namespace, functionName);

            // Poll until the subscription has been removed from the input topic.
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
        }
    }

    /**
     * Exercises function creation under two tenant admin-role setups.
     *
     * <p>The tenant's admin role is set to {@code "superUser"} when {@code z} is true and to
     * {@code "invalid"} otherwise. Creating the function must then succeed exactly when
     * {@code z} is true; a {@link PulsarAdminException.NotAuthorizedException} is only
     * acceptable when {@code z} is false.
     *
     * @param z value supplied by the {@code validRoleName} data provider
     * @throws Exception if an admin call fails for a reason other than authorization
     */
    @Test(dataProvider = "validRoleName")
    public void testAuthorization(boolean z) throws Exception {
        final String tenant = "external-repl-prop";
        final String namespaceName = "external-repl-prop/io";

        this.admin.namespaces().createNamespace(namespaceName);
        this.admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(Lists.newArrayList("use")));

        String adminRole;
        if (z) {
            adminRole = "superUser";
        } else {
            adminRole = "invalid";
        }

        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.getAdminRoles().add(adminRole);
        tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
        this.admin.tenants().updateTenant(tenant, tenantInfo);

        try {
            FunctionConfig functionConfig = createFunctionConfig(tenant, "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
            String jarFileUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFileUrl);
            // Reaching this line means the request was authorized.
            Assert.assertTrue(z);
        } catch (PulsarAdminException.NotAuthorizedException e) {
            // A rejection is only expected for the "invalid" role.
            Assert.assertFalse(z);
        }
    }

    /**
     * Drives a function through stop and restart and watches the consumer count on its
     * {@code test-sub} subscription: one consumer after creation, none after
     * {@code stopFunction}, and one again after {@code restartFunction}.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testFunctionStopAndRestartApi() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespace = "io";
        final String functionName = "PulsarSink-test";
        final String sourceTopic = "persistent://external-repl-prop/io/restartFunction";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();

        FunctionConfig functionConfig = createFunctionConfig(tenant, namespace, functionName, "restartFunction", "persistent://external-repl-prop/io/output", subscriptionName);
        String jarFileUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
        this.admin.functions().createFunctionWithUrl(functionConfig, jarFileUrl);

        // After creation: poll until the subscription exists with exactly one consumer.
        MockedPulsarServiceBaseTest.retryStrategically(unused -> {
            try {
                SubscriptionStats stats = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                return stats != null && stats.consumers.size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        SubscriptionStats afterCreate = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
        Assert.assertEquals(afterCreate.consumers.size(), 1);

        // After stop: the subscription must still be there, but without consumers.
        this.admin.functions().stopFunction(tenant, namespace, functionName);
        MockedPulsarServiceBaseTest.retryStrategically(unused -> {
            try {
                SubscriptionStats stats = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                return stats != null && stats.consumers.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        SubscriptionStats afterStop = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
        Assert.assertEquals(afterStop.consumers.size(), 0);

        // After restart: the single consumer is expected to come back.
        this.admin.functions().restartFunction(tenant, namespace, functionName);
        MockedPulsarServiceBaseTest.retryStrategically(unused -> {
            try {
                SubscriptionStats stats = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                return stats != null && stats.consumers.size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 150L);
        SubscriptionStats afterRestart = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
        Assert.assertEquals(afterRestart.consumers.size(), 1);

        producer.close();
    }

    /**
     * Verifies how the {@code cleanupSubscription} flag of a function affects its input
     * subscription when the function is deleted.
     *
     * <p>Phases, in order:
     * <ol>
     *   <li>Create the function with {@code cleanupSubscription=false} and check the flag and
     *       that one subscription exists on the input topic.</li>
     *   <li>Update it to {@code cleanupSubscription=true}, publish ten messages, check the
     *       status counters, delete the function and expect zero subscriptions.</li>
     *   <li>Re-create it with {@code cleanupSubscription=false}, update the parallelism to 2,
     *       check the flag is still false, delete the function and expect the subscription to
     *       remain.</li>
     * </ol>
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testFunctionAutomaticSubCleanup() throws Exception {
        final String tenant = "external-repl-prop";
        final String namespace = "io";
        final String functionName = "PulsarFunction-test";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // try-with-resources: the producer was previously never closed.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFileUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();

            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(tenant);
            functionConfig.setNamespace(namespace);
            functionConfig.setName(functionName);
            functionConfig.setParallelism(1);
            functionConfig.setInputs(Collections.singleton(sourceTopic));
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput("persistent://external-repl-prop/io/output");
            functionConfig.setCleanupSubscription(false);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);

            // ---- Phase 1: created with cleanupSubscription=false ----
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFileUrl);

            // Poll until the stored config reports cleanupSubscription=false. The predicate used
            // to wait for the flag to be TRUE, which contradicts the assertFalse below and could
            // never be satisfied early.
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return !this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue());

            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

            // ---- Phase 2: switch to cleanupSubscription=true, process messages, delete ----
            functionConfig.setCleanupSubscription(true);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFileUrl);
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertTrue(this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue());

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property(TypedMessageBuilder.CONF_KEY, "value").value("my-message-" + i).send();
            }

            // The function config sets no explicit subscription name, so the stats are looked up
            // under InstanceUtils.getDefaultSubscriptionName(...).
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    SubscriptionStats stats = (SubscriptionStats) this.admin.topics().getStats(sourceTopic).subscriptions.get(InstanceUtils.getDefaultSubscriptionName(tenant, namespace, functionName));
                    return stats.unackedMessages == 0 && stats.msgThroughputOut == totalMsgs;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus(tenant, namespace, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status = ((FunctionStatus.FunctionInstanceStatus) functionStatus.getInstances().get(0)).getStatus();
            double numReceived = status.getNumReceived();
            double numSuccessfullyProcessed = status.getNumSuccessfullyProcessed();
            String reportedWorkerId = status.getWorkerId();
            Assert.assertEquals((int) numReceived, totalMsgs);
            Assert.assertEquals((int) numSuccessfullyProcessed, totalMsgs);
            Assert.assertEquals(reportedWorkerId, this.workerId);

            // With cleanupSubscription=true the subscription is expected to go away on delete.
            this.admin.functions().deleteFunction(tenant, namespace, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 0);

            // ---- Phase 3: re-create with cleanupSubscription=false, bump parallelism, delete ----
            functionConfig.setCleanupSubscription(false);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFileUrl);
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

            // Poll for exactly what is asserted next. This predicate used to also require
            // parallelism == 2, but the parallelism is only raised to 2 further down, so the
            // condition could not hold at this point.
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return !this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue());

            functionConfig.setParallelism(2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFileUrl);
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    FunctionConfig current = this.admin.functions().getFunction(tenant, namespace, functionName);
                    return current.getParallelism().intValue() == 2 && !current.getCleanupSubscription().booleanValue();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertFalse(this.admin.functions().getFunction(tenant, namespace, functionName).getCleanupSubscription().booleanValue());

            // With cleanupSubscription=false the subscription is expected to survive the delete.
            this.admin.functions().deleteFunction(tenant, namespace, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(unused -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
        }
    }

    /**
     * Downloads the body served at {@code http://localhost:<port>/metrics} and returns it as text.
     *
     * <p>The response is read line by line; every line is re-emitted followed by
     * {@link System#lineSeparator()}, so the result always ends with a line separator when the
     * body is non-empty.
     *
     * @param i TCP port of the HTTP endpoint on {@code localhost}
     * @return the response body with platform line separators
     * @throws IOException if the connection cannot be opened or the body cannot be read
     */
    public static String getPrometheusMetrics(int i) throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(String.format("http://%s:%s/metrics", "localhost", Integer.valueOf(i))).openConnection();
        httpURLConnection.setRequestMethod("GET");
        StringBuilder sb = new StringBuilder();
        // try-with-resources closes the reader (and the underlying stream) even when readLine()
        // throws; the charset is pinned to UTF-8 instead of depending on the platform default.
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String readLine;
            while ((readLine = bufferedReader.readLine()) != null) {
                sb.append(readLine).append(System.lineSeparator());
            }
        }
        return sb.toString();
    }

    /**
     * Parses metrics text into a map keyed by metric name.
     *
     * <p>Empty lines and lines starting with {@code #} are skipped. Every other line must have the
     * form {@code name{label="value",...} value [timestamp]}; the labels are stored in
     * {@code Metric.tags} and the sample value in {@code Metric.value}. When a metric name
     * occurs more than once, the last occurrence wins.
     *
     * @param str the metrics text, with lines separated by {@code \n} or {@code \r\n}
     * @return a map from metric name to the parsed metric
     * @throws IllegalArgumentException if a non-comment line does not match the expected form
     */
    private static Map<String, Metric> parseMetrics(String str) {
        Map<String, Metric> parsed = new HashMap<>();
        Pattern linePattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
        Pattern tagPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
        // Accept CRLF as well as LF: the text may have been joined with System.lineSeparator(),
        // and a trailing '\r' left on a line would make matches() fail.
        for (String line : str.split("\\r?\\n")) {
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            Matcher lineMatcher = linePattern.matcher(line);
            Preconditions.checkArgument(lineMatcher.matches(), "Unparseable metrics line: %s", line);
            Metric metric = new Metric();
            metric.value = Double.parseDouble(lineMatcher.group(3));
            Matcher tagMatcher = tagPattern.matcher(lineMatcher.group(2));
            while (tagMatcher.find()) {
                metric.tags.put(tagMatcher.group(1), tagMatcher.group(2));
            }
            parsed.put(lineMatcher.group(1), metric);
        }
        return parsed;
    }
}
