/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalInputSplits
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
    private final Configuration conf;
    private final Path path;
    private final RowType rowType;
    private final long maxCompactionMemoryInBytes;
    private final PartitionPruners.PartitionPruner partitionPruner;
    private final boolean skipCompaction;
    private final boolean skipClustering;

    /**
     * Creates the split generator. The constructor is private; instances are
     * assembled through {@link Builder#build()}.
     *
     * @param conf                       the Flink configuration the read options are taken from
     * @param path                       the table path
     * @param rowType                    the row type handed to the file index
     * @param maxCompactionMemoryInBytes the memory budget passed on to every generated split
     * @param partitionPruner            optional partition pruner, may be {@code null}
     * @param skipCompaction             whether compaction commits are filtered out of the read timeline
     * @param skipClustering             whether clustering instants are filtered out of the read timeline
     */
    private IncrementalInputSplits(
            Configuration conf,
            Path path,
            RowType rowType,
            long maxCompactionMemoryInBytes,
            @Nullable PartitionPruners.PartitionPruner partitionPruner,
            boolean skipCompaction,
            boolean skipClustering) {
        // Timeline filtering switches.
        this.skipClustering = skipClustering;
        this.skipCompaction = skipCompaction;
        // Pruning and split sizing.
        this.partitionPruner = partitionPruner;
        this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
        // Table identity and schema.
        this.rowType = rowType;
        this.path = path;
        this.conf = conf;
    }

    /**
     * Returns a new, empty {@link Builder} for {@link IncrementalInputSplits}.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Builds the input splits for a read bounded by the configured
     * {@code READ_START_COMMIT} / {@code READ_END_COMMIT} options.
     *
     * <p>The method falls back to listing all partitions and files through the
     * {@link FileIndex} when the start commit is {@code "earliest"}, when the start
     * or end commit lies before the start of the read timeline, or when
     * {@code WriteProfiles.getFilesFromMetadata} returns {@code null} for the
     * matching commits.
     *
     * @param metaClient the table meta client
     * @param cdcEnabled whether change-data-capture splits should be produced
     *                   (only honoured when no full table scan is required)
     * @return the splits together with the end instant, or {@link Result#EMPTY}
     *         when there is nothing to read
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, boolean cdcEnabled) {
        final HoodieTimeline commitTimeline = this.getReadTimeline(metaClient);
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path {}", this.path);
            return Result.EMPTY;
        }

        final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
        final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
        final boolean startFromEarliest = "earliest".equalsIgnoreCase(startCommit);
        final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
        final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
        final boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;

        final List<HoodieInstant> instants = this.filterInstantsWithRange(commitTimeline, null);

        // Step 1: derive the instant range.
        // When the end commit is out of range (or nothing matched) keep the configured
        // end commit, otherwise use the last filtered instant.
        final String rangeEnd = endOutOfRange || instants.isEmpty()
                ? endCommit
                : instants.get(instants.size() - 1).getTimestamp();
        // rangeStart must be assigned on every path: it is read below even when
        // startFromEarliest is true (full scan with an explicit end commit).
        final String rangeStart;
        if (startFromEarliest) {
            rangeStart = null;
        } else if (startCommit == null) {
            rangeStart = rangeEnd;
        } else {
            rangeStart = startCommit;
        }

        final InstantRange instantRange;
        if (!fullTableScan) {
            instantRange = InstantRange.builder()
                    .startInstant(rangeStart)
                    .endInstant(rangeEnd)
                    .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                    .nullableBoundary(cdcEnabled)
                    .build();
        } else if (startFromEarliest && endCommit == null) {
            // No lower and no upper bound: no range filter at all.
            instantRange = null;
        } else {
            instantRange = InstantRange.builder()
                    .startInstant(rangeStart)
                    .endInstant(rangeEnd)
                    .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                    .nullableBoundary(true)
                    .build();
        }

        // Step 2: decide the end instant reported with the result.
        final String endInstant = endOutOfRange || endCommit == null
                ? commitTimeline.lastInstant().get().getTimestamp()
                : rangeEnd;

        // Step 3: resolve the partitions and files to read.
        Set<String> readPartitions = null;
        List<StoragePathInfo> fileInfoList = null;
        boolean scanAllFiles = fullTableScan;
        if (!fullTableScan) {
            if (instants.isEmpty()) {
                LOG.info("No new instant found for the table under path {}, skip reading", this.path);
                return Result.EMPTY;
            }
            if (cdcEnabled) {
                List<MergeOnReadInputSplit> cdcSplits = this.getCdcInputSplits(metaClient, instantRange);
                return Result.instance(cdcSplits, endInstant);
            }
            final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
            List<HoodieCommitMetadata> metadataList = instants.stream()
                    .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                    .collect(Collectors.toList());
            readPartitions = this.getReadPartitions(metadataList);
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileInfoList = WriteProfiles.getFilesFromMetadata(this.path, (org.apache.hadoop.conf.Configuration)metaClient.getStorageConf().unwrap(), metadataList, metaClient.getTableType(), false);
            if (fileInfoList == null) {
                LOG.warn("Found deleted files in metadata, fall back to full table scan.");
                scanAllFiles = true;
            }
        }
        if (scanAllFiles) {
            FileIndex fileIndex = this.getFileIndex();
            readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileInfoList = fileIndex.getFilesInPartitions();
        }
        if (fileInfoList.isEmpty()) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }

        List<MergeOnReadInputSplit> inputSplits = this.getInputSplits(metaClient, commitTimeline, fileInfoList, readPartitions, endInstant, instantRange, false);
        return Result.instance(inputSplits, endInstant);
    }

    /**
     * Builds the input splits for one round of a continuous read.
     *
     * <p>The active timeline is reloaded first. Instants newer than
     * {@code issuedInstant} are read through an instant range; in addition, the
     * "hollow" instants found by {@code getHollowInputSplits} are merged in front of
     * them. When no instant range applies (see {@code getInstantRange}) all
     * partitions and files listed by the {@link FileIndex} are read instead.
     *
     * @param metaClient    the table meta client
     * @param issuedInstant the last instant already issued, or {@code null} on the first round
     * @param issuedOffset  the last issued offset (state-transition time), or {@code null}
     * @param cdcEnabled    whether change-data-capture splits should be produced
     * @return the splits with the new end instant and offset, the hollow splits alone
     *         when no newer instant exists, or {@link Result#EMPTY}
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, @Nullable String issuedInstant, @Nullable String issuedOffset, boolean cdcEnabled) {
        metaClient.reloadActiveTimeline();
        final HoodieTimeline commitTimeline = this.getReadTimeline(metaClient);
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path {}", this.path);
            return Result.EMPTY;
        }

        // Unwrapped once and shared by the hollow and the incremental lookups.
        final org.apache.hadoop.conf.Configuration hadoopConf =
                (org.apache.hadoop.conf.Configuration)metaClient.getStorageConf().unwrapAs(org.apache.hadoop.conf.Configuration.class);

        final Result hollowSplits = this.getHollowInputSplits(metaClient, hadoopConf, issuedInstant, issuedOffset, commitTimeline, cdcEnabled);
        final List<HoodieInstant> instants = this.filterInstantsWithRange(commitTimeline, issuedInstant);
        if (instants.isEmpty()) {
            // No newer instant: only the hollow splits (if any) are left to issue.
            if (hollowSplits.isEmpty()) {
                LOG.info("No new instant found for the table under path {}, skip reading", this.path);
                return Result.EMPTY;
            }
            return hollowSplits;
        }

        final String endInstant = instants.get(instants.size() - 1).getTimestamp();
        final InstantRange instantRange = this.getInstantRange(issuedInstant, endInstant, cdcEnabled);
        // The greatest state-transition time among the new instants becomes the next offset.
        final String offsetToIssue = instants.stream()
                .map(HoodieInstant::getStateTransitionTime)
                .max(String::compareTo)
                .orElse(endInstant);

        if (instantRange == null) {
            // No range filter: list partitions and files through the file index.
            FileIndex fileIndex = this.getFileIndex();
            Set<String> readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading under path: {}", this.path);
                return Result.EMPTY;
            }
            List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
            if (pathInfoList.isEmpty()) {
                LOG.warn("No files found for reading under path: {}", this.path);
                return Result.EMPTY;
            }
            List<MergeOnReadInputSplit> fullScanSplits = this.getInputSplits(metaClient, commitTimeline, pathInfoList, readPartitions, endInstant, null, false);
            return Result.instance(fullScanSplits, endInstant, offsetToIssue);
        }

        List<MergeOnReadInputSplit> inputSplits = this.getIncInputSplits(metaClient, hadoopConf, commitTimeline, instants, instantRange, endInstant, cdcEnabled);
        return Result.instance(IncrementalInputSplits.mergeList(hollowSplits.getInputSplits(), inputSplits), endInstant, offsetToIssue);
    }

    /**
     * Resolves the splits for the given instants.
     *
     * <p>With CDC enabled the work is delegated to {@code getCdcInputSplits}.
     * Otherwise the commit metadata of the given instants is combined with the
     * archived metadata (if any), and the partitions and files recorded there are
     * turned into splits.
     *
     * @param metaClient     the table meta client
     * @param hadoopConf     the Hadoop configuration used to resolve files from metadata
     * @param commitTimeline the read timeline
     * @param instants       the instants to read
     * @param instantRange   the instant range attached to the splits
     * @param endInstant     the end instant attached to the splits
     * @param cdcEnabled     whether change-data-capture splits should be produced
     * @return the splits, or an empty list when no partition or file qualifies
     */
    private List<MergeOnReadInputSplit> getIncInputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf, HoodieTimeline commitTimeline, List<HoodieInstant> instants, InstantRange instantRange, String endInstant, boolean cdcEnabled) {
        if (cdcEnabled) {
            return this.getCdcInputSplits(metaClient, instantRange);
        }

        final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
        final List<HoodieCommitMetadata> activeMetadataList = instants.stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                .collect(Collectors.toList());
        final List<HoodieCommitMetadata> archivedMetadataList = this.getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
        if (!archivedMetadataList.isEmpty()) {
            LOG.warn("\n--------------------------------------------------------------------------------\n---------- caution: the reader has fall behind too much from the writer,\n---------- tweak 'read.tasks' option to add parallelism of read tasks.\n--------------------------------------------------------------------------------");
        }

        // Archived metadata comes first, followed by the metadata of the given instants.
        final List<HoodieCommitMetadata> metadataList = IncrementalInputSplits.mergeList(archivedMetadataList, activeMetadataList);
        final Set<String> readPartitions = this.getReadPartitions(metadataList);
        if (readPartitions.isEmpty()) {
            LOG.warn("No partitions found for reading under path: {}", this.path);
            return Collections.emptyList();
        }

        final List<StoragePathInfo> pathInfoList = WriteProfiles.getFilesFromMetadata(this.path, hadoopConf, metadataList, metaClient.getTableType());
        if (pathInfoList.isEmpty()) {
            LOG.warn("No files found for reading under path: {}", this.path);
            return Collections.emptyList();
        }
        return this.getInputSplits(metaClient, commitTimeline, pathInfoList, readPartitions, endInstant, instantRange, this.skipCompaction);
    }

    /**
     * Collects the splits of "hollow" instants: write commits whose timestamp is
     * earlier than {@code issuedInstant} but whose state-transition time is later
     * than {@code issuedOffset}.
     *
     * <p>Each such instant is read on its own with a closed range covering exactly
     * that instant.
     *
     * @param metaClient     the table meta client
     * @param hadoopConf     the Hadoop configuration used to resolve files from metadata
     * @param issuedInstant  the last issued instant, may be {@code null}
     * @param issuedOffset   the last issued offset, may be {@code null}
     * @param commitTimeline the read timeline
     * @param cdcEnabled     whether change-data-capture splits should be produced
     * @return the hollow splits with {@code issuedInstant} as end instant and the
     *         greatest state-transition time as offset, or {@link Result#EMPTY} when
     *         either issued value is {@code null} or no hollow instant exists
     */
    private Result getHollowInputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf, @Nullable String issuedInstant, @Nullable String issuedOffset, HoodieTimeline commitTimeline, boolean cdcEnabled) {
        if (issuedInstant == null || issuedOffset == null) {
            return Result.EMPTY;
        }

        final List<HoodieInstant> instants = commitTimeline.getInstantsAsStream()
                .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.LESSER_THAN, issuedInstant))
                .filter(s -> HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), HoodieTimeline.GREATER_THAN, issuedOffset))
                .filter(s -> StreamerUtil.isWriteCommit(metaClient.getTableType(), s, commitTimeline))
                .collect(Collectors.toList());
        if (instants.isEmpty()) {
            return Result.EMPTY;
        }

        final String offsetToIssue = instants.stream()
                .map(HoodieInstant::getStateTransitionTime)
                .max(String::compareTo)
                .orElse(issuedOffset);

        final List<MergeOnReadInputSplit> inputSplits = instants.stream()
                .map(instant -> {
                    String instantTs = instant.getTimestamp();
                    // Closed range [instantTs, instantTs]: read exactly this instant.
                    InstantRange instantRange = InstantRange.builder()
                            .startInstant(instantTs)
                            .endInstant(instantTs)
                            .nullableBoundary(cdcEnabled)
                            .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                            .build();
                    return this.getIncInputSplits(metaClient, hadoopConf, commitTimeline, Collections.singletonList(instant), instantRange, instantTs, cdcEnabled);
                })
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        return Result.instance(inputSplits, issuedInstant, offsetToIssue);
    }

    /**
     * Derives the instant range for a continuous-read round.
     *
     * <ul>
     *   <li>With an issued instant: {@code (issuedInstant, instantToIssue]}.</li>
     *   <li>Else, with {@code READ_START_COMMIT} configured: {@code null} when it is
     *       {@code "earliest"} (case-insensitive), otherwise
     *       {@code [startCommit, instantToIssue]}.</li>
     *   <li>Else: {@code [instantToIssue, instantToIssue]}.</li>
     * </ul>
     *
     * @param issuedInstant    the last issued instant, may be {@code null}
     * @param instantToIssue   the instant used as the range end
     * @param nullableBoundary forwarded to the range builder's {@code nullableBoundary}
     * @return the range, or {@code null} when reading starts from the earliest commit
     */
    @Nullable
    private InstantRange getInstantRange(String issuedInstant, String instantToIssue, boolean nullableBoundary) {
        final String lowerBound;
        final InstantRange.RangeType boundaryKind;
        if (issuedInstant != null) {
            lowerBound = issuedInstant;
            boundaryKind = InstantRange.RangeType.OPEN_CLOSE;
        } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            String configuredStart = this.conf.getString(FlinkOptions.READ_START_COMMIT);
            if (configuredStart.equalsIgnoreCase("earliest")) {
                return null;
            }
            lowerBound = configuredStart;
            boundaryKind = InstantRange.RangeType.CLOSE_CLOSE;
        } else {
            lowerBound = instantToIssue;
            boundaryKind = InstantRange.RangeType.CLOSE_CLOSE;
        }
        return InstantRange.builder()
                .startInstant(lowerBound)
                .endInstant(instantToIssue)
                .nullableBoundary(nullableBoundary)
                .rangeType(boundaryKind)
                .build();
    }

    /**
     * Turns the file slices of the given partitions into merge-on-read splits.
     *
     * <p>A file system view is built over {@code pathInfoList}; for every partition
     * the file slices at or before {@code endInstant} are fetched and each slice
     * becomes one split. Log files whose path ends with {@code ".cdc"} are left out
     * of the split's log path list. Splits are numbered consecutively from 0.
     *
     * @param metaClient     the table meta client
     * @param commitTimeline the timeline backing the file system view
     * @param pathInfoList   the files the view is built from
     * @param readPartitions the relative partition paths to read
     * @param endInstant     the upper instant bound for the file slices
     * @param instantRange   the instant range attached to every split, may be {@code null}
     * @param skipBaseFiles  selects which file-slice lookup is used (see {@code getFileSlices})
     * @return the generated splits, in partition iteration order
     */
    private List<MergeOnReadInputSplit> getInputSplits(HoodieTableMetaClient metaClient, HoodieTimeline commitTimeline, List<StoragePathInfo> pathInfoList, Set<String> readPartitions, String endInstant, InstantRange instantRange, boolean skipBaseFiles) {
        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList);
        final AtomicInteger cnt = new AtomicInteger(0);
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        return readPartitions.stream()
                .map(relPartitionPath -> IncrementalInputSplits.getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
                        .map(fileSlice -> {
                            List<String> sortedLogPaths = fileSlice.getLogFiles()
                                    .sorted(HoodieLogFile.getLogFileComparator())
                                    .map(logFile -> logFile.getPath().toString())
                                    .filter(logPath -> !logPath.endsWith(".cdc"))
                                    .collect(Collectors.toList());
                            Option<List<String>> logPaths = Option.ofNullable(sortedLogPaths);
                            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                            return new MergeOnReadInputSplit(cnt.getAndIncrement(), basePath, logPaths, endInstant, metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
                        })
                        .collect(Collectors.toList()))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /**
     * Builds one CDC split per file group from the change-log file splits that
     * {@link HoodieCDCExtractor} extracts for the given instant range.
     *
     * <p>The CDC file splits of each file group are sorted by their natural order
     * before being put into the split. Splits are numbered consecutively from 0.
     *
     * @param metaClient   the table meta client
     * @param instantRange the instant range handed to the extractor
     * @return the CDC splits, or an empty list when no change logs are found
     */
    private List<MergeOnReadInputSplit> getCdcInputSplits(HoodieTableMetaClient metaClient, InstantRange instantRange) {
        final HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
        final Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
        if (fileSplits.isEmpty()) {
            LOG.warn("No change logs found for reading in path: {}", this.path);
            return Collections.emptyList();
        }

        final List<MergeOnReadInputSplit> cdcSplits = new ArrayList<>(fileSplits.size());
        int splitNum = 0;
        for (Map.Entry<HoodieFileGroupId, List<HoodieCDCFileSplit>> groupSplits : fileSplits.entrySet()) {
            HoodieCDCFileSplit[] sortedSplits = groupSplits.getValue().stream()
                    .sorted()
                    .toArray(HoodieCDCFileSplit[]::new);
            cdcSplits.add(new CdcInputSplit(splitNum++, metaClient.getBasePath(), this.maxCompactionMemoryInBytes, groupSplits.getKey().getFileId(), sortedSplits));
        }
        return cdcSplits;
    }

    /**
     * Fetches the file slices of a partition at or before {@code endInstant}.
     *
     * @param fsView           the file system view to query
     * @param relPartitionPath the relative partition path
     * @param endInstant       the upper instant bound
     * @param skipBaseFiles    when {@code true} uses
     *                         {@code getAllLogsMergedFileSliceBeforeOrOn}, otherwise
     *                         {@code getLatestMergedFileSlicesBeforeOrOn}
     * @return the stream of matching file slices
     */
    private static Stream<FileSlice> getFileSlices(HoodieTableFileSystemView fsView, String relPartitionPath, String endInstant, boolean skipBaseFiles) {
        if (skipBaseFiles) {
            return fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant);
        }
        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
    }

    /**
     * Builds a {@link FileIndex} over the table path using this instance's
     * configuration, row type and partition pruner.
     *
     * @return a newly built file index
     */
    private FileIndex getFileIndex() {
        // The file index takes a Hadoop path; convert from the Flink path via its URI.
        final org.apache.hadoop.fs.Path tablePath = new org.apache.hadoop.fs.Path(this.path.toUri());
        return FileIndex.builder()
                .path(tablePath)
                .conf(this.conf)
                .rowType(this.rowType)
                .partitionPruner(this.partitionPruner)
                .build();
    }

    /**
     * Returns the partitions written by the given commits, narrowed by the
     * partition pruner when one is configured.
     *
     * @param metadataList the commit metadata to take the written partitions from
     * @return the partition paths to read; the pruner's selection when a pruner is set
     */
    private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
        final Set<String> partitions = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList);
        if (this.partitionPruner == null) {
            return partitions;
        }

        final Set<String> selectedPartitions = this.partitionPruner.filter(partitions);
        // Counts are kept as doubles so the log line renders exactly as before.
        final double total = partitions.size();
        final double selectedNum = selectedPartitions.size();
        final double percentPruned = total == 0.0 ? 0.0 : (1.0 - selectedNum / total) * 100.0;
        LOG.info("Selected " + selectedNum + " partitions out of " + total + ", pruned " + percentPruned + "% partitions.");
        return selectedPartitions;
    }

    /**
     * Loads the commit metadata of archived instants when the range start lies
     * before the start of the given timeline.
     *
     * @param metaClient     the table meta client
     * @param instantRange   the instant range whose start instant is checked
     * @param commitTimeline the read timeline
     * @param tableName      the table name passed to the metadata lookup
     * @return the archived commit metadata, or an empty list when the range start is
     *         not before the timeline start or the archived read timeline is empty
     */
    private List<HoodieCommitMetadata> getArchivedMetadata(HoodieTableMetaClient metaClient, InstantRange instantRange, HoodieTimeline commitTimeline, String tableName) {
        if (!commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
            return Collections.emptyList();
        }
        final HoodieTimeline archivedTimeline = this.getArchivedReadTimeline(metaClient, instantRange.getStartInstant());
        if (archivedTimeline.empty()) {
            return Collections.emptyList();
        }
        return archivedTimeline.getInstantsAsStream()
                .map(archivedInstant -> WriteProfiles.getCommitMetadata(tableName, this.path, archivedInstant, archivedTimeline))
                .collect(Collectors.toList());
    }

    /**
     * Returns the timeline to read from: the commits-and-compaction timeline
     * restricted by {@code filterCompletedAndCompactionInstants()} and then by the
     * user's skip options.
     *
     * @param metaClient the table meta client
     * @return the filtered read timeline
     */
    private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
        return this.filterInstantsAsPerUserConfigs(
                metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
    }

    /**
     * Returns the completed commits of the archived timeline loaded from
     * {@code startInstant}, further restricted by the user's skip options.
     *
     * @param metaClient   the table meta client
     * @param startInstant the instant the archived timeline is loaded from
     * @return the filtered archived read timeline
     */
    private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
        final HoodieArchivedTimeline archived = metaClient.getArchivedTimeline(startInstant, false);
        return this.filterInstantsAsPerUserConfigs(archived.getCommitsTimeline().filterCompletedInstants());
    }

    /**
     * Selects the completed instants that fall into the read range.
     *
     * <ul>
     *   <li>With an issued instant: every completed instant with a greater timestamp.</li>
     *   <li>Else, when {@code OptionsResolver.hasNoSpecificReadCommits} holds: only the
     *       last completed instant (or nothing if there is none).</li>
     *   <li>Else: the completed instants bounded below by {@code READ_START_COMMIT}
     *       (when {@code OptionsResolver.isSpecificStartCommit} holds) and above by
     *       {@code READ_END_COMMIT} (when configured), both inclusive.</li>
     * </ul>
     *
     * @param commitTimeline the timeline to filter
     * @param issuedInstant  the last issued instant, or {@code null}
     * @return the matching instants in timeline order
     */
    @VisibleForTesting
    public List<HoodieInstant> filterInstantsWithRange(HoodieTimeline commitTimeline, @Nullable String issuedInstant) {
        final HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
        if (issuedInstant != null) {
            return completedTimeline.getInstantsAsStream()
                    .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN, issuedInstant))
                    .collect(Collectors.toList());
        }

        if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
            final Option<HoodieInstant> latest = completedTimeline.lastInstant();
            if (latest.isPresent()) {
                return Collections.singletonList(latest.get());
            }
            return Collections.emptyList();
        }

        // Only open the stream once it is known to be consumed.
        Stream<HoodieInstant> instantStream = completedTimeline.getInstantsAsStream();
        if (OptionsResolver.isSpecificStartCommit(this.conf)) {
            final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
            instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, startCommit));
        }
        if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
            final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
            instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, endCommit));
        }
        return instantStream.collect(Collectors.toList());
    }

    /**
     * Applies the user's skip options to the given timeline.
     *
     * <p>For a merge-on-read table with {@code skipCompaction} set, instants whose
     * action equals {@code "commit"} are dropped. With {@code skipClustering} set,
     * instants that {@code ClusteringUtil.isClusteringInstant} recognises are
     * dropped; that check is evaluated against the original, unfiltered timeline.
     *
     * @param timeline the timeline to filter
     * @return the filtered timeline, or the input itself when no option applies
     */
    @VisibleForTesting
    public HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTimeline timeline) {
        final HoodieTimeline oriTimeline = timeline;
        // Conditional-and: a boolean test should short-circuit rather than use bitwise '&'.
        if (OptionsResolver.isMorTable(this.conf) && this.skipCompaction) {
            timeline = timeline.filter(instant -> !instant.getAction().equals("commit"));
        }
        if (this.skipClustering) {
            timeline = timeline.filter(instant -> !ClusteringUtil.isClusteringInstant(instant, oriTimeline));
        }
        return timeline;
    }

    /**
     * Concatenates two lists, {@code list1} first.
     *
     * <p>No copy is made when either side is empty: the other list is returned as
     * is (so the result may alias an argument). Otherwise a new list is returned.
     *
     * @param list1 the leading elements
     * @param list2 the trailing elements
     * @param <T>   the element type
     * @return {@code list2} if {@code list1} is empty, {@code list1} if
     *         {@code list2} is empty, else a new list holding both in order
     */
    private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
        if (!list1.isEmpty()) {
            if (list2.isEmpty()) {
                return list1;
            }
            final List<T> combined = new ArrayList<>(list1.size() + list2.size());
            combined.addAll(list1);
            combined.addAll(list2);
            return combined;
        }
        return list2;
    }

    /**
     * Fluent builder for {@link IncrementalInputSplits}. {@code conf}, {@code path}
     * and {@code rowType} must be set before {@link #build()} is called; both skip
     * flags start out as {@code false}.
     */
    public static class Builder {
        private Configuration conf;
        private Path path;
        private RowType rowType;
        private long maxCompactionMemoryInBytes;
        private PartitionPruners.PartitionPruner partitionPruner;
        // boolean fields default to false, matching the "do not skip" default.
        private boolean skipCompaction;
        private boolean skipClustering;

        /** Sets the Flink configuration (required). */
        public Builder conf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the table path (required). */
        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        /** Sets the row type (required). */
        public Builder rowType(RowType rowType) {
            this.rowType = rowType;
            return this;
        }

        /** Sets the memory budget passed on to the generated splits. */
        public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
            this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
            return this;
        }

        /** Sets the optional partition pruner; {@code null} disables pruning. */
        public Builder partitionPruner(@Nullable PartitionPruners.PartitionPruner partitionPruner) {
            this.partitionPruner = partitionPruner;
            return this;
        }

        /** Sets whether compaction commits are filtered out of the read timeline. */
        public Builder skipCompaction(boolean skipCompaction) {
            this.skipCompaction = skipCompaction;
            return this;
        }

        /** Sets whether clustering instants are filtered out of the read timeline. */
        public Builder skipClustering(boolean skipClustering) {
            this.skipClustering = skipClustering;
            return this;
        }

        /**
         * Creates the {@link IncrementalInputSplits}.
         *
         * @return the configured instance
         * @throws NullPointerException if {@code conf}, {@code path} or {@code rowType} is unset
         */
        public IncrementalInputSplits build() {
            final Configuration requiredConf = Objects.requireNonNull(this.conf);
            final Path requiredPath = Objects.requireNonNull(this.path);
            final RowType requiredRowType = Objects.requireNonNull(this.rowType);
            return new IncrementalInputSplits(
                    requiredConf,
                    requiredPath,
                    requiredRowType,
                    this.maxCompactionMemoryInBytes,
                    this.partitionPruner,
                    this.skipCompaction,
                    this.skipClustering);
        }
    }

    /**
     * The outcome of a split computation: the splits, the end instant they were
     * computed up to, and an optional offset.
     */
    public static class Result {
        /** Shared result with no splits, an empty end instant and no offset. */
        public static final Result EMPTY = Result.instance(Collections.emptyList(), "");

        private final List<MergeOnReadInputSplit> inputSplits;
        private final String endInstant;
        private final String offset;

        private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant, @Nullable String offset) {
            this.inputSplits = inputSplits;
            this.endInstant = endInstant;
            this.offset = offset;
        }

        /** Creates a result without an offset. */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            return instance(inputSplits, endInstant, null);
        }

        /** Creates a result carrying the given offset. */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant, String offset) {
            return new Result(inputSplits, endInstant, offset);
        }

        /** Returns whether this result holds no splits. */
        public boolean isEmpty() {
            return this.inputSplits.isEmpty();
        }

        /** Returns the splits of this result. */
        public List<MergeOnReadInputSplit> getInputSplits() {
            return this.inputSplits;
        }

        /** Returns the end instant of this result. */
        public String getEndInstant() {
            return this.endInstant;
        }

        /** Returns the offset, or {@code null} when none was supplied. */
        @Nullable
        public String getOffset() {
            return this.offset;
        }
    }
}

