package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallellyExecuteUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

/* loaded from: input_file:org/apache/paimon/operation/AbstractFileStoreScan.class */
/**
 * Base implementation of {@link FileStoreScan}.
 *
 * <p>A scan resolves a list of manifest files. These are either the ones given through {@link
 * #withManifestList(List)}, or the manifests of the specified (or latest) snapshot, selected
 * according to the current {@link ScanMode}. It then reads and filters their entries, merges them
 * with {@link ManifestEntry#mergeEntries}, applies the bucket / level / whole-bucket filters and
 * returns the result as a {@link FileStoreScan.Plan}.
 *
 * <p>Subclasses provide the statistics-based filters {@link #filterByStats(ManifestEntry)} and
 * {@link #filterWholeBucketByStats(List)}.
 *
 * <p>The {@code with*} configuration methods mutate this instance without any synchronization and
 * return {@code this} for chaining.
 */
public abstract class AbstractFileStoreScan implements FileStoreScan {
    private final FieldStatsArraySerializer partitionStatsConverter;
    private final RowType partitionType;
    private final SnapshotManager snapshotManager;
    private final ManifestFile.Factory manifestFileFactory;
    private final ManifestList manifestList;
    private final int numOfBuckets;
    private final boolean checkNumOfBuckets;
    private final SchemaManager schemaManager;
    protected final ScanBucketFilter bucketKeyFilter;
    private PartitionPredicate partitionFilter;
    // Forwarded unchanged to ParallellyExecuteUtils.parallelismBatchIterable; may be null.
    private final Integer scanManifestParallelism;
    private Snapshot specifiedSnapshot = null;
    private Filter<Integer> bucketFilter = null;
    private List<ManifestFileMeta> specifiedManifests = null;
    private ScanMode scanMode = ScanMode.ALL;
    private Filter<Integer> levelFilter = null;
    private Long dataFileTimeMills = null;
    private ManifestCacheFilter manifestCacheFilter = null;
    private ScanMetrics scanMetrics = null;
    // Schemas loaded through scanTableSchema, keyed by schema id.
    private final ConcurrentMap<Long, TableSchema> tableSchemas = new ConcurrentHashMap<>();

    /**
     * Creates a scan.
     *
     * @param partitionType row type of the partition fields
     * @param bucketKeyFilter filter applied to (bucket, totalBuckets) of merged entries
     * @param snapshotManager used to resolve snapshots by id and the latest snapshot
     * @param schemaManager used to load table schemas by id
     * @param manifestFileFactory creates readers for manifest files
     * @param manifestListFactory creates the manifest list used to read snapshot manifests
     * @param numOfBuckets the current number of buckets of the table
     * @param checkNumOfBuckets whether to fail when an entry has a different total bucket count
     * @param scanManifestParallelism parallelism hint for reading manifests; may be null
     */
    public AbstractFileStoreScan(
            RowType partitionType,
            ScanBucketFilter bucketKeyFilter,
            SnapshotManager snapshotManager,
            SchemaManager schemaManager,
            ManifestFile.Factory manifestFileFactory,
            ManifestList.Factory manifestListFactory,
            int numOfBuckets,
            boolean checkNumOfBuckets,
            Integer scanManifestParallelism) {
        this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
        this.partitionType = partitionType;
        this.bucketKeyFilter = bucketKeyFilter;
        this.snapshotManager = snapshotManager;
        this.schemaManager = schemaManager;
        this.manifestFileFactory = manifestFileFactory;
        this.manifestList = manifestListFactory.create();
        this.numOfBuckets = numOfBuckets;
        this.checkNumOfBuckets = checkNumOfBuckets;
        this.scanManifestParallelism = scanManifestParallelism;
    }

    /**
     * Restricts the scan to partitions matching {@code predicate}.
     *
     * <p>A {@code null} predicate, or a table without partition fields, clears the partition
     * filter.
     */
    @Override
    public FileStoreScan withPartitionFilter(Predicate predicate) {
        this.partitionFilter =
                partitionType.getFieldCount() > 0 && predicate != null
                        ? PartitionPredicate.fromPredicate(partitionType, predicate)
                        : null;
        return this;
    }

