package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Locale;

/**
 * Incremental source that queries a Hudi table (located by
 * {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}) whose rows carry {@code s3.bucket.name},
 * {@code s3.object.key} and {@code s3.object.size} columns, selects the referenced S3 objects,
 * and loads their contents through a {@link CloudDataFetcher}.
 *
 * <p>Progress is tracked with a {@link CloudObjectIncrCheckpoint}, which combines a commit time
 * with an object key.
 */
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
    private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
    private static final String EMPTY_STRING = "";

    /** Base path of the Hudi table that is queried incrementally. */
    private final String srcPath;
    private final int numInstantsPerFetch;
    /** Forwarded to the per-partition metadata builder to optionally verify object existence. */
    private final boolean checkIfFileExists;
    /** Substring every selected object key must contain (see {@link #applyFilter}). */
    private final String fileFormat;
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final QueryRunner queryRunner;
    private final CloudDataFetcher cloudDataFetcher;
    private final Option<SchemaProvider> schemaProvider;
    private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;

    /** Column holding the S3 object key. */
    public static final String S3_OBJECT_KEY = "s3.object.key";
    /** Column holding the S3 object size. */
    public static final String S3_OBJECT_SIZE = "s3.object.size";
    /** Column holding the S3 bucket name. */
    public static final String S3_BUCKET_NAME = "s3.bucket.name";

    /**
     * Legacy aliases for configuration keys, kept for backward compatibility.
     * Use {@link S3EventsHoodieIncrSourceConfig} instead.
     */
    public static class Config {

        @Deprecated
        static final String ENABLE_EXISTS_CHECK = S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.key();

        @Deprecated
        static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = (Boolean) S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.defaultValue();

        @Deprecated
        static final String S3_KEY_PREFIX = S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key();

        @Deprecated
        static final String S3_FS_PREFIX = S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key();

        @Deprecated
        static final String S3_IGNORE_KEY_PREFIX = S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key();

        @Deprecated
        static final String S3_IGNORE_KEY_SUBSTRING = S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key();

        @Deprecated
        public static final String SPARK_DATASOURCE_OPTIONS = S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
    }

    /**
     * Creates the source with a default {@link QueryRunner} and {@link CloudDataFetcher}.
     *
     * <p>The fetcher is given the same resolved file format that is used to filter object keys,
     * so the objects that are selected and the format they are read with agree.
     */
    public S3EventsHoodieIncrSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        this(typedProperties, javaSparkContext, sparkSession, schemaProvider,
            new QueryRunner(sparkSession, typedProperties),
            new CloudDataFetcher(typedProperties, resolveFileFormat(typedProperties)));
    }

    /**
     * Creates the source with caller-supplied collaborators.
     *
     * @throws RuntimeException from {@code ConfigUtils.checkRequiredConfigProperties} when
     *     {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH} is missing (exact type defined there)
     */
    public S3EventsHoodieIncrSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, QueryRunner queryRunner, CloudDataFetcher cloudDataFetcher) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(typedProperties, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.srcPath = ConfigUtils.getStringWithAltKeys(typedProperties, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        this.numInstantsPerFetch = ConfigUtils.getIntWithAltKeys(typedProperties, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
        this.checkIfFileExists = ConfigUtils.getBooleanWithAltKeys(typedProperties, CloudSourceConfig.ENABLE_EXISTS_CHECK);
        this.fileFormat = resolveFileFormat(typedProperties);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(typedProperties);
        this.queryRunner = queryRunner;
        this.cloudDataFetcher = cloudDataFetcher;
        this.schemaProvider = Option.ofNullable(schemaProvider);
        this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(typedProperties);
    }

    /**
     * Resolves the data file format.
     *
     * <p>Uses {@link CloudSourceConfig#DATAFILE_FORMAT} when it is set to a non-empty value.
     * Otherwise falls back to {@link HoodieIncrSourceConfig#SOURCE_FILE_FORMAT}, including its
     * default value, for backward compatibility.
     */
    private static String resolveFileFormat(TypedProperties props) {
        String dataFileFormat = ConfigUtils.getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, EMPTY_STRING);
        return StringUtils.isNullOrEmpty(dataFileFormat)
            ? ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.SOURCE_FILE_FORMAT, true)
            : dataFileFormat;
    }

    /**
     * Reads the next batch of S3 objects referenced by the source table.
     *
     * @param lastCheckpoint serialized {@link CloudObjectIncrCheckpoint} from the previous batch,
     *     or empty when there is none
     * @param sourceLimit limit forwarded to
     *     {@code IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit}
     * @return the loaded data (empty when there is nothing to process) and the next checkpoint
     */
    @Override // org.apache.hudi.utilities.sources.HoodieIncrSource, org.apache.hudi.utilities.sources.RowSource
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
        CloudObjectIncrCheckpoint checkpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
        // ofNullable rather than of: a checkpoint without a commit becomes Option.empty() instead of
        // wrapping null. Presumably generateQueryInfo then applies missingCheckpointStrategy; verify there.
        QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(
            this.sparkContext, this.srcPath, this.numInstantsPerFetch,
            Option.ofNullable(checkpoint.getCommit()), this.missingCheckpointStrategy,
            IncrSourceHelper.getHollowCommitHandleMode(this.props), HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            S3_OBJECT_KEY, S3_OBJECT_SIZE, true, Option.ofNullable(checkpoint.getKey()));
        LOG.info("Querying S3 with:{}, queryInfo:{}", checkpoint, queryInfo);

        // Nothing to do when there is no partially-processed key and the instant range is empty.
        if (StringUtils.isNullOrEmpty(checkpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
            LOG.warn("Already caught up. No new data to process");
            return Pair.of(Option.empty(), queryInfo.getEndInstant());
        }

        // The runner may adjust the query window, so use the QueryInfo it returns from here on.
        Pair<QueryInfo, Dataset<Row>> queryResult = this.queryRunner.run(queryInfo, this.snapshotLoadQuerySplitter);
        QueryInfo effectiveQueryInfo = queryResult.getLeft();
        Dataset<Row> filteredEvents = applyFilter(queryResult.getRight(), this.fileFormat);

        LOG.info("Adjusting end checkpoint:{} based on sourceLimit :{}", effectiveQueryInfo.getEndInstant(), sourceLimit);
        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> limited =
            IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(filteredEvents, sourceLimit, effectiveQueryInfo, checkpoint);
        Option<Dataset<Row>> limitedEvents = limited.getRight();
        if (!limitedEvents.isPresent()) {
            LOG.info("Empty source, returning endpoint:{}", effectiveQueryInfo.getEndInstant());
            return Pair.of(Option.empty(), effectiveQueryInfo.getEndInstant());
        }
        CloudObjectIncrCheckpoint nextCheckpoint = limited.getLeft();
        LOG.info("Adjusted end checkpoint :{}", nextCheckpoint);

        List<CloudObjectMetadata> cloudObjectMetadata = collectCloudObjectMetadata(limitedEvents.get());
        LOG.info("Total number of files to process :{}", cloudObjectMetadata.size());
        return Pair.of(
            this.cloudDataFetcher.getCloudObjectDataDF(this.sparkSession, cloudObjectMetadata, this.props, this.schemaProvider),
            nextCheckpoint.toString());
    }

    /**
     * Collects one {@link CloudObjectMetadata} per distinct (bucket, key, size) row to the driver.
     */
    private List<CloudObjectMetadata> collectCloudObjectMetadata(Dataset<Row> events) {
        // Locale.ROOT keeps the scheme lower-casing independent of the JVM default locale.
        String fsPrefix = ConfigUtils.getStringWithAltKeys(this.props, S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX, true)
            .toLowerCase(Locale.ROOT) + "://";
        SerializableConfiguration hadoopConf = new SerializableConfiguration(this.sparkContext.hadoopConfiguration());
        return events
            .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
            .distinct()
            .mapPartitions(
                CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition(fsPrefix, hadoopConf, this.checkIfFileExists),
                Encoders.kryo(CloudObjectMetadata.class))
            .collectAsList();
    }

    /**
     * Keeps only rows that reference objects worth loading. A row is kept when:
     * <ul>
     *   <li>{@code s3.object.size > 0};</li>
     *   <li>the key starts with the configured key prefix, if one is set;</li>
     *   <li>the key does not start with the configured ignore-prefix, if one is set;</li>
     *   <li>the key does not contain the configured ignore-substring, if one is set;</li>
     *   <li>the key contains {@code format}.</li>
     * </ul>
     *
     * <p>Matching is literal. Unlike the former SQL {@code LIKE} expression, {@code _} and
     * {@code %} in configured values are not wildcards, and quotes cannot break the filter.
     *
     * @param dataset rows containing the {@code s3.object.size} and {@code s3.object.key} columns
     * @param format substring every kept object key must contain
     * @return the filtered dataset
     */
    Dataset<Row> applyFilter(Dataset<Row> dataset, String format) {
        Column objectKey = functions.col(S3_OBJECT_KEY);
        Column condition = functions.col(S3_OBJECT_SIZE).gt(0);

        String keyPrefix = ConfigUtils.getStringWithAltKeys(this.props, S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX, true);
        if (!StringUtils.isNullOrEmpty(keyPrefix)) {
            condition = condition.and(objectKey.startsWith(keyPrefix));
        }
        String ignoreKeyPrefix = ConfigUtils.getStringWithAltKeys(this.props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX, true);
        if (!StringUtils.isNullOrEmpty(ignoreKeyPrefix)) {
            condition = condition.and(functions.not(objectKey.startsWith(ignoreKeyPrefix)));
        }
        String ignoreKeySubstring = ConfigUtils.getStringWithAltKeys(this.props, S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING, true);
        if (!StringUtils.isNullOrEmpty(ignoreKeySubstring)) {
            condition = condition.and(functions.not(objectKey.contains(ignoreKeySubstring)));
        }
        // The file-format check is always applied.
        return dataset.filter(condition.and(objectKey.contains(format)));
    }
}
