package org.apache.hudi.utilities.sources;

import java.util.Collections;
import java.util.Properties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental source that reads GCS object records from an upstream Hudi table and loads the objects they reference.
 *
 * <p>Each fetch incrementally queries the table at {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
 * (bounded by {@link HoodieIncrSourceConfig#NUM_INSTANTS_PER_FETCH}) and hands the selected rows to
 * {@link CloudDataFetcher} with {@link CloudObjectsSelectorCommon.Type#GCS}. Only {@link StreamerCheckpointV1}
 * checkpoints are accepted.
 */
public class GcsEventsHoodieIncrSource extends HoodieIncrSource {

    private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);

    /** Column name passed to the query generator as the object key column (presumably the GCS object name). */
    private static final String GCS_OBJECT_KEY = "name";

    /** Column name passed to the query generator as the object size column. */
    private static final String GCS_OBJECT_SIZE = "size";

    private final String srcPath;
    // NOTE(review): read from ENABLE_EXISTS_CHECK but only logged within this class — confirm it is still needed.
    private final boolean checkIfFileExists;
    private final int numInstantsPerFetch;
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final CloudDataFetcher cloudDataFetcher;
    private final QueryRunner queryRunner;
    // Schema provider from the stream context; Option.empty() when the context has none.
    private final Option<SchemaProvider> schemaProvider;
    private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;

    /**
     * Creates the source with a default {@link CloudDataFetcher} and {@link QueryRunner}.
     *
     * @param props          source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param jsc            Spark context
     * @param sparkSession   Spark session
     * @param schemaProvider schema provider, may be {@code null}
     * @param metrics        ingestion metrics handed to the {@link CloudDataFetcher}
     */
    public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession,
                                     SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
        this(props, jsc, sparkSession,
                new CloudDataFetcher(props, jsc, sparkSession, metrics),
                new QueryRunner(sparkSession, props),
                new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source from a {@link StreamContext}, using a default {@link CloudDataFetcher} and {@link QueryRunner}.
     *
     * @param props         source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param jsc           Spark context
     * @param sparkSession  Spark session
     * @param metrics       ingestion metrics handed to the {@link CloudDataFetcher}
     * @param streamContext stream context supplying the schema provider
     */
    public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession,
                                     HoodieIngestionMetrics metrics, StreamContext streamContext) {
        this(props, jsc, sparkSession,
                new CloudDataFetcher(props, jsc, sparkSession, metrics),
                new QueryRunner(sparkSession, props),
                streamContext);
    }

    /**
     * Primary constructor; also lets tests inject the fetcher and query runner.
     *
     * @param props            source configuration; must contain {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}
     * @param jsc              Spark context
     * @param sparkSession     Spark session
     * @param cloudDataFetcher fetcher used to load the referenced GCS objects
     * @param queryRunner      runner used to execute the incremental query on the source table
     * @param streamContext    stream context supplying the schema provider
     */
    GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession,
                              CloudDataFetcher cloudDataFetcher, QueryRunner queryRunner, StreamContext streamContext) {
        super(props, jsc, sparkSession, streamContext);
        // Validate up front that the source table path is configured.
        ConfigUtils.checkRequiredConfigProperties(props,
                Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.srcPath = ConfigUtils.getStringWithAltKeys((Properties) props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(props);
        this.numInstantsPerFetch = ConfigUtils.getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
        this.checkIfFileExists = ConfigUtils.getBooleanWithAltKeys(props, CloudSourceConfig.ENABLE_EXISTS_CHECK);
        this.cloudDataFetcher = cloudDataFetcher;
        this.queryRunner = queryRunner;
        this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
        this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);

        LOG.info("srcPath: {}", this.srcPath);
        LOG.info("missingCheckpointStrategy: {}", this.missingCheckpointStrategy);
        LOG.info("numInstantsPerFetch: {}", this.numInstantsPerFetch);
        LOG.info("checkIfFileExists: {}", this.checkIfFileExists);
    }

    /**
     * Accepts the incoming checkpoint unchanged after checking that it is a {@link StreamerCheckpointV1}.
     *
     * @param lastCheckpoint checkpoint from the previous run, if any
     * @return {@code lastCheckpoint} unchanged
     * @throws IllegalArgumentException (via {@link ValidationUtils#checkArgument}) if a non-V1 checkpoint is present
     */
    @Override
    protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
        if (lastCheckpoint.isPresent()) {
            ValidationUtils.checkArgument(lastCheckpoint.get() instanceof StreamerCheckpointV1,
                    "For GcsEventsHoodieIncrSource, only StreamerCheckpointV1, i.e., requested time-based checkpoint, is supported. Checkpoint provided is: " + lastCheckpoint.get());
        }
        return lastCheckpoint;
    }

    /**
     * Fetches the next batch of GCS objects referenced by new commits in the source table.
     *
     * @param lastCheckpoint checkpoint from the previous run, if any; parsed as a {@link CloudObjectIncrCheckpoint}
     * @param sourceLimit    limit forwarded to {@link CloudDataFetcher#fetchPartitionedSource}
     * @return the fetched rows paired with the next checkpoint; the rows are empty when there is nothing to read
     */
    @Override
    public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        CloudObjectIncrCheckpoint cloudCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
        QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(
                this.sparkContext, this.srcPath, this.numInstantsPerFetch,
                Option.of(new StreamerCheckpointV1(cloudCheckpoint.getCommit())),
                this.missingCheckpointStrategy, IncrSourceHelper.getHollowCommitHandleMode(this.props),
                HoodieRecord.COMMIT_TIME_METADATA_FIELD, GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
                Option.ofNullable(cloudCheckpoint.getKey()));
        LOG.info("Querying GCS with:{} and queryInfo:{}", cloudCheckpoint, queryInfo);

        // Nothing to read: the checkpoint carries no object key and the query's start and end instants coincide.
        if (StringUtils.isNullOrEmpty(cloudCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
            LOG.info("Source of file names is empty. Returning empty result and endInstant: {}",
                    queryInfo.getStartInstant());
            return Pair.of(Option.empty(), new StreamerCheckpointV1(queryInfo.getStartInstant()));
        }
        return this.cloudDataFetcher.fetchPartitionedSource(CloudObjectsSelectorCommon.Type.GCS, cloudCheckpoint,
                this.sourceProfileSupplier, this.queryRunner.run(queryInfo, this.snapshotLoadQuerySplitter),
                this.schemaProvider, sourceLimit);
    }
}
