package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.TypeUtils;

/* loaded from: input_file:org/apache/paimon/flink/lookup/TableStreamingReader.class */
public class TableStreamingReader {
    private final ReadBuilder readBuilder;

    @Nullable
    private final PredicateFilter recordFilter;
    private final StreamTableScan scan;

    public TableStreamingReader(Table table, int[] iArr, @Nullable Predicate predicate) {
        table = CoreOptions.fromMap(table.options()).startupMode() != CoreOptions.StartupMode.COMPACTED_FULL ? table.copy(Collections.singletonMap(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString())) : table;
        this.readBuilder = table.newReadBuilder().withProjection(iArr).withFilter(predicate);
        this.scan = this.readBuilder.newStreamScan();
        if (predicate == null) {
            this.recordFilter = null;
            return;
        }
        List<String> fieldNames = table.rowType().getFieldNames();
        List<String> primaryKeys = table.primaryKeys();
        this.recordFilter = new PredicateFilter(TypeUtils.project(table.rowType(), iArr), PredicateBuilder.transformFieldMapping(predicate, IntStream.range(0, table.rowType().getFieldCount()).map(i -> {
            int indexOf = Ints.indexOf(iArr, i);
            if (primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i))) {
                return indexOf;
            }
            return -1;
        }).toArray()).orElse(null));
    }

    public RecordReader<InternalRow> nextBatch() throws Exception {
        try {
            return read(this.scan.plan());
        } catch (EndOfScanException e) {
            throw new IllegalArgumentException("TableStreamingReader does not support finished enumerator.", e);
        }
    }

    private RecordReader<InternalRow> read(TableScan.Plan plan) throws IOException {
        TableRead newRead = this.readBuilder.newRead();
        ArrayList arrayList = new ArrayList();
        for (Split split : plan.splits()) {
            arrayList.add(() -> {
                return newRead.createReader(split);
            });
        }
        RecordReader<InternalRow> create = ConcatRecordReader.create(arrayList);
        if (this.recordFilter != null) {
            PredicateFilter predicateFilter = this.recordFilter;
            predicateFilter.getClass();
            create = create.filter(predicateFilter::test);
        }
        return create;
    }
}
