package org.apache.hudi.utilities.sources;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.SqlQueryBuilder;
import org.apache.hudi.utilities.config.JdbcSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Row source that reads an RDBMS table through Spark's {@code jdbc} data source.
 *
 * <p>When {@link JdbcSourceConfig#IS_INCREMENTAL} is enabled, the maximum value of
 * {@link JdbcSourceConfig#INCREMENTAL_COLUMN} in a batch (cast to string) becomes the checkpoint.
 * The next batch only pulls rows whose incremental column is strictly greater than it. Without a
 * usable checkpoint the whole table is read.
 */
public class JdbcSource extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class);

    /** JDBC sub-protocols (the scheme following {@code jdbc:}) for which an ORDER BY/LIMIT clause is generated. */
    private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
    private static final String URI_JDBC_PREFIX = "jdbc:";

    /** Option keys passed to Spark's {@code jdbc} {@link DataFrameReader}. */
    protected static class Config {
        private static final String URL_PROP = "url";
        private static final String USER_PROP = "user";
        private static final String PASSWORD_PROP = "password";
        private static final String DRIVER_PROP = "driver";
        private static final String RDBMS_TABLE_PROP = "dbtable";

        protected Config() {
        }
    }

    public JdbcSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
    }

    /**
     * Builds a {@code jdbc} {@link DataFrameReader} from the source properties.
     *
     * <p>The reader is populated with url/user/driver/table, a password, and any extra options.
     * The password is taken from {@link JdbcSourceConfig#PASSWORD} if present; otherwise it is read
     * from the file named by {@link JdbcSourceConfig#PASSWORD_FILE}.
     *
     * @throws HoodieException wrapping any failure. This includes an {@link IllegalArgumentException}
     *     when neither password option is set, and a missing incremental column when incremental
     *     mode is on.
     */
    private static DataFrameReader validatePropsAndGetDataFrameReader(SparkSession sparkSession, TypedProperties typedProperties) throws HoodieException {
        try {
            DataFrameReader reader = sparkSession.read().format("jdbc")
                .option(Config.URL_PROP, ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.URL))
                .option(Config.USER_PROP, ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.USER))
                .option(Config.DRIVER_PROP, ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.DRIVER_CLASS))
                .option(Config.RDBMS_TABLE_PROP, ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.RDBMS_TABLE_NAME));

            if (ConfigUtils.containsConfigProperty(typedProperties, JdbcSourceConfig.PASSWORD)) {
                LOG.info("Reading JDBC password from properties file....");
                reader = reader.option(Config.PASSWORD_PROP, ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.PASSWORD));
            } else if (ConfigUtils.containsConfigProperty(typedProperties, JdbcSourceConfig.PASSWORD_FILE)
                && !StringUtils.isNullOrEmpty(ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.PASSWORD_FILE))) {
                String passwordFile = ConfigUtils.getStringWithAltKeys(typedProperties, JdbcSourceConfig.PASSWORD_FILE);
                LOG.info("Reading JDBC password from password file {}", passwordFile);
                reader = reader.option(Config.PASSWORD_PROP, readPasswordFile(sparkSession, passwordFile));
            } else {
                throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS datasource",
                    JdbcSourceConfig.PASSWORD_FILE.key(), JdbcSourceConfig.PASSWORD.key()));
            }

            addExtraJdbcOptions(typedProperties, reader);
            if (ConfigUtils.getBooleanWithAltKeys(typedProperties, JdbcSourceConfig.IS_INCREMENTAL)) {
                ConfigUtils.checkRequiredConfigProperties(typedProperties, Collections.singletonList(JdbcSourceConfig.INCREMENTAL_COLUMN));
            }
            return reader;
        } catch (Exception e) {
            throw new HoodieException("Failed to validate properties", e);
        }
    }

    /**
     * Reads the entire password file as UTF-8 and always closes the stream.
     *
     * <p>The file system is resolved from the path itself, so fully-qualified paths with a
     * non-default scheme work. Scheme-less paths still resolve against the default file system.
     * The content is not trimmed; a trailing newline in the file becomes part of the password.
     */
    private static String readPasswordFile(SparkSession sparkSession, String passwordFile) throws IOException {
        Path path = new Path(passwordFile);
        FileSystem fs = path.getFileSystem(sparkSession.sparkContext().hadoopConfiguration());
        long length = fs.getFileStatus(path).getLen();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("Password file is too large: " + passwordFile);
        }
        byte[] bytes = new byte[(int) length];
        // A single read() sized by available() may return fewer bytes; readFully loops until done.
        try (InputStream in = fs.open(path)) {
            IOUtils.readFully(in, bytes, 0, bytes.length);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Copies every property under the {@link JdbcSourceConfig#EXTRA_OPTIONS} prefix onto the reader.
     *
     * <p>The prefix is stripped from each key before it is set. Properties with null or empty
     * values are skipped.
     */
    private static void addExtraJdbcOptions(TypedProperties typedProperties, DataFrameReader dataFrameReader) {
        for (Object key : typedProperties.keySet()) {
            String propertyKey = key.toString();
            Option<String> jdbcOptionKey = ConfigUtils.stripPrefix(propertyKey, JdbcSourceConfig.EXTRA_OPTIONS);
            if (!jdbcOptionKey.isPresent()) {
                continue;
            }
            String value = typedProperties.getString(propertyKey);
            if (!StringUtils.isNullOrEmpty(value)) {
                LOG.info("Adding {} -> {} to jdbc options", jdbcOptionKey.get(), value);
                // DataFrameReader.option mutates the reader in place.
                dataFrameReader.option(jdbcOptionKey.get(), value);
            }
        }
    }

    /**
     * Validates the required properties, then fetches the next batch.
     *
     * @param option last committed checkpoint, if any
     * @param j source limit; when positive and the database supports it, caps the number of rows read
     * @return the fetched rows paired with the new checkpoint
     * @throws HoodieException on any failure; non-Hoodie exceptions are wrapped
     */
    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> option, long j) throws HoodieException {
        try {
            ConfigUtils.checkRequiredConfigProperties(this.props, Arrays.asList(JdbcSourceConfig.URL, JdbcSourceConfig.DRIVER_CLASS,
                JdbcSourceConfig.USER, JdbcSourceConfig.RDBMS_TABLE_NAME, JdbcSourceConfig.IS_INCREMENTAL));
            return fetch(option, j);
        } catch (HoodieException e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw e;
        } catch (Exception e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + option.orElse(null), e);
        }
    }

    /**
     * Runs an incremental fetch when a usable checkpoint exists, otherwise a full fetch.
     * Then computes the next checkpoint.
     */
    private Pair<Option<Dataset<Row>>, Checkpoint> fetch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Dataset<Row> dataset;
        if (hasCheckpointKey(lastCheckpoint)) {
            dataset = incrementalFetch(lastCheckpoint, sourceLimit);
        } else {
            LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
            dataset = fullFetch(sourceLimit);
        }
        dataset.persist(StorageLevel.fromString(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.STORAGE_LEVEL, "MEMORY_AND_DISK_SER")));
        try {
            boolean isIncremental = ConfigUtils.getBooleanWithAltKeys(this.props, JdbcSourceConfig.IS_INCREMENTAL);
            return Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental, lastCheckpoint));
        } finally {
            // Release the cache even if checkpointing fails.
            // NOTE(review): the dataset is unpersisted before the caller consumes it, so downstream
            // actions appear to re-run the JDBC query; the cache only serves the max() aggregation.
            dataset.unpersist();
        }
    }

    /**
     * Reads rows whose incremental column is greater than the last checkpoint value.
     *
     * <p>The filter is pushed down as a sub-query. If that fails and
     * {@link JdbcSourceConfig#FALLBACK_TO_FULL_FETCH} is true, a full fetch is performed instead;
     * otherwise the failure is rethrown.
     */
    private Dataset<Row> incrementalFetch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        try {
            String incrementalColumn = ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN);
            // The checkpoint is embedded in a single-quoted SQL literal; double embedded quotes so a
            // value containing ' cannot terminate the literal early.
            String checkpointLiteral = lastCheckpoint.get().getCheckpointKey().replace("'", "''");
            SqlQueryBuilder query = SqlQueryBuilder.select("*")
                .from(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.RDBMS_TABLE_NAME))
                .where(String.format(" %s > '%s'", incrementalColumn, checkpointLiteral));
            if (sourceLimit > 0 && supportsLimitClause()) {
                query.orderBy(incrementalColumn).limit(sourceLimit);
            }
            String pushDownQuery = String.format("(%s) rdbms_table", query.toString());
            LOG.info("PPD QUERY: " + pushDownQuery);
            LOG.info("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull {}", pushDownQuery);
            return validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option(Config.RDBMS_TABLE_PROP, pushDownQuery).load();
        } catch (Exception e) {
            LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
            boolean fallbackToFullFetch = ConfigUtils.containsConfigProperty(this.props, JdbcSourceConfig.FALLBACK_TO_FULL_FETCH)
                && ConfigUtils.getBooleanWithAltKeys(this.props, JdbcSourceConfig.FALLBACK_TO_FULL_FETCH);
            if (!fallbackToFullFetch) {
                throw e;
            }
            LOG.warn("Falling back to full scan.");
            return fullFetch(sourceLimit);
        }
    }

    /**
     * Reads the whole table.
     *
     * <p>When {@code sourceLimit > 0} and the database is in {@link #DB_LIMIT_CLAUSE}, the result
     * is capped with LIMIT. It is also ordered by the incremental column when one is configured.
     */
    private Dataset<Row> fullFetch(long sourceLimit) {
        SqlQueryBuilder query = SqlQueryBuilder.select("*").from(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.RDBMS_TABLE_NAME));
        if (sourceLimit > 0 && supportsLimitClause()) {
            if (ConfigUtils.containsConfigProperty(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN)) {
                query.orderBy(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN));
            }
            query.limit(sourceLimit);
        }
        return validatePropsAndGetDataFrameReader(this.sparkSession, this.props)
            .option(Config.RDBMS_TABLE_PROP, String.format("(%s) rdbms_table", query.toString()))
            .load();
    }

    /**
     * Returns whether the configured JDBC URL's sub-protocol is one for which an ORDER BY/LIMIT
     * clause is generated.
     *
     * <p>The sub-protocol is the URI scheme of the URL after the {@code jdbc:} prefix, e.g.
     * {@code mysql} for {@code jdbc:mysql://host/db}.
     */
    private boolean supportsLimitClause() {
        String url = ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.URL);
        String subProtocol = URI.create(url.substring(URI_JDBC_PREFIX.length())).getScheme();
        return DB_LIMIT_CLAUSE.contains(subProtocol);
    }

    /** True when a checkpoint is present and its key is non-null and non-empty. */
    private static boolean hasCheckpointKey(Option<Checkpoint> checkpoint) {
        return checkpoint.isPresent() && !StringUtils.isNullOrEmpty(checkpoint.get().getCheckpointKey());
    }

    /**
     * Computes the checkpoint for the fetched batch.
     *
     * <p>Non-incremental sources always get an empty checkpoint. Incremental sources get the
     * max of the incremental column cast to string. If that max is null (no rows or only nulls),
     * the previous checkpoint is kept when it has a key; otherwise an empty checkpoint is returned.
     *
     * @throws HoodieReadFromSourceException if the max cannot be computed
     */
    private Checkpoint checkpoint(Dataset<Row> dataset, boolean isIncremental, Option<Checkpoint> lastCheckpoint) {
        try {
            if (!isIncremental) {
                return new StreamerCheckpointV2("");
            }
            Column incrementalColumn = dataset.col(ConfigUtils.getStringWithAltKeys(this.props, JdbcSourceConfig.INCREMENTAL_COLUMN));
            String maxValue = dataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
            LOG.info("Checkpointing column {} with value: {} ", incrementalColumn, maxValue);
            if (maxValue != null) {
                return new StreamerCheckpointV2(maxValue);
            }
            return hasCheckpointKey(lastCheckpoint) ? lastCheckpoint.get() : new StreamerCheckpointV2("");
        } catch (Exception e) {
            LOG.error("Failed to checkpoint", e);
            throw new HoodieReadFromSourceException("Failed to checkpoint. Last checkpoint: " + lastCheckpoint.orElse(null), e);
        }
    }
}
