/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.io.debezium;

import io.debezium.connector.mysql.MySqlConnector;
import java.io.Serializable;
import java.time.Duration;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.io.debezium.SourceRecordMapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=JUnit4.class)
public class DebeziumIOMySqlConnectorIT {
    @ClassRule
    public static final MySQLContainer<?> MY_SQL_CONTAINER = (MySQLContainer)((MySQLContainer)new MySQLContainer(DockerImageName.parse((String)"debezium/example-mysql:1.4").asCompatibleSubstituteFor("mysql")).withPassword("debezium").withUsername("mysqluser").withExposedPorts(new Integer[]{3306})).waitingFor(new HttpWaitStrategy().forPort(3306).forStatusCodeMatching(response -> response == 200).withStartupTimeout(Duration.ofMinutes(2L)));

    @Test
    public void testDebeziumIOMySql() {
        MY_SQL_CONTAINER.start();
        String host = MY_SQL_CONTAINER.getContainerIpAddress();
        String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection results = (PCollection)p.apply((PTransform)DebeziumIO.read().withConnectorConfiguration(DebeziumIO.ConnectorConfiguration.create().withUsername("debezium").withPassword("dbz").withConnectorClass(MySqlConnector.class).withHostName(host).withPort(port).withConnectionProperty("database.server.id", "184054").withConnectionProperty("database.server.name", "dbserver1").withConnectionProperty("database.include.list", "inventory").withConnectionProperty("include.schema.changes", "false")).withFormatFunction((SourceRecordMapper)new SourceRecordJson.SourceRecordJsonMapper()).withMaxNumberOfRecords(Integer.valueOf(30)).withCoder((Coder)StringUtf8Coder.of()));
        String expected = "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\",\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\",\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,\"type\":\"SHIPPING\"}}}";
        PAssert.that((PCollection)results).satisfies((SerializableFunction & Serializable)res -> {
            MatcherAssert.assertThat((Object)res, (Matcher)SerializableMatchers.hasItem((Serializable)((Object)expected)));
            return null;
        });
        p.run().waitUntilFinish();
        MY_SQL_CONTAINER.stop();
    }
}