    /**
     * Restricts the scan to the given partitions.
     *
     * <p>A {@code null} or empty list, or a table without partition fields, clears the partition
     * filter. This matches how the {@link Predicate} overload treats {@code null}.
     */
    @Override
    public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
        this.partitionFilter =
                partitionType.getFieldCount() > 0 && partitions != null && !partitions.isEmpty()
                        ? PartitionPredicate.fromMultiple(partitionType, partitions)
                        : null;
        return this;
    }

    /** Restricts the scan to entries whose bucket equals {@code bucket}. */
    @Override
    public FileStoreScan withBucket(int bucket) {
        this.bucketFilter = b -> b == bucket;
        return this;
    }

    /** Restricts the scan to entries whose bucket is accepted by {@code filter}. */
    @Override
    public FileStoreScan withBucketFilter(Filter<Integer> filter) {
        this.bucketFilter = filter;
        return this;
    }

    /**
     * Restricts the scan to a single partition and bucket.
     *
     * @throws IllegalArgumentException (via {@link Preconditions#checkArgument}) if a manifest
     *     cache filter is set and it rejects this partition/bucket
     */
    @Override
    public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
        if (manifestCacheFilter != null) {
            Preconditions.checkArgument(
                    manifestCacheFilter.test(partition, bucket),
                    String.format(
                            "This is a bug! The partition %s and bucket %s is filtered!",
                            partition, Integer.valueOf(bucket)));
        }
        withPartitionFilter(Collections.singletonList(partition));
        withBucket(bucket);
        return this;
    }

    /**
     * Scans the snapshot with the given id. This cannot be combined with {@link
     * #withManifestList(List)}.
     */
    @Override
    public FileStoreScan withSnapshot(long snapshotId) {
        Preconditions.checkState(
                specifiedManifests == null, "Cannot set both snapshot and manifests.");
        this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
        return this;
    }

    /** Scans the given snapshot. This cannot be combined with {@link #withManifestList(List)}. */
    @Override
    public FileStoreScan withSnapshot(Snapshot snapshot) {
        Preconditions.checkState(
                specifiedManifests == null, "Cannot set both snapshot and manifests.");
        this.specifiedSnapshot = snapshot;
        return this;
    }

    /** Scans exactly the given manifests. This cannot be combined with {@code withSnapshot}. */
    @Override
    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
        Preconditions.checkState(
                specifiedSnapshot == null, "Cannot set both snapshot and manifests.");
        this.specifiedManifests = manifests;
        return this;
    }

    /** Selects which manifests of a snapshot are read; see {@code readManifests}. */
    @Override
    public FileStoreScan withKind(ScanMode scanMode) {
        this.scanMode = scanMode;
        return this;
    }

    /** Keeps only entries whose file level is accepted by {@code filter}. */
    @Override
    public FileStoreScan withLevelFilter(Filter<Integer> filter) {
        this.levelFilter = filter;
        return this;
    }

    /** Drops entries whose file creation time is earlier than {@code dataFileTimeMills}. */
    @Override
    public FileStoreScan withDataFileTimeMills(long dataFileTimeMills) {
        this.dataFileTimeMills = dataFileTimeMills;
        return this;
    }

    /** Sets the filter applied to manifest rows whose total bucket count matches the table's. */
    @Override
    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestCacheFilter) {
        this.manifestCacheFilter = manifestCacheFilter;
        return this;
    }

    /** Sets the metrics sink that receives a {@link ScanStats} after each planning run. */
    @Override
    public FileStoreScan withMetrics(ScanMetrics scanMetrics) {
        this.scanMetrics = scanMetrics;
        return this;
    }

    /**
     * Executes the scan and returns its result.
     *
     * <p>The returned plan's {@code watermark()} and {@code snapshotId()} are {@code null} when no
     * snapshot was resolved, either because manifests were given explicitly or because the table
     * has no snapshot yet. Its {@code scanMode()} is the mode in effect when this method was
     * called.
     */
    @Override
    public FileStoreScan.Plan plan() {
        Pair<Snapshot, List<ManifestEntry>> planned = doPlan(this::readManifestFileMeta);
        final Snapshot snapshot = planned.getLeft();
        final List<ManifestEntry> files = planned.getRight();
        // Capture the mode now, so the plan keeps describing the scan that produced it even if
        // withKind(...) is called again on this instance later.
        final ScanMode mode = this.scanMode;
        return new FileStoreScan.Plan() {
            @Override
            @Nullable
            public Long watermark() {
                if (snapshot == null) {
                    return null;
                }
                return snapshot.watermark();
            }

            @Override
            @Nullable
            public Long snapshotId() {
                if (snapshot == null) {
                    return null;
                }
                return Long.valueOf(snapshot.id());
            }

            @Override
            public ScanMode scanMode() {
                return mode;
            }

            @Override
            public List<ManifestEntry> files() {
                return files;
            }
        };
    }

    /**
     * Resolves, reads, merges and filters manifest entries, and reports scan metrics if a sink is
     * set.
     *
     * @param readManifest reads all entries of one manifest file
     * @return the resolved snapshot (null if none) and the resulting entries
     */
    private Pair<Snapshot, List<ManifestEntry>> doPlan(
            Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
        long startNanos = System.nanoTime();

        List<ManifestFileMeta> manifests = specifiedManifests;
        Snapshot snapshot = null;
        if (manifests == null) {
            snapshot =
                    specifiedSnapshot == null
                            ? snapshotManager.latestSnapshot()
                            : specifiedSnapshot;
            manifests = snapshot == null ? Collections.emptyList() : readManifests(snapshot);
        }

        long totalFiles =
                manifests.stream()
                        .mapToLong(meta -> meta.numAddedFiles() + meta.numDeletedFiles())
                        .sum();

        // Number of entries that survived the pre-merge filters, summed over all batches.
        AtomicLong unmergedEntries = new AtomicLong(0L);
        Iterable<ManifestEntry> readEntries =
                ParallellyExecuteUtils.parallelismBatchIterable(
                        batch -> {
                            List<ManifestEntry> batchEntries =
                                    batch.parallelStream()
                                            .filter(this::filterManifestFileMeta)
                                            .flatMap(meta -> readManifest.apply(meta).stream())
                                            .filter(this::filterUnmergedManifestEntry)
                                            .collect(Collectors.toList());
                            unmergedEntries.getAndAdd(batchEntries.size());
                            return batchEntries;
                        },
                        manifests,
                        scanManifestParallelism);

        Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(readEntries);
        // The counter is read only after mergeEntries has iterated readEntries; the batch function
        // above presumably runs during that iteration.
        long skippedBeforeMerge = totalFiles - unmergedEntries.get();

        List<ManifestEntry> files = new ArrayList<>();
        for (ManifestEntry entry : mergedEntries) {
            if (checkNumOfBuckets) {
                assertBucketCountMatches(entry);
            }
            if (filterMergedManifestEntry(entry)) {
                files.add(entry);
            }
        }
        long skippedByBucketAndLevel = mergedEntries.size() - files.size();

        List<ManifestEntry> result = filterWholeBuckets(files);
        long skippedByWholeBucket = files.size() - result.size();

        long durationMillis = (System.nanoTime() - startNanos) / 1_000_000;
        if (scanMetrics != null) {
            scanMetrics.reportScan(
                    new ScanStats(
                            durationMillis,
                            manifests.size(),
                            skippedBeforeMerge,
                            skippedByBucketAndLevel,
                            skippedByWholeBucket,
                            result.size()));
        }
        return Pair.of(snapshot, result);
    }

    /**
     * Throws if {@code entry} was written with a total bucket count different from this scan's
     * {@code numOfBuckets}.
     */
    private void assertBucketCountMatches(ManifestEntry entry) {
        if (entry.totalBuckets() == numOfBuckets) {
            return;
        }
        throw new RuntimeException(
                String.format(
                        "Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
                                + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
                        partitionType.getFieldCount() > 0
                                ? "partition "
                                        + FileStorePathFactory.getPartitionComputer(
                                                        partitionType,
                                                        FileStorePathFactory.PARTITION_DEFAULT_NAME
                                                                .defaultValue())
                                                .generatePartValues(entry.partition())
                                // NOTE(review): core code referencing a flink-module constant;
                                // likely a decompiler-inlined literal - confirm and decouple.
                                : ActionFactory.TABLE,
                        Integer.valueOf(numOfBuckets),
                        Integer.valueOf(entry.totalBuckets())));
    }

    /**
     * Groups entries by (partition, bucket) in first-seen order. It keeps only the groups accepted
     * by {@link #filterWholeBucketByStats(List)} and flattens them back into one list.
     */
    private List<ManifestEntry> filterWholeBuckets(List<ManifestEntry> entries) {
        return entries.stream()
                .collect(
                        Collectors.groupingBy(
                                // LinkedHashMap keeps the original entry order across groups.
                                entry -> Pair.of(entry.partition(), entry.bucket()),
                                LinkedHashMap::new,
                                Collectors.toList()))
                .values()
                .stream()
                .filter(this::filterWholeBucketByStats)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /**
     * Selects the manifests of {@code snapshot} to read for the current scan mode.
     *
     * @throws IllegalStateException in CHANGELOG mode for a snapshot with {@code version() <= 1}
     *     that is not an APPEND commit
     * @throws UnsupportedOperationException for an unhandled scan mode
     */
    private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
        switch (scanMode) {
            case ALL:
                return snapshot.dataManifests(manifestList);
            case DELTA:
                return snapshot.deltaManifests(manifestList);
            case CHANGELOG:
                if (snapshot.version() > 1) {
                    return snapshot.changelogManifests(manifestList);
                }
                // Snapshots of version <= 1 (presumably an older format without changelog
                // manifests) fall back to delta manifests, but only for APPEND commits.
                if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
                    return snapshot.deltaManifests(manifestList);
                }
                throw new IllegalStateException(
                        String.format(
                                "Incremental scan does not accept %s snapshot",
                                snapshot.commitKind()));
            default:
                throw new UnsupportedOperationException("Unknown scan kind " + scanMode.name());
        }
    }

    /**
     * Returns the table schema with id {@code schemaId}. It is loaded through the schema manager
     * on first use and cached afterwards.
     */
    public TableSchema scanTableSchema(long schemaId) {
        return tableSchemas.computeIfAbsent(schemaId, id -> schemaManager.schema(schemaId));
    }

    /** Keeps a manifest file unless its partition statistics are rejected by the filter. */
    private boolean filterManifestFileMeta(ManifestFileMeta meta) {
        return partitionFilter == null
                || partitionFilter.test(
                        meta.numAddedFiles() + meta.numDeletedFiles(),
                        meta.partitionStats().fields(partitionStatsConverter));
    }

    /** Pre-merge entry filter: the optional creation-time cutoff, then {@link #filterByStats}. */
    private boolean filterUnmergedManifestEntry(ManifestEntry entry) {
        if (dataFileTimeMills != null
                && entry.file().creationTimeEpochMillis() < dataFileTimeMills.longValue()) {
            return false;
        }
        return filterByStats(entry);
    }

    /** Statistics-based filter applied to each entry before merging. */
    protected abstract boolean filterByStats(ManifestEntry entry);

    /** Post-merge entry filter: bucket filter, bucket-key filter and level filter. */
    private boolean filterMergedManifestEntry(ManifestEntry entry) {
        return (bucketFilter == null || bucketFilter.test(entry.bucket()))
                && bucketKeyFilter.select(entry.bucket(), entry.totalBuckets())
                && (levelFilter == null || levelFilter.test(entry.file().level()));
    }

    /** Decides whether all entries of one (partition, bucket) group are kept. */
    protected abstract boolean filterWholeBucketByStats(List<ManifestEntry> entries);

    /** Reads one manifest file, applying the cache row filter and the entry row filter. */
    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta meta) {
        return manifestFileFactory
                .create()
                .read(meta.fileName(), manifestCacheRowFilter(), manifestEntryRowFilter());
    }

    /**
     * Row-level filter for manifest rows. It rejects rows outside the partition filter and, when
     * the row's total bucket count equals {@code numOfBuckets}, applies the bucket filter.
     * The filter fields are read when the filter runs, not when it is created.
     */
    private Filter<InternalRow> manifestEntryRowFilter() {
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter =
                ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row))) {
                return false;
            }
            if (bucketFilter != null && numOfBuckets == totalBucketGetter.apply(row).intValue()) {
                return bucketFilter.test(bucketGetter.apply(row));
            }
            return true;
        };
    }

    /**
     * Row-level filter backed by the manifest cache filter.
     *
     * <p>Rows whose total bucket count differs from {@code numOfBuckets} always pass. With no
     * cache filter set, every row passes.
     */
    private Filter<InternalRow> manifestCacheRowFilter() {
        if (manifestCacheFilter == null) {
            return Filter.alwaysTrue();
        }
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter =
                ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (numOfBuckets != totalBucketGetter.apply(row).intValue()) {
                return true;
            }
            return manifestCacheFilter.test(
                    partitionGetter.apply(row), bucketGetter.apply(row).intValue());
        };
    }
}
