package org.apache.hudi.utilities.sources.helpers;

import java.util.function.Function;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.class */
/**
 * Static helpers used when pulling data incrementally from a Hudi source table:
 * resolving the instant range to query and trimming a batch of rows to a size limit.
 */
public class IncrSourceHelper {
    private static final Logger LOG = LoggerFactory.getLogger(IncrSourceHelper.class);

    /** Begin timestamp used when no checkpoint is available and reading starts from the beginning. */
    public static final String DEFAULT_BEGIN_TIMESTAMP = "00000000000000";

    /** Name of the temporary running-sum column added while applying the source limit. */
    private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";

    /** Strategy applied by {@link #generateQueryInfo} when no begin instant (checkpoint) is supplied. */
    public enum MissingCheckpointStrategy {
        READ_LATEST("Read from latest commit in hoodie source table"),
        READ_UPTO_LATEST_COMMIT("Read everything upto latest commit");

        private final String description;

        MissingCheckpointStrategy(String str) {
            this.description = str;
        }

        /** Returns the human-readable description of this strategy. */
        public String getDescription() {
            return this.description;
        }

        // Not referenced anywhere in this file; kept as-is in case it is used elsewhere (e.g. reflectively).
        private static MissingCheckpointStrategy nullEnum() {
            return null;
        }

        /** Renders as {@code NAME (description)}. */
        @Override
        public String toString() {
            return String.format("%s (%s)", name(), this.description);
        }
    }

    /**
     * Returns the decimal string of {@code str - 1}.
     *
     * <p>The result is produced from a {@code long}, so it is not zero-padded to the input's width.
     *
     * @param str a timestamp consisting of decimal digits; must parse to a value greater than zero
     * @return the timestamp immediately below {@code str}
     * @throws NumberFormatException if {@code str} is not a parsable {@code long}
     */
    public static String getStrictlyLowerTimestamp(String str) {
        long timestamp = Long.parseLong(str);
        ValidationUtils.checkArgument(timestamp > 0, "Timestamp must be positive");
        return String.valueOf(timestamp - 1);
    }

    /**
     * Reads the hollow-commit handling mode from the given properties, defaulting to
     * {@link TimelineUtils.HollowCommitHandling#BLOCK} when the key is not set.
     *
     * @param typedProperties properties to read the mode from
     * @return the configured handling mode
     * @throws IllegalArgumentException if the configured value is not a valid enum constant name
     */
    public static TimelineUtils.HollowCommitHandling getHollowCommitHandleMode(TypedProperties typedProperties) {
        String configuredMode = typedProperties.getString(
            DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(),
            TimelineUtils.HollowCommitHandling.BLOCK.name());
        return TimelineUtils.HollowCommitHandling.valueOf(configuredMode);
    }

    /**
     * Builds the {@link QueryInfo} (query type plus previous / begin / end instants) for the next pull
     * from the source table located at {@code str}.
     *
     * <p>The begin instant is {@code option} when present. Otherwise it is derived from
     * {@code missingCheckpointStrategy}: {@code READ_LATEST} uses the timestamp strictly below the last
     * instant of the timeline (or {@link #DEFAULT_BEGIN_TIMESTAMP} if the timeline has no instant), any
     * other strategy uses {@link #DEFAULT_BEGIN_TIMESTAMP}.
     *
     * <p>An incremental query is produced when the strategy is {@code READ_LATEST} or the begin instant is
     * not before the start of the timeline; otherwise a snapshot query up to the last instant is produced.
     *
     * @param javaSparkContext          supplies the Hadoop configuration for the meta client
     * @param str                       base path of the source table
     * @param i                         maximum number of instants to look ahead; must be positive
     * @param option                    begin instant (checkpoint), if any
     * @param missingCheckpointStrategy strategy to use when {@code option} is empty; may be {@code null}
     * @param hollowCommitHandling      hollow-commit handling mode applied to the timeline
     * @param str2                      forwarded unchanged to the {@code QueryInfo} constructor
     *                                  (presumably the order column - confirm against QueryInfo)
     * @param str3                      forwarded unchanged to the {@code QueryInfo} constructor
     *                                  (presumably the key column - confirm against QueryInfo)
     * @param str4                      forwarded unchanged to the {@code QueryInfo} constructor
     *                                  (presumably the limit column - confirm against QueryInfo)
     * @param z                         when {@code true} and {@code option2} is present, the begin instant
     *                                  itself is included in the look-ahead window
     * @param option2                   key of the last checkpoint, if any
     * @return the query description for the next pull
     * @throws IllegalArgumentException if {@code option} is empty and {@code missingCheckpointStrategy} is {@code null}
     */
    public static QueryInfo generateQueryInfo(JavaSparkContext javaSparkContext, String str, int i, Option<String> option, MissingCheckpointStrategy missingCheckpointStrategy, TimelineUtils.HollowCommitHandling hollowCommitHandling, String str2, String str3, String str4, boolean z, Option<String> option2) {
        ValidationUtils.checkArgument(i > 0, "Make sure the config hoodie.streamer.source.hoodieincr.num_instants is set to a positive value");
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(javaSparkContext.hadoopConfiguration())
            .setBasePath(str)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        HoodieTimeline timeline = TimelineUtils.handleHollowCommitIfNeeded(
            metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), metaClient, hollowCommitHandling);

