package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.SqlFileBasedSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/SqlFileBasedSource.class */
/**
 * Row source that reads SQL statements from a file and runs them through Spark SQL.
 *
 * <p>The file named by {@link SqlFileBasedSourceConfig#SOURCE_SQL_FILE} is split on {@code ;}.
 * Every non-empty statement is executed in order, and the dataset produced by the last
 * executed statement is returned as the batch.
 *
 * <p>Note: statements are split on every {@code ;} character, including any that appear
 * inside string literals or comments in the SQL file.
 */
public class SqlFileBasedSource extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(SqlFileBasedSource.class);

    /** Location of the SQL file, read from {@link SqlFileBasedSourceConfig#SOURCE_SQL_FILE}. */
    private final String sourceSqlFile;

    /** Whether a checkpoint string is emitted, read from {@link SqlFileBasedSourceConfig#EMIT_EPOCH_CHECKPOINT}. */
    private final boolean shouldEmitCheckPoint;

    /**
     * Creates the source and reads its configuration.
     *
     * @param typedProperties  source configuration; must contain {@link SqlFileBasedSourceConfig#SOURCE_SQL_FILE}
     * @param javaSparkContext Spark context handed to the parent {@link RowSource}
     * @param sparkSession     Spark session handed to the parent {@link RowSource}
     * @param schemaProvider   schema provider handed to the parent {@link RowSource}
     */
    public SqlFileBasedSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(typedProperties, Collections.singletonList(SqlFileBasedSourceConfig.SOURCE_SQL_FILE));
        this.sourceSqlFile = ConfigUtils.getStringWithAltKeys((Properties) typedProperties, SqlFileBasedSourceConfig.SOURCE_SQL_FILE);
        this.shouldEmitCheckPoint = ConfigUtils.getBooleanWithAltKeys(typedProperties, SqlFileBasedSourceConfig.EMIT_EPOCH_CHECKPOINT);
    }

    /**
     * Executes every statement in the configured SQL file and returns the result of the last one.
     *
     * @param option not used by this source
     * @param j      not used by this source
     * @return a pair of the dataset produced by the last executed statement (an empty
     *         {@link Option} when the file contains no non-empty statement) and the checkpoint:
     *         the current epoch time in milliseconds as a string when checkpoint emission is
     *         enabled, otherwise {@code null}
     * @throws HoodieIOException if opening the SQL file fails with an {@link IOException}
     */
    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        Dataset<Row> lastResult = null;
        // try-with-resources: the original never closed the scanner, leaking the underlying
        // file stream on every call. Declaring the stream separately guarantees it is closed
        // even if the Scanner cannot be constructed.
        try (InputStream sqlStream = FSUtils.getFs(this.sourceSqlFile, this.sparkContext.hadoopConfiguration(), true).open(new Path(this.sourceSqlFile));
             Scanner scanner = new Scanner(sqlStream)) {
            scanner.useDelimiter(";");
            while (scanner.hasNext()) {
                String statement = scanner.next().trim();
                if (!statement.isEmpty()) {
                    LOG.info(statement);
                    // Each statement overwrites the previous result; only the last one is returned.
                    lastResult = this.sparkSession.sql(statement);
                }
            }
        } catch (IOException e) {
            throw new HoodieIOException("Error reading source SQL file.", e);
        }
        String checkpoint = this.shouldEmitCheckPoint ? String.valueOf(System.currentTimeMillis()) : null;
        // lastResult stays null when the file holds no non-empty statement; ofNullable yields an
        // empty Option in that case instead of handing null to Option.of.
        return Pair.of(Option.ofNullable(lastResult), checkpoint);
    }
}
