package org.apache.beam.io.debezium;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.debezium.connector.mysql.MySqlConnector;
import java.lang.invoke.SerializedLambda;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import javax.sql.DataSource;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.io.debezium.DebeziumReadSchemaTransformProvider;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.class */
public class DebeziumIOMySqlConnectorIT {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumIOMySqlConnectorIT.class);

    @ClassRule
    public static final MySQLContainer<?> MY_SQL_CONTAINER = new MySQLContainer(DockerImageName.parse("debezium/example-mysql:1.4").asCompatibleSubstituteFor("mysql")).withPassword("debezium").withUsername("mysqluser").withExposedPorts(new Integer[]{3306}).waitingFor(new HttpWaitStrategy().forPort(3306).forStatusCodeMatching(num -> {
        return num.intValue() == 200;
    }).withStartupTimeout(Duration.ofMinutes(2)));

    public static DataSource getMysqlDatasource(Void r4) {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(MY_SQL_CONTAINER.getJdbcUrl());
        hikariConfig.setUsername(MY_SQL_CONTAINER.getUsername());
        hikariConfig.setPassword(MY_SQL_CONTAINER.getPassword());
        hikariConfig.setDriverClassName(MY_SQL_CONTAINER.getDriverClassName());
        return new HikariDataSource(hikariConfig);
    }

    private void monitorEssentialMetrics() {
        try {
            Statement createStatement = getMysqlDatasource(null).getConnection().createStatement();
            while (true) {
                ResultSet executeQuery = createStatement.executeQuery("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
                if (!executeQuery.next()) {
                    break;
                }
                LOG.info("Open connections: {}", Long.valueOf(executeQuery.getLong(2)));
                executeQuery.close();
                Thread.sleep(4000L);
            }
            throw new IllegalArgumentException("OIOI");
        } catch (InterruptedException | SQLException e) {
            throw new IllegalArgumentException("Oi", e);
        }
    }

    @Test
    public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException {
        long j = 500;
        MY_SQL_CONTAINER.start();
        PipelineOptions create = PipelineOptionsFactory.create();
        Pipeline create2 = Pipeline.create(create);
        create2.apply(GenerateSequence.from(0L).to(500L).withRate(10L, org.joda.time.Duration.standardSeconds(1L))).apply(MapElements.into(TypeDescriptors.rows()).via(l -> {
            return Row.withSchema(DebeziumIOPostgresSqlConnectorIT.TABLE_SCHEMA).withFieldValue("id", Integer.valueOf(l.longValue() <= 1000 ? Long.valueOf(l.longValue()).intValue() : Long.valueOf(l.longValue()).intValue() + 4)).withFieldValue("first_name", Long.toString(l.longValue())).withFieldValue("last_name", Long.toString(j - l.longValue())).withFieldValue("email", Long.toString(l.longValue()) + "@beamail.com").build();
        })).setRowSchema(DebeziumIOPostgresSqlConnectorIT.TABLE_SCHEMA).apply(JdbcIO.write().withTable("inventory.customers").withDataSourceProviderFn(DebeziumIOMySqlConnectorIT::getMysqlDatasource));
        Pipeline create3 = Pipeline.create(create);
        Long l2 = 500L;
        PAssert.that(PCollectionRowTuple.empty(create3).apply(new DebeziumReadSchemaTransformProvider(true, Integer.valueOf(l2.intValue() + 4), Long.valueOf(500 * 200)).from(DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration.builder().setDatabase("MYSQL").setPassword("dbz").setUsername("debezium").setHost("localhost").setTable("inventory.customers").setPort(MY_SQL_CONTAINER.getMappedPort(3306)).build()).buildTransform()).get("output")).satisfies(iterable -> {
            MatcherAssert.assertThat(Integer.valueOf(Lists.newArrayList(iterable).size()), Matchers.equalTo(Integer.valueOf(Long.valueOf(j + 4).intValue())));
            return null;
        });
        Thread thread = new Thread(() -> {
            create2.run().waitUntilFinish();
        });
        Thread thread2 = new Thread(this::monitorEssentialMetrics);
        thread2.start();
        thread.start();
        create3.run().waitUntilFinish();
        thread.join();
        thread2.interrupt();
        thread2.join();
    }

    @Test
    public void testDebeziumIOMySql() {
        MY_SQL_CONTAINER.start();
        String containerIpAddress = MY_SQL_CONTAINER.getContainerIpAddress();
        String num = MY_SQL_CONTAINER.getMappedPort(3306).toString();
        Pipeline create = Pipeline.create(PipelineOptionsFactory.create());
        PCollection apply = create.apply(DebeziumIO.read().withConnectorConfiguration(DebeziumIO.ConnectorConfiguration.create().withUsername("debezium").withPassword("dbz").withConnectorClass(MySqlConnector.class).withHostName(containerIpAddress).withPort(num).withConnectionProperty("database.server.id", "184054").withConnectionProperty("database.server.name", "dbserver1").withConnectionProperty("database.include.list", "inventory").withConnectionProperty("include.schema.changes", "false")).withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()).withMaxNumberOfRecords(30).withCoder(StringUtf8Coder.of()));
        String str = "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\",\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\",\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,\"type\":\"SHIPPING\"}}}";
        PAssert.that(apply).satisfies(iterable -> {
            MatcherAssert.assertThat(iterable, SerializableMatchers.hasItem(str));
            return null;
        });
        create.run().waitUntilFinish();
        MY_SQL_CONTAINER.stop();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1405179202:
                if (implMethodName.equals("lambda$testDebeziumIOMySql$2005ebc2$1")) {
                    z = true;
                    break;
                }
                break;
            case 518272819:
                if (implMethodName.equals("lambda$testDebeziumSchemaTransformMysqlRead$83cb6a26$1")) {
                    z = 2;
                    break;
                }
                break;
            case 518272820:
                if (implMethodName.equals("lambda$testDebeziumSchemaTransformMysqlRead$83cb6a26$2")) {
                    z = false;
                    break;
                }
                break;
            case 1017707601:
                if (implMethodName.equals("getMysqlDatasource")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/Iterable;)Ljava/lang/Void;")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return iterable -> {
                        MatcherAssert.assertThat(Integer.valueOf(Lists.newArrayList(iterable).size()), Matchers.equalTo(Integer.valueOf(Long.valueOf(longValue + 4).intValue())));
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return iterable2 -> {
                        MatcherAssert.assertThat(iterable2, SerializableMatchers.hasItem(str));
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/Long;)Lorg/apache/beam/sdk/values/Row;")) {
                    long longValue2 = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return l -> {
                        return Row.withSchema(DebeziumIOPostgresSqlConnectorIT.TABLE_SCHEMA).withFieldValue("id", Integer.valueOf(l.longValue() <= 1000 ? Long.valueOf(l.longValue()).intValue() : Long.valueOf(l.longValue()).intValue() + 4)).withFieldValue("first_name", Long.toString(l.longValue())).withFieldValue("last_name", Long.toString(longValue2 - l.longValue())).withFieldValue("email", Long.toString(l.longValue()) + "@beamail.com").build();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Ljavax/sql/DataSource;")) {
                    return DebeziumIOMySqlConnectorIT::getMysqlDatasource;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