        // Under USE_TRANSITION_TIME the completion time identifies an instant; otherwise its timestamp does.
        Function<HoodieInstant, String> instantTime = instant ->
            hollowCommitHandling == TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
                ? instant.getCompletionTime()
                : instant.getTimestamp();

        String beginInstantTime = option.orElseGet(() -> {
            if (missingCheckpointStrategy == null) {
                throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest committed instant set hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
            }
            if (missingCheckpointStrategy != MissingCheckpointStrategy.READ_LATEST) {
                return DEFAULT_BEGIN_TIMESTAMP;
            }
            return timeline.lastInstant()
                .map(instant -> getStrictlyLowerTimestamp(instantTime.apply(instant)))
                .orElse(DEFAULT_BEGIN_TIMESTAMP);
        });

        // Previous instant: the instant before the begin instant if the timeline has one; if the begin
        // instant is the first completed instant, use (begin - 1); otherwise keep the default.
        String previousInstantTime = DEFAULT_BEGIN_TIMESTAMP;
        if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
            Option<HoodieInstant> previousInstant = timeline.findInstantBefore(beginInstantTime);
            if (previousInstant.isPresent()) {
                previousInstantTime = previousInstant.get().getTimestamp();
            } else {
                Option<HoodieInstant> firstCompleted = timeline.filterCompletedInstants().firstInstant();
                if (firstCompleted.isPresent() && firstCompleted.get().getTimestamp().equals(beginInstantTime)) {
                    previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1);
                }
            }
        }

        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || !timeline.isBeforeTimelineStarts(beginInstantTime)) {
            HoodieTimeline window = (z && option2.isPresent())
                ? timeline.findInstantsAfterOrEquals(beginInstantTime, i)
                : timeline.findInstantsAfter(beginInstantTime, i);
            // reduce((a, b) -> b) keeps the last instant of the look-ahead window.
            Option<HoodieInstant> lastInWindow =
                Option.fromJavaOptional(window.getInstantsAsStream().reduce((earlier, later) -> later));
            String endInstantTime = lastInWindow.map(HoodieInstant::getTimestamp).orElse(beginInstantTime);
            return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), previousInstantTime, beginInstantTime, endInstantTime, str2, str3, str4);
        }
        // Begin instant lies before the start of the timeline: read everything up to the last instant.
        return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), previousInstantTime, beginInstantTime, timeline.lastInstant().get().getTimestamp(), str2, str3, str4);
    }

    /**
     * Trims {@code dataset} to the rows whose running total of the limit column stays within {@code j}
     * and computes the checkpoint corresponding to the last retained row.
     *
     * <p>Rows are ordered via {@code QueryRunner.applyOrdering}. When the incoming checkpoint has a key,
     * only rows whose concatenated (order column + key column) value is greater than the checkpoint's
     * concatenated (commit + key) value are kept. If the very first remaining row already exceeds the
     * limit, that single row is returned so progress is still made.
     *
     * <p>The input dataset is persisted for the duration of this call and always unpersisted before
     * returning, including when a Spark action fails.
     *
     * @param dataset                   rows to filter
     * @param j                         source limit, compared against the running sum of the limit column
     * @param queryInfo                 provides the order, key and limit column names and the end instant
     * @param cloudObjectIncrCheckpoint last checkpoint; its commit must be non-null, its key may be null
     * @return the new checkpoint and the retained rows; the rows are empty when there is nothing to process
     */
    public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> dataset, long j, QueryInfo queryInfo, CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
        if (dataset.isEmpty()) {
            return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
        }
        // Several actions (isEmpty/first) run on derived datasets below; cache the input to avoid recomputation.
        dataset.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            Dataset<Row> ordered = QueryRunner.applyOrdering(dataset, queryInfo.getOrderByColumns());
            Option<String> lastCommit = Option.of(cloudObjectIncrCheckpoint.getCommit());
            Option<String> lastKey = Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
            // Must mirror the "commit_key" column built below: commit followed by key.
            Option<String> lastCommitAndKey = lastCommit.flatMap(commit -> lastKey.map(key -> commit + key));

            if (lastCommitAndKey.isPresent()) {
                ordered = ordered
                    .withColumn("commit_key", functions.concat(functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn())))
                    .filter(functions.col("commit_key").gt(lastCommitAndKey.get()))
                    .drop("commit_key");
                if (ordered.isEmpty()) {
                    LOG.info("Empty ordered source, returning endpoint:{}", queryInfo.getEndInstant());
                    return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastKey.get()), Option.empty());
                }
            }

            // Running sum of the limit column in (order column, key column) order.
            Dataset<Row> withRunningTotal = ordered.withColumn(
                CUMULATIVE_COLUMN_NAME,
                functions.sum(functions.col(queryInfo.getLimitColumn()))
                    .over(Window.orderBy(functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn()))));
            Dataset<Row> withinLimit = withRunningTotal.filter(functions.col(CUMULATIVE_COLUMN_NAME).leq(j));

            Row lastRow;
            if (withinLimit.isEmpty()) {
                // Even the first row is over the limit: take just that one row.
                LOG.info("First object exceeding source limit: {} bytes", j);
                lastRow = withRunningTotal
                    .select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME)
                    .first();
                withinLimit = withRunningTotal.limit(1);
            } else {
                // Last retained row = greatest (order column, key column) among rows within the limit.
                lastRow = withinLimit
                    .select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME)
                    .orderBy(functions.col(queryInfo.getOrderColumn()).desc(), functions.col(queryInfo.getKeyColumn()).desc())
                    .first();
            }
            LOG.info("Processed batch size: {} bytes", lastRow.get(lastRow.fieldIndex(CUMULATIVE_COLUMN_NAME)));
            // Columns 0 and 1 of the selection are the order column and the key column respectively.
            return Pair.of(new CloudObjectIncrCheckpoint(lastRow.getString(0), lastRow.getString(1)), Option.of(withinLimit));
        } finally {
            dataset.unpersist();
        }
    }

    /**
     * Resolves the missing-checkpoint strategy from configuration.
     *
     * <p>{@code READ_LATEST_INSTANT_ON_MISSING_CKPT} takes precedence; otherwise the value of
     * {@code MISSING_CHECKPOINT_STRATEGY} is parsed when that property is present.
     *
     * @param typedProperties properties to read from
     * @return the configured strategy, or {@code null} when neither property selects one
     * @throws IllegalArgumentException if the configured strategy name is not a valid enum constant
     */
    public static MissingCheckpointStrategy getMissingCheckpointStrategy(TypedProperties typedProperties) {
        if (ConfigUtils.getBooleanWithAltKeys(typedProperties, HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT)) {
            return MissingCheckpointStrategy.READ_LATEST;
        }
        if (ConfigUtils.containsConfigProperty(typedProperties, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY)) {
            return MissingCheckpointStrategy.valueOf(ConfigUtils.getStringWithAltKeys(typedProperties, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY));
        }
        return null;
    }
}
