package org.apache.hugegraph.loader.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.client.utils.URIBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.LoadMapping;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
import org.apache.hugegraph.loader.util.Printer;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.class */
/**
 * Command-line entry point that wires MySQL CDC sources to HugeGraph sinks in a Flink job.
 *
 * <p>For every {@link InputStruct} found in the mapping file referenced by the load options,
 * one MySQL CDC source is created and connected to one {@link HugeGraphSinkFunction}.
 */
public class HugeGraphFlinkCDCLoader {
    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
    private static final String JDBC_PREFIX = "jdbc:";
    /** Port used when the JDBC url does not name one explicitly (the MySQL default). */
    private static final int DEFAULT_MYSQL_PORT = 3306;
    /** Interval, in milliseconds, passed to {@code enableCheckpointing}. */
    private static final long CHECKPOINT_INTERVAL_MS = 3000L;

    private final LoadOptions loadOptions;
    private final String[] options;

    /**
     * Keeps the raw arguments (they are forwarded to every output format) and parses them
     * into {@link LoadOptions}.
     *
     * @param strArr raw command-line arguments
     */
    public HugeGraphFlinkCDCLoader(String[] strArr) {
        this.options = strArr;
        this.loadOptions = LoadOptions.parseOptions(strArr);
    }

    /**
     * Creates a loader from the arguments and runs it; any failure is reported through
     * {@link Printer#printError} instead of being propagated.
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            new HugeGraphFlinkCDCLoader(strArr).load();
        } catch (Throwable th) {
            Printer.printError("Failed to start loading", th);
        }
    }

    /**
     * Builds one source-to-sink pipeline per input struct and executes the Flink job.
     *
     * @throws LoadException if the Flink job execution fails
     * @throws IllegalArgumentException if an input struct cannot be turned into a MySQL source
     */
    public void load() {
        List<InputStruct> structs = LoadMapping.of(this.loadOptions.file).structs();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        for (InputStruct struct : structs) {
            MySqlSource<String> source = buildMysqlSource(struct);
            HugeGraphOutputFormat outputFormat = new HugeGraphOutputFormat(struct, this.options);
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
               .addSink(new HugeGraphSinkFunction(outputFormat))
               .setParallelism(this.loadOptions.sinkParallelism);
        }
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        try {
            env.execute("flink-cdc-hugegraph");
        } catch (Exception e) {
            throw new LoadException("Failed to execute flink", e);
        }
    }

    /**
     * Builds the MySQL CDC source described by the JDBC input of the given struct.
     *
     * <p>Host and port are taken from the JDBC url. A leading {@code jdbc:} is removed only
     * when present, and {@link #DEFAULT_MYSQL_PORT} is used when the url carries no port.
     *
     * @param inputStruct struct whose input must be a {@link JDBCSource}
     * @return a source capturing {@code database.table} of the configured JDBC input
     * @throws IllegalArgumentException if the input is not a JDBC source, or the url is
     *         missing, malformed, or has no host
     */
    private MySqlSource<String> buildMysqlSource(InputStruct inputStruct) {
        Object input = inputStruct.input();
        if (!(input instanceof JDBCSource)) {
            String actual = input == null ? "null" : input.getClass().getName();
            throw new IllegalArgumentException(
                    String.format("Flink CDC loading requires a JDBC input source, but got %s", actual));
        }
        JDBCSource jdbcSource = (JDBCSource) input;

        String url = jdbcSource.url();
        if (url == null) {
            throw new IllegalArgumentException("The url of the JDBC input source can't be null");
        }
        // Strip the prefix only when it is really there, instead of blindly cutting 5 chars.
        String location = url.startsWith(JDBC_PREFIX) ? url.substring(JDBC_PREFIX.length()) : url;

        URIBuilder uriBuilder;
        try {
            uriBuilder = new URIBuilder(location);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(
                    String.format("Failed to parse url(%s) to get hostName and port", url), e);
        }

        String host = uriBuilder.getHost();
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Failed to parse url(%s) to get hostName and port", url));
        }
        // URIBuilder reports a negative port when the url does not contain one.
        int port = uriBuilder.getPort() >= 0 ? uriBuilder.getPort() : DEFAULT_MYSQL_PORT;

        String database = jdbcSource.database();
        return MySqlSource.<String>builder()
                          .hostname(host)
                          .port(port)
                          .databaseList(database)
                          .tableList(database + Constants.DOT_STR + jdbcSource.table())
                          .username(jdbcSource.username())
                          .password(jdbcSource.password())
                          .deserializer(new HugeGraphDeserialization())
                          .build();
    }
}
