/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources.debezium;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMsSqlContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
import org.apache.pulsar.tests.integration.io.sources.debezium.DebeziumMongoDbSourceTester;
import org.apache.pulsar.tests.integration.io.sources.debezium.DebeziumMsSqlSourceTester;
import org.apache.pulsar.tests.integration.io.sources.debezium.DebeziumMySqlSourceTester;
import org.apache.pulsar.tests.integration.io.sources.debezium.DebeziumPostgreSqlSourceTester;
import org.apache.pulsar.tests.integration.io.sources.debezium.PulsarIODebeziumSourceRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.Sets;
import org.testng.annotations.Test;

/**
 * Integration tests that run the Debezium source connectors (MySQL, PostgreSQL,
 * MongoDB and MS SQL) against a Pulsar cluster through
 * {@link PulsarIODebeziumSourceRunner}.
 *
 * <p>Every scenario follows the same outline: build a client and an admin for the
 * cluster, make sure the {@code debezium} tenant and its namespaces exist, create
 * the topics the scenario needs, attach the database container to the matching
 * source tester and hand the tester to the runner.
 */
public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
    private static final Logger log = LoggerFactory.getLogger(PulsarDebeziumSourcesTest.class);

    private static final String JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
    private static final String AVRO_CONVERTER =
            "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter";

    /** Tenant and namespace handed to the source runner. */
    private static final String TENANT = "public";
    private static final String NAMESPACE = "default";

    /** Source-config key that receives the {@code jsonWithEnvelope} flag. */
    private static final String JSON_WITH_ENVELOPE_KEY = "json-with-envelope";

    /** Prefix of the per-test output topic; a {@link #testId} value is appended. */
    private static final String OUTPUT_TOPIC_PREFIX = "debe-output-topic-name-";

    /** Tenant and namespaces that {@link #initNamespace(PulsarAdmin)} tries to create. */
    private static final String DEBEZIUM_TENANT = "debezium";
    private static final String[] DEBEZIUM_NAMESPACES = {
        "debezium/mysql-json",
        "debezium/mysql-avro",
        "debezium/mongodb",
        "debezium/postgresql",
        "debezium/mssql",
    };

    /** Counter used to give every test invocation its own output topic name. */
    protected final AtomicInteger testId = new AtomicInteger(0);

    /** MySQL source with the JSON converter and the JSON envelope enabled. */
    @Test(groups = {"source"})
    public void testDebeziumMySqlSourceJson() throws Exception {
        testDebeziumMySqlConnect(JSON_CONVERTER, true, false);
    }

    /** Same as the JSON scenario, but the tester is created with {@code testWithClientBuilder = true}. */
    @Test(groups = {"source"})
    public void testDebeziumMySqlSourceJsonWithClientBuilder() throws Exception {
        testDebeziumMySqlConnect(JSON_CONVERTER, true, true);
    }

    /** MySQL source with the Avro converter and no JSON envelope. */
    @Test(groups = {"source"})
    public void testDebeziumMySqlSourceAvro() throws Exception {
        testDebeziumMySqlConnect(AVRO_CONVERTER, false, false);
    }

    /** PostgreSQL source with the JSON converter and the JSON envelope enabled. */
    @Test(groups = {"source"})
    public void testDebeziumPostgreSqlSource() throws Exception {
        testDebeziumPostgreSqlConnect(JSON_CONVERTER, true);
    }

    /** MongoDB source with the JSON converter and the JSON envelope enabled. */
    @Test(groups = {"source"})
    public void testDebeziumMongoDbSource() throws Exception {
        testDebeziumMongoDbConnect(JSON_CONVERTER, true);
    }

    /** MS SQL source with the JSON converter and the JSON envelope enabled. */
    @Test(groups = {"source"})
    public void testDebeziumMsSqlSource() throws Exception {
        testDebeziumMsSqlConnect(JSON_CONVERTER, true);
    }

    /**
     * Runs the MySQL Debezium source scenario.
     *
     * @param converterClassName    converter class passed to the tester and the runner; a name
     *                              ending in {@code JsonConverter} selects the {@code json}
     *                              topic and source names, anything else selects {@code avro}
     * @param jsonWithEnvelope      value stored under {@code json-with-envelope} in the source config
     * @param testWithClientBuilder forwarded unchanged to {@link DebeziumMySqlSourceTester}
     * @throws Exception if any step of the scenario fails
     */
    private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope,
                                          boolean testWithClientBuilder) throws Exception {
        final String flavour = converterClassName.endsWith("JsonConverter") ? "json" : "avro";
        final String outputTopicName = nextOutputTopicName();
        final String consumeTopicName = "debezium/mysql-" + flavour + "/dbserver1.inventory.products";
        final String sourceName = "test-source-debezium-mysql" + flavour + "-" + functionRuntimeType
                + "-" + randomName(8);
        final int numMessages = 47;

        // Resources are closed in reverse order (admin, then client); a failure in close()
        // is attached as a suppressed exception instead of hiding the test failure.
        try (PulsarClient client = newClient(); PulsarAdmin admin = newAdmin()) {
            initNamespace(admin);

            // Diagnostic only: a failed schema lookup must not fail the test.
            try {
                SchemaInfo lastSchemaInfo = admin.schemas().getSchemaInfo(consumeTopicName);
                log.info("lastSchemaInfo: {}", lastSchemaInfo);
            } catch (Exception e) {
                log.warn("failed to get schemaInfo for topic: {}, exceptions message: {}",
                        consumeTopicName, e.getMessage());
            }

            admin.topics().createNonPartitionedTopic(outputTopicName);

            DebeziumMySqlSourceTester sourceTester =
                    new DebeziumMySqlSourceTester(pulsarCluster, converterClassName, testWithClientBuilder);
            // NOTE(review): kept as try/finally because the tester type is declared outside this
            // file; fold into the try-with-resources above if it is AutoCloseable.
            try {
                sourceTester.getSourceConfig().put(JSON_WITH_ENVELOPE_KEY, jsonWithEnvelope);
                sourceTester.setServiceContainer(new DebeziumMySQLContainer(pulsarCluster.getClusterName()));

                PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(
                        pulsarCluster, functionRuntimeType.toString(), converterClassName,
                        TENANT, NAMESPACE, sourceName, outputTopicName, numMessages,
                        jsonWithEnvelope, consumeTopicName, client);
                runner.testSource(sourceTester);
            } finally {
                sourceTester.close();
            }
        }
    }

    /**
     * Runs the PostgreSQL Debezium source scenario.
     *
     * @param converterClassName converter class passed to the runner
     * @param jsonWithEnvelope   value stored under {@code json-with-envelope} in the source config
     * @throws Exception if any step of the scenario fails
     */
    private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope)
            throws Exception {
        final String outputTopicName = nextOutputTopicName();
        final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
        final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType
                + "-" + randomName(8);
        final int numMessages = 26;

        try (PulsarClient client = newClient(); PulsarAdmin admin = newAdmin()) {
            initNamespace(admin);
            admin.topics().createNonPartitionedTopic(consumeTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);

            DebeziumPostgreSqlSourceTester sourceTester = new DebeziumPostgreSqlSourceTester(pulsarCluster);
            // NOTE(review): kept as try/finally because the tester type is declared outside this
            // file; fold into the try-with-resources above if it is AutoCloseable.
            try {
                sourceTester.getSourceConfig().put(JSON_WITH_ENVELOPE_KEY, jsonWithEnvelope);
                sourceTester.setServiceContainer(
                        new DebeziumPostgreSqlContainer(pulsarCluster.getClusterName()));

                PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(
                        pulsarCluster, functionRuntimeType.toString(), converterClassName,
                        TENANT, NAMESPACE, sourceName, outputTopicName, numMessages,
                        jsonWithEnvelope, consumeTopicName, client);
                runner.testSource(sourceTester);
            } finally {
                sourceTester.close();
            }
        }
    }

    /**
     * Runs the MongoDB Debezium source scenario.
     *
     * <p>The output topic carries a {@link #testId} suffix like the other scenarios, so a
     * repeated invocation does not try to re-create a topic that already exists.
     *
     * @param converterClassName converter class passed to the runner
     * @param jsonWithEnvelope   value stored under {@code json-with-envelope} in the source config
     * @throws Exception if any step of the scenario fails
     */
    private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope)
            throws Exception {
        final String outputTopicName = nextOutputTopicName();
        final String consumeTopicName = "debezium/mongodb/dbserver1.inventory.products";
        final String sourceName = "test-source-connector-" + functionRuntimeType
                + "-name-" + randomName(8);
        final int numMessages = 17;

        try (PulsarClient client = newClient(); PulsarAdmin admin = newAdmin()) {
            initNamespace(admin);
            admin.topics().createNonPartitionedTopic(consumeTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);

            DebeziumMongoDbSourceTester sourceTester = new DebeziumMongoDbSourceTester(pulsarCluster);
            // NOTE(review): kept as try/finally because the tester type is declared outside this
            // file; fold into the try-with-resources above if it is AutoCloseable.
            try {
                sourceTester.getSourceConfig().put(JSON_WITH_ENVELOPE_KEY, jsonWithEnvelope);
                sourceTester.setServiceContainer(
                        new DebeziumMongoDbContainer(pulsarCluster.getClusterName()));

                PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(
                        pulsarCluster, functionRuntimeType.toString(), converterClassName,
                        TENANT, NAMESPACE, sourceName, outputTopicName, numMessages,
                        jsonWithEnvelope, consumeTopicName, client);
                runner.testSource(sourceTester);
            } finally {
                sourceTester.close();
            }
        }
    }

    /**
     * Runs the MS SQL Debezium source scenario.
     *
     * @param converterClassName converter class passed to the runner
     * @param jsonWithEnvelope   value stored under {@code json-with-envelope} in the source config
     * @throws Exception if any step of the scenario fails
     */
    private void testDebeziumMsSqlConnect(String converterClassName, boolean jsonWithEnvelope)
            throws Exception {
        final String outputTopicName = nextOutputTopicName();
        final String consumeTopicName = "debezium/mssql/mssql.dbo.customers";
        final String sourceName = "test-source-debezium-mssql-" + functionRuntimeType
                + "-" + randomName(8);
        final int numMessages = 1;

        try (PulsarClient client = newClient(); PulsarAdmin admin = newAdmin()) {
            initNamespace(admin);
            admin.topics().createNonPartitionedTopic(consumeTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);

            DebeziumMsSqlSourceTester sourceTester = new DebeziumMsSqlSourceTester(pulsarCluster);
            // NOTE(review): kept as try/finally because the tester type is declared outside this
            // file; fold into the try-with-resources above if it is AutoCloseable.
            try {
                sourceTester.getSourceConfig().put(JSON_WITH_ENVELOPE_KEY, jsonWithEnvelope);
                sourceTester.setServiceContainer(
                        new DebeziumMsSqlContainer(pulsarCluster.getClusterName()));

                PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(
                        pulsarCluster, functionRuntimeType.toString(), converterClassName,
                        TENANT, NAMESPACE, sourceName, outputTopicName, numMessages,
                        jsonWithEnvelope, consumeTopicName, client);
                runner.testSource(sourceTester);
            } finally {
                sourceTester.close();
            }
        }
    }

    /**
     * Best-effort creation of the {@code debezium} tenant and the namespaces used by the
     * scenarios in this class.
     *
     * <p>Each creation step is attempted independently; a failure is logged at info level and
     * never propagated, so a failed tenant creation does not prevent the namespaces from being
     * attempted. Presumably the usual failure is that the tenant or namespace already exists
     * from an earlier test on the same cluster — confirm against the admin API.
     *
     * @param admin admin client used to issue the tenant and namespace requests
     */
    protected void initNamespace(PulsarAdmin admin) {
        log.info("[initNamespace] start.");

        try {
            admin.tenants().createTenant(DEBEZIUM_TENANT,
                    new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(pulsarCluster.getClusterName())));
        } catch (Exception e) {
            log.info("[initNamespace] msg: {}", e.getMessage());
        }

        // NOTE(review): the arguments look like (retention minutes, retention size in MB) per
        // the Pulsar RetentionPolicies API — confirm.
        Policies policies = new Policies();
        policies.retention_policies = new RetentionPolicies(-1, 50);

        for (String ns : DEBEZIUM_NAMESPACES) {
            try {
                admin.namespaces().createNamespace(ns, policies);
            } catch (Exception e) {
                log.info("[initNamespace] namespace: {}, msg: {}", ns, e.getMessage());
            }
        }

        log.info("[initNamespace] finish.");
    }

    /** Builds a client connected to the cluster's plain-text service URL. */
    private PulsarClient newClient() throws Exception {
        return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
    }

    /** Builds an admin client connected to the cluster's HTTP service URL. */
    private PulsarAdmin newAdmin() throws Exception {
        return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
    }

    /** Returns an output topic name made unique within this instance by {@link #testId}. */
    private String nextOutputTopicName() {
        return OUTPUT_TOPIC_PREFIX + testId.getAndIncrement();
    }
}

