package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Map;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.TestPulsarFunctionUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker-io"})
/* loaded from: input_file:org/apache/pulsar/io/PulsarSourceE2ETest.class */
public class PulsarSourceE2ETest extends AbstractPulsarE2ETest {
    private void testPulsarSourceStats(String str) throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        SourceConfig createSourceConfig = createSourceConfig("external-repl-prop", "io", "PulsarSource-test", "persistent://external-repl-prop/io/output");
        if (str.startsWith("builtin")) {
            createSourceConfig.setArchive(str);
            this.admin.sources().createSource(createSourceConfig, (String) null);
        } else {
            this.admin.sources().createSourceWithUrl(createSourceConfig, str);
        }
        MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/output").getPublishers().size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 150L);
        createSourceConfig.setTopicName("persistent://external-repl-prop/io/output2");
        if (str.startsWith("builtin")) {
            this.admin.sources().updateSource(createSourceConfig, (String) null);
        } else {
            this.admin.sources().updateSourceWithUrl(createSourceConfig, str);
        }
        MockedPulsarServiceBaseTest.retryStrategically(r8 -> {
            try {
                TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/output2");
                if (stats.getPublishers().size() == 1 && ((PublisherStats) stats.getPublishers().get(0)).getMetadata() != null && ((PublisherStats) stats.getPublishers().get(0)).getMetadata().containsKey("id")) {
                    if (((String) ((PublisherStats) stats.getPublishers().get(0)).getMetadata().get("id")).equals(String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarSource-test"))) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/output2");
        Assert.assertEquals(stats.getPublishers().size(), 1);
        Assert.assertNotNull(((PublisherStats) stats.getPublishers().get(0)).getMetadata());
        Assert.assertTrue(((PublisherStats) stats.getPublishers().get(0)).getMetadata().containsKey("id"));
        Assert.assertEquals((String) ((PublisherStats) stats.getPublishers().get(0)).getMetadata().get("id"), String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarSource-test"));
        MockedPulsarServiceBaseTest.retryStrategically(r6 -> {
            try {
                if (this.admin.topics().getStats("persistent://external-repl-prop/io/output2").getPublishers().size() == 1) {
                    if (this.admin.topics().getInternalStats("persistent://external-repl-prop/io/output2", false).numberOfEntries > 4) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/output2").getPublishers().size(), 1);
        String prometheusMetrics = TestPulsarFunctionUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue());
        log.info("prometheusMetrics: {}", prometheusMetrics);
        Map<String, TestPulsarFunctionUtils.Metric> parseMetrics = TestPulsarFunctionUtils.parseMetrics(prometheusMetrics);
        TestPulsarFunctionUtils.Metric metric = parseMetrics.get("pulsar_source_received_total");
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertTrue(metric.value > 0.0d);
        TestPulsarFunctionUtils.Metric metric2 = parseMetrics.get("pulsar_source_received_1min_total");
        Assert.assertEquals(metric2.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric2.tags.get("instance_id"), "0");
        Assert.assertEquals(metric2.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric2.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric2.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertTrue(metric2.value > 0.0d);
        TestPulsarFunctionUtils.Metric metric3 = parseMetrics.get("pulsar_source_written_total");
        Assert.assertEquals(metric3.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric3.tags.get("instance_id"), "0");
        Assert.assertEquals(metric3.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric3.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric3.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertTrue(metric3.value > 0.0d);
        TestPulsarFunctionUtils.Metric metric4 = parseMetrics.get("pulsar_source_written_1min_total");
        Assert.assertEquals(metric4.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric4.tags.get("instance_id"), "0");
        Assert.assertEquals(metric4.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric4.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric4.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertTrue(metric4.value > 0.0d);
        TestPulsarFunctionUtils.Metric metric5 = parseMetrics.get("pulsar_source_source_exceptions_total");
        Assert.assertEquals(metric5.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric5.tags.get("instance_id"), "0");
        Assert.assertEquals(metric5.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric5.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric5.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertEquals(metric5.value, 0.0d);
        TestPulsarFunctionUtils.Metric metric6 = parseMetrics.get("pulsar_source_source_exceptions_1min_total");
        Assert.assertEquals(metric6.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric6.tags.get("instance_id"), "0");
        Assert.assertEquals(metric6.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric6.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric6.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertEquals(metric6.value, 0.0d);
        TestPulsarFunctionUtils.Metric metric7 = parseMetrics.get("pulsar_source_system_exceptions_total");
        Assert.assertEquals(metric7.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric7.tags.get("instance_id"), "0");
        Assert.assertEquals(metric7.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric7.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric7.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertEquals(metric7.value, 0.0d);
        TestPulsarFunctionUtils.Metric metric8 = parseMetrics.get("pulsar_source_system_exceptions_1min_total");
        Assert.assertEquals(metric8.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric8.tags.get("instance_id"), "0");
        Assert.assertEquals(metric8.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric8.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric8.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertEquals(metric8.value, 0.0d);
        TestPulsarFunctionUtils.Metric metric9 = parseMetrics.get("pulsar_source_last_invocation");
        Assert.assertEquals(metric9.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric9.tags.get("instance_id"), "0");
        Assert.assertEquals(metric9.tags.get("name"), "PulsarSource-test");
        Assert.assertEquals(metric9.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric9.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSource-test"));
        Assert.assertTrue(metric9.value > 0.0d);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        this.admin.sources().deleteSource("external-repl-prop", "io", "PulsarSource-test");
    }

    @Test(timeOut = 20000, groups = {"builtin"})
    public void testPulsarSourceStatsBuiltin() throws Exception {
        testPulsarSourceStats(String.format("%s://data-generator", "builtin"));
    }

    @Test(timeOut = 20000, groups = {"builtin"}, expectedExceptions = {PulsarAdminException.class}, expectedExceptionsMessageRegExp = "Built-in source is not available")
    public void testPulsarSourceStatsBuiltinDoesNotExist() throws Exception {
        testPulsarSourceStats(String.format("%s://foo", "builtin"));
    }

    @Test(timeOut = 20000)
    public void testPulsarSourceStatsWithFile() throws Exception {
        testPulsarSourceStats(PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString());
    }

    @Test(timeOut = 40000)
    public void testPulsarSourceStatsWithUrl() throws Exception {
        testPulsarSourceStats(this.fileServer.getUrl("/pulsar-io-data-generator.nar"));
    }

    private static SourceConfig createSourceConfig(String str, String str2, String str3, String str4) {
        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setTenant(str);
        sourceConfig.setNamespace(str2);
        sourceConfig.setName(str3);
        sourceConfig.setParallelism(1);
        sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sourceConfig.setTopicName(str4);
        return sourceConfig;
    }
}
