package org.apache.paimon.flink.source.assigners;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.table.source.DataSplit;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.class */
public class DynamicPartitionPruningAssigner implements SplitAssigner {
    private final SplitAssigner innerAssigner;
    private final Projection partitionRowProjection;
    private final DynamicFilteringData dynamicFilteringData;

    public DynamicPartitionPruningAssigner(SplitAssigner splitAssigner, Projection projection, DynamicFilteringData dynamicFilteringData) {
        this.innerAssigner = splitAssigner;
        this.partitionRowProjection = projection;
        this.dynamicFilteringData = dynamicFilteringData;
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public List<FileStoreSourceSplit> getNext(int i, @Nullable String str) {
        List<FileStoreSourceSplit> next = this.innerAssigner.getNext(i, str);
        while (true) {
            List<FileStoreSourceSplit> list = next;
            if (list.isEmpty()) {
                return Collections.emptyList();
            }
            List<FileStoreSourceSplit> list2 = (List) list.stream().filter(this::filter).collect(Collectors.toList());
            if (!list2.isEmpty()) {
                return list2;
            }
            next = this.innerAssigner.getNext(i, str);
        }
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public void addSplit(int i, FileStoreSourceSplit fileStoreSourceSplit) {
        if (filter(fileStoreSourceSplit)) {
            this.innerAssigner.addSplit(i, fileStoreSourceSplit);
        }
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public void addSplitsBack(int i, List<FileStoreSourceSplit> list) {
        this.innerAssigner.addSplitsBack(i, list);
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public Collection<FileStoreSourceSplit> remainingSplits() {
        return (Collection) this.innerAssigner.remainingSplits().stream().filter(this::filter).collect(Collectors.toList());
    }

    public static SplitAssigner createDynamicPartitionPruningAssignerIfNeeded(int i, SplitAssigner splitAssigner, Projection projection, SourceEvent sourceEvent, Logger logger) {
        DynamicFilteringData data = ((DynamicFilteringEvent) sourceEvent).getData();
        logger.info("Received DynamicFilteringEvent: {}, is filtering: {}.", Integer.valueOf(i), Boolean.valueOf(data.isFiltering()));
        return data.isFiltering() ? new DynamicPartitionPruningAssigner(splitAssigner, projection, data) : splitAssigner;
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public Optional<Long> getNextSnapshotId(int i) {
        return this.innerAssigner.getNextSnapshotId(i);
    }

    private boolean filter(FileStoreSourceSplit fileStoreSourceSplit) {
        return this.dynamicFilteringData.contains(new FlinkRowData(this.partitionRowProjection.apply(((DataSplit) fileStoreSourceSplit.split()).partition())));
    }
}
