package org.apache.beam.io.debezium;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.io.debezium.DebeziumReadSchemaTransformProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.kafka.connect.errors.ConnectException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.class */
public class DebeziumReadSchemaTransformTest {

    @ClassRule
    public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER = new PostgreSQLContainer(DockerImageName.parse("debezium/example-postgres:latest").asCompatibleSubstituteFor("postgres")).withPassword("dbz").withUsername("debezium").withExposedPorts(new Integer[]{5432}).withDatabaseName("inventory");

    @ClassRule
    public static final MySQLContainer<?> MY_SQL_CONTAINER = new MySQLContainer(DockerImageName.parse("debezium/example-mysql:1.4").asCompatibleSubstituteFor("mysql")).withPassword("debezium").withUsername("mysqluser").withExposedPorts(new Integer[]{3306}).waitingFor(new HttpWaitStrategy().forPort(3306).forStatusCodeMatching(num -> {
        return num.intValue() == 200;
    }).withStartupTimeout(Duration.ofMinutes(2)));

    @Parameterized.Parameter(0)
    public Container<?> databaseContainer;

    @Parameterized.Parameter(1)
    public String userName;

    @Parameterized.Parameter(2)
    public String password;

    @Parameterized.Parameter(3)
    public String database;

    @Parameterized.Parameter(4)
    public Integer port;

    @Parameterized.Parameters
    public static Iterable<Object[]> data() {
        return Arrays.asList(new Object[]{POSTGRES_SQL_CONTAINER, "debezium", "dbz", "POSTGRES", 5432}, new Object[]{MY_SQL_CONTAINER, "debezium", "dbz", "MYSQL", 3306});
    }

    private PTransform<PCollectionRowTuple, PCollectionRowTuple> makePtransform(String str, String str2, String str3, Integer num, String str4) {
        return new DebeziumReadSchemaTransformProvider(true, 10, 100L).from(DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration.builder().setDatabase(str3).setPassword(str2).setUsername(str).setHost(str4).setTable("inventory.customers").setPort(num).build()).buildTransform();
    }

    @Test
    public void testNoProblem() {
        MatcherAssert.assertThat((List) PCollectionRowTuple.empty(Pipeline.create()).apply(makePtransform(this.userName, this.password, this.database, this.databaseContainer.getMappedPort(this.port.intValue()), "localhost")).get("output").getSchema().getFields().stream().map(field -> {
            return field.getName();
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new String[]{"before", "after", "source", "op", "ts_ms", "transaction"}));
    }

    @Test
    public void testWrongUser() {
        Pipeline create = Pipeline.create();
        ConnectException assertThrows = Assert.assertThrows(ConnectException.class, () -> {
            PCollectionRowTuple.empty(create).apply(makePtransform("wrongUser", this.password, this.database, this.databaseContainer.getMappedPort(this.port.intValue()), "localhost")).get("output");
        });
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.containsString("password"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.containsString("wrongUser"));
    }

    @Test
    public void testWrongPassword() {
        Pipeline create = Pipeline.create();
        ConnectException assertThrows = Assert.assertThrows(ConnectException.class, () -> {
            PCollectionRowTuple.empty(create).apply(makePtransform(this.userName, "wrongPassword", this.database, this.databaseContainer.getMappedPort(this.port.intValue()), "localhost")).get("output");
        });
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.containsString("password"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.containsString(this.userName));
    }

    @Test
    public void testWrongPort() {
        Pipeline create = Pipeline.create();
        Throwable cause = Assert.assertThrows(ConnectException.class, () -> {
            PCollectionRowTuple.empty(create).apply(makePtransform(this.userName, this.password, this.database, 12345, "localhost")).get("output");
        }).getCause();
        while (true) {
            Throwable th = cause;
            if (th.getCause() == null) {
                MatcherAssert.assertThat(th.getMessage(), Matchers.containsString("Connection refused"));
                return;
            }
            cause = th.getCause();
        }
    }

    @Test
    public void testWrongHost() throws Exception {
        Pipeline create = Pipeline.create();
        Throwable cause = ((Exception) Assert.assertThrows(Exception.class, () -> {
            PCollectionRowTuple.empty(create).apply(makePtransform(this.userName, this.password, this.database, this.databaseContainer.getMappedPort(this.port.intValue()), "23.128.129.130")).get("output");
        })).getCause();
        while (true) {
            Throwable th = cause;
            if (th.getCause() == null) {
                MatcherAssert.assertThat(th.getMessage(), Matchers.containsString("Connection refused"));
                return;
            }
            cause = th.getCause();
        }
    }
}
