package org.apache.hudi.utilities.sources;

import com.esotericsoftware.minlog.Log;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.class */
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
    private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class);

    /* loaded from: input_file:org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource$Config.class */
    static class Config {
        static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
        static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
        static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
        static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
        static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
        static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
        static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";

        Config() {
        }
    }

    public S3EventsHoodieIncrSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
    }

    private DataFrameReader getDataFrameReader(String str) {
        DataFrameReader format = this.sparkSession.read().format(str);
        if (!StringUtils.isNullOrEmpty(this.props.getString("hoodie.deltastreamer.source.s3incr.spark.datasource.options", null))) {
            try {
                Map map = (Map) new ObjectMapper().readValue(this.props.getString("hoodie.deltastreamer.source.s3incr.spark.datasource.options"), Map.class);
                Log.info(String.format("sparkOptions loaded: %s", map));
                format = format.options(map);
            } catch (IOException e) {
                throw new HoodieException(String.format("Failed to parse sparkOptions: %s", this.props.getString("hoodie.deltastreamer.source.s3incr.spark.datasource.options")), e);
            }
        }
        return format;
    }

    @Override // org.apache.hudi.utilities.sources.HoodieIncrSource, org.apache.hudi.utilities.sources.RowSource
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        String str;
        DataSourceUtils.checkRequiredProperties(this.props, Collections.singletonList("hoodie.deltastreamer.source.hoodieincr.path"));
        String string = this.props.getString("hoodie.deltastreamer.source.hoodieincr.path");
        int integer = this.props.getInteger("hoodie.deltastreamer.source.hoodieincr.num_instants", HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH.intValue());
        boolean z = this.props.getBoolean("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt", HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT.booleanValue());
        IncrSourceHelper.MissingCheckpointStrategy valueOf = this.props.containsKey("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy") ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(this.props.getString("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy")) : null;
        if (z) {
            valueOf = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
        }
        String string2 = this.props.getString("hoodie.deltastreamer.source.hoodieincr.file.format", "parquet");
        Pair<String, Pair<String, String>> calculateBeginAndEndInstants = IncrSourceHelper.calculateBeginAndEndInstants(this.sparkContext, string, integer, option.isPresent() ? option.get().isEmpty() ? Option.empty() : option : Option.empty(), valueOf);
        if (calculateBeginAndEndInstants.getValue().getKey().equals(calculateBeginAndEndInstants.getValue().getValue())) {
            LOG.warn("Already caught up. Begin Checkpoint was :" + calculateBeginAndEndInstants.getValue().getKey());
            return Pair.of(Option.empty(), calculateBeginAndEndInstants.getValue().getKey());
        }
        Dataset load = calculateBeginAndEndInstants.getKey().equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()) ? this.sparkSession.read().format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), calculateBeginAndEndInstants.getRight().getLeft()).option(DataSourceReadOptions.END_INSTANTTIME().key(), calculateBeginAndEndInstants.getRight().getRight()).load(string) : this.sparkSession.read().format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(string).filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, calculateBeginAndEndInstants.getRight().getLeft()));
        if (load.isEmpty()) {
            return Pair.of(Option.empty(), calculateBeginAndEndInstants.getRight().getRight());
        }
        str = "s3.object.size > 0";
        str = StringUtils.isNullOrEmpty(this.props.getString("hoodie.deltastreamer.source.s3incr.key.prefix", null)) ? "s3.object.size > 0" : str + " and s3.object.key like '" + this.props.getString("hoodie.deltastreamer.source.s3incr.key.prefix") + "%'";
        if (!StringUtils.isNullOrEmpty(this.props.getString("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", null))) {
            str = str + " and s3.object.key not like '" + this.props.getString("hoodie.deltastreamer.source.s3incr.ignore.key.prefix") + "%'";
        }
        if (!StringUtils.isNullOrEmpty(this.props.getString("hoodie.deltastreamer.source.s3incr.ignore.key.substring", null))) {
            str = str + " and s3.object.key not like '%" + this.props.getString("hoodie.deltastreamer.source.s3incr.ignore.key.substring") + "%'";
        }
        String str2 = str + " and s3.object.key like '%" + string2 + "%'";
        String str3 = this.props.getString("hoodie.deltastreamer.source.s3incr.fs.prefix", "s3").toLowerCase() + "://";
        List<Row> collectAsList = load.filter(str2).select("s3.bucket.name", new String[]{"s3.object.key"}).distinct().collectAsList();
        boolean z2 = this.props.getBoolean("hoodie.deltastreamer.source.s3incr.check.file.exists", Config.DEFAULT_ENABLE_EXISTS_CHECK.booleanValue());
        ArrayList arrayList = new ArrayList();
        for (Row row : collectAsList) {
            String string3 = row.getString(0);
            String str4 = str3 + string3 + "/" + row.getString(1);
            if (z2) {
                try {
                    if (FSUtils.getFs(str3 + string3, this.sparkSession.sparkContext().hadoopConfiguration()).exists(new Path(str4))) {
                        arrayList.add(str4);
                    }
                } catch (IOException e) {
                    LOG.error(String.format("Error while checking path exists for %s ", str4), e);
                }
            } else {
                arrayList.add(str4);
            }
        }
        Option empty = Option.empty();
        if (!arrayList.isEmpty()) {
            empty = Option.of(getDataFrameReader(string2).load((String[]) arrayList.toArray(new String[0])));
        }
        return Pair.of(empty, calculateBeginAndEndInstants.getRight().getRight());
    }
}
