package org.apache.hudi.utilities.sources.helpers;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/QueryRunner.class */
/**
 * Runs incremental or snapshot queries against the Hudi table located at the configured
 * source base path, using Spark's {@code org.apache.hudi} data source.
 *
 * <p>The source base path is read once, at construction time, from
 * {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}.
 */
public class QueryRunner {
    private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);

    private final SparkSession sparkSession;
    private final TypedProperties props;
    private final String sourcePath;

    /**
     * Creates a runner bound to the given Spark session and configuration.
     *
     * @param sparkSession    session used to read the source table
     * @param typedProperties configuration; it is passed through
     *                        {@link ConfigUtils#checkRequiredConfigProperties} with
     *                        {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH} as the only required entry
     */
    public QueryRunner(SparkSession sparkSession, TypedProperties typedProperties) {
        this.sparkSession = sparkSession;
        this.props = typedProperties;
        ConfigUtils.checkRequiredConfigProperties(
            typedProperties, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        // NOTE(review): the explicit Properties cast is kept so that overload selection stays exactly
        // as it was; it may be redundant depending on the ConfigUtils overloads available.
        this.sourcePath = ConfigUtils.getStringWithAltKeys(
            (Properties) typedProperties, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
    }

    /**
     * Executes the query described by {@code queryInfo}.
     *
     * @param queryInfo description of the query to run
     * @param option    optional splitter, only consulted for snapshot queries
     * @return the query info that was effectively used, paired with the resulting dataset
     * @throws HoodieException if the query is neither incremental nor snapshot
     */
    public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> option) {
        if (queryInfo.isIncremental()) {
            return runIncrementalQuery(queryInfo);
        }
        if (queryInfo.isSnapshot()) {
            return runSnapshotQuery(queryInfo, option);
        }
        throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
    }

    /**
     * Orders {@code dataset} by the given column names.
     *
     * @param dataset dataset to order
     * @param list    names of the columns to order by, in priority order; may be {@code null} or empty
     * @return the ordered dataset, or {@code dataset} itself when {@code list} is {@code null} or empty
     */
    public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> list) {
        if (list == null || list.isEmpty()) {
            return dataset;
        }
        LOG.debug("Applying ordering {}", list);
        Column[] orderByColumns = list.stream().map(functions::col).toArray(Column[]::new);
        return dataset.orderBy(orderByColumns);
    }

    /**
     * Runs an incremental query over the instant range carried by {@code queryInfo}.
     *
     * <p>The fallback-to-full-table-scan option is taken from the runner's properties, defaulting to the
     * default value declared on {@code DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN}.
     *
     * @param queryInfo supplies the query type, start instant and end instant
     * @return {@code queryInfo} unchanged, paired with the loaded dataset
     */
    public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
        LOG.info("Running incremental query");
        String fallbackKey = DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key();
        String fallbackValue = this.props.getString(
            fallbackKey, (String) DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue());
        Dataset<Row> incremental = this.sparkSession.read()
            .format("org.apache.hudi")
            .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
            .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getStartInstant())
            .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant())
            .option(fallbackKey, fallbackValue)
            .load(this.sourcePath);
        return Pair.of(queryInfo, incremental);
    }

    /**
     * Runs a snapshot query and narrows the result with {@link #applySnapshotQueryFilters}.
     *
     * <p>When a splitter is present, its {@code getNextCheckpoint} result replaces {@code queryInfo}
     * both for filtering and in the returned pair.
     *
     * @param queryInfo supplies the query type and the default filter bounds
     * @param option    optional splitter used to derive the query info that is actually applied
     * @return the applied query info paired with the filtered snapshot
     */
    public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> option) {
        LOG.info("Running snapshot query");
        Dataset<Row> snapshot = this.sparkSession.read()
            .format("org.apache.hudi")
            .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
            .load(this.sourcePath);
        QueryInfo snapshotQueryInfo = option
            .map(splitter -> splitter.getNextCheckpoint(snapshot, queryInfo, Option.empty()))
            .orElse(queryInfo);
        return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
    }

    /**
     * Restricts {@code dataset} to rows whose commit-time metadata field lies within
     * {@code [startInstant, endInstant]} (both bounds inclusive), then applies the optional
     * predicate filter from {@code queryInfo} when one is present.
     *
     * @param dataset   snapshot to filter
     * @param queryInfo supplies the instant bounds and the optional predicate filter
     * @return the filtered dataset
     */
    public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> dataset, QueryInfo queryInfo) {
        Dataset<Row> withinInstantRange = dataset
            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getStartInstant()))
            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getEndInstant()));
        return queryInfo.getPredicateFilter().map(withinInstantRange::filter).orElse(withinInstantRange);
    }
}
