package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.utils.UserDefinedSeqComparator;

/* loaded from: input_file:org/apache/paimon/flink/lookup/FullCacheLookupTable.class */
/**
 * Base class for {@link LookupTable} implementations whose state is created through a
 * {@link RocksDBStateFactory}.
 *
 * <p>Life cycle as visible in this class: {@link #openStateFactory()} creates the state factory
 * under {@link Context#tempPath}; {@link #bootstrap()} reads a first batch from the table, sorts
 * the serialized key/value pairs and hands them to the subclass's {@link TableBulkLoader};
 * {@link #refresh()} applies subsequent batches through {@link #refresh(Iterator)}; and
 * {@link #close()} closes the state factory and deletes the temporary directory.
 *
 * <p>When the table has primary keys and sequence fields are configured, the projected row type
 * is extended with the sequence fields that the projection does not already contain, and
 * {@link #get(InternalRow)} hides those trailing fields again before returning rows.
 */
public abstract class FullCacheLookupTable implements LookupTable {
    /** Immutable configuration this table was created with. */
    protected final Context context;

    /** Projected row type, including any sequence fields appended by the constructor. */
    protected final RowType projectedType;

    /** Comparator built from the configured sequence fields, or {@code null} if there are none. */
    @Nullable
    protected final FieldsComparator userDefinedSeqComparator;

    /** Number of sequence fields appended to the end of {@link #projectedType}. */
    protected final int appendUdsFieldNumber;

    /** Created by {@link #openStateFactory()}; {@code null} until that method has been called. */
    protected RocksDBStateFactory stateFactory;

    /** Created by {@link #bootstrap()}; {@link #refresh()} must not be called before it. */
    private LookupStreamingReader reader;

    /** Optional extra filter set through {@link #specificPartitionFilter(Predicate)}. */
    private Predicate specificPartition;

    /** Plain holder for the parameters a {@link FullCacheLookupTable} is built from. */
    public static class Context {
        /** Table that is read by the lookup table. */
        public final FileStoreTable table;

        /** Field indexes of {@link #table}'s row type that are projected. */
        public final int[] projection;

        /** Filter handed to the streaming reader (combined with the partition filter), may be null. */
        @Nullable
        public final Predicate tablePredicate;

        /** Filter tested against every row read during bootstrap, may be null. */
        @Nullable
        public final Predicate projectedPredicate;

        /** Directory used for the state factory and the sort buffer; deleted on close. */
        public final File tempPath;

        /** Join key field names; compared with the table's primary keys in {@code create}. */
        public final List<String> joinKey;

        public Context(
                FileStoreTable table,
                int[] projection,
                @Nullable Predicate tablePredicate,
                @Nullable Predicate projectedPredicate,
                File tempPath,
                List<String> joinKey) {
            this.table = table;
            this.projection = projection;
            this.tablePredicate = tablePredicate;
            this.projectedPredicate = projectedPredicate;
            this.tempPath = tempPath;
            this.joinKey = joinKey;
        }
    }

    /** Sink that receives the sorted, serialized key/value pairs produced by {@code bootstrap()}. */
    public interface TableBulkLoader {
        /**
         * Writes one serialized entry.
         *
         * @param key bytes produced by {@code toKeyBytes}
         * @param value bytes produced by {@code toValueBytes}
         */
        void write(byte[] key, byte[] value) throws BulkLoader.WriteException, IOException;

        /** Called once after the last entry has been written. */
        void finish() throws IOException;
    }

    /**
     * Computes the projected row type and, if applicable, the sequence-field comparator.
     *
     * <p>Sequence fields are only considered when the table has primary keys. Every configured
     * sequence field that is missing from the projection is appended to the projected type and
     * counted in {@link #appendUdsFieldNumber}.
     *
     * <p>NOTE(review): the reader created in {@link #bootstrap()} still receives
     * {@code context.projection}, which does not list the appended fields - confirm that the
     * reader/subclasses account for this.
     */
    public FullCacheLookupTable(Context context) {
        this.context = context;
        FileStoreTable table = context.table;

        List<String> sequenceFields = new ArrayList<>();
        if (!table.primaryKeys().isEmpty()) {
            sequenceFields = new CoreOptions(table.options()).sequenceField();
        }

        RowType requestedType = TypeUtils.project(table.rowType(), context.projection);
        if (sequenceFields.isEmpty()) {
            this.userDefinedSeqComparator = null;
            this.appendUdsFieldNumber = 0;
            this.projectedType = requestedType;
        } else {
            RowType.Builder builder = RowType.builder();
            requestedType
                    .getFields()
                    .forEach(
                            field -> {
                                builder.field(field.name(), field.type());
                            });

            // Append the sequence fields the projection lacks, taking them from the full row type.
            RowType tableType = table.rowType();
            AtomicInteger appended = new AtomicInteger(0);
            sequenceFields.stream()
                    .filter(requestedType::notContainsField)
                    .map(tableType::getField)
                    .forEach(
                            field -> {
                                appended.incrementAndGet();
                                builder.field(field.name(), field.type());
                            });

            RowType extendedType = builder.build();
            this.userDefinedSeqComparator =
                    UserDefinedSeqComparator.create(extendedType, sequenceFields);
            this.appendUdsFieldNumber = appended.get();
            this.projectedType = extendedType;
        }
    }

    /** Stores a filter that {@link #bootstrap()} combines with the table predicate. */
    @Override // org.apache.paimon.flink.lookup.LookupTable
    public void specificPartitionFilter(Predicate predicate) {
        this.specificPartition = predicate;
    }

    /**
     * Creates {@link #stateFactory} in {@link Context#tempPath} using the table's core options.
     *
     * <p>Kept public as found in this file; the decompiler reports the original visibility was
     * {@code protected}.
     */
    public void openStateFactory() throws Exception {
        this.stateFactory =
                new RocksDBStateFactory(
                        this.context.tempPath.toString(),
                        this.context.table.coreOptions().toConfiguration(),
                        null);
    }

    /**
     * Creates the streaming reader, reads its first batch and bulk-loads it in sorted order.
     *
     * <p>The sort buffer is cleared whether or not loading succeeds, so a failed bootstrap does
     * not leave the buffer's resources behind.
     *
     * <p>Kept public as found in this file; the decompiler reports the original visibility was
     * {@code protected}.
     *
     * @throws RuntimeException if the bulk loader rejects an entry
     */
    public void bootstrap() throws Exception {
        Predicate scanPredicate =
                PredicateBuilder.andNullable(this.context.tablePredicate, this.specificPartition);
        this.reader =
                new LookupStreamingReader(
                        this.context.table, this.context.projection, scanPredicate);

        BinaryExternalSortBuffer sorter =
                RocksDBState.createBulkLoadSorter(
                        IOManager.create(this.context.tempPath.toString()),
                        this.context.table.coreOptions());
        try {
            writeFirstBatchToSorter(sorter);
            bulkLoadFromSorter(sorter);
        } finally {
            sorter.clear();
        }
    }

    /** Serializes every row of the first batch that passes the projected predicate into the sorter. */
    private void writeFirstBatchToSorter(BinaryExternalSortBuffer sorter) throws Exception {
        Predicate rowFilter = projectedPredicate();
        // try-with-resources closes the batch on every path and keeps the primary exception,
        // recording a failing close() as suppressed instead of letting it replace the cause.
        try (RecordReaderIterator<InternalRow> batch =
                new RecordReaderIterator<>(this.reader.nextBatch(true))) {
            while (batch.hasNext()) {
                InternalRow row = batch.next();
                if (rowFilter == null || rowFilter.test(row)) {
                    sorter.write(GenericRow.of(toKeyBytes(row), toValueBytes(row)));
                }
            }
        }
    }

    /** Feeds the sorter's content, in sorted order, to a freshly created bulk loader. */
    private void bulkLoadFromSorter(BinaryExternalSortBuffer sorter) throws Exception {
        MutableObjectIterator<BinaryRow> sorted = sorter.sortedIterator();
        // Two fields per entry: serialized key at position 0, serialized value at position 1.
        BinaryRow entry = new BinaryRow(2);
        TableBulkLoader loader = createBulkLoader();
        try {
            while ((entry = sorted.next(entry)) != null) {
                loader.write(entry.getBinary(0), entry.getBinary(1));
            }
        } catch (BulkLoader.WriteException e) {
            // Report the underlying cause; fall back to the exception itself if it has none.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new RuntimeException(
                    "Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your lookup table. ",
                    cause);
        }
        loader.finish();
    }

    /**
     * Pulls batches from the reader and applies each through {@link #refresh(Iterator)} until an
     * empty batch is returned. Every batch is closed, including the final empty one.
     */
    @Override // org.apache.paimon.flink.lookup.LookupTable
    public void refresh() throws Exception {
        while (true) {
            try (RecordReaderIterator<InternalRow> batch =
                    new RecordReaderIterator<>(this.reader.nextBatch(false))) {
                if (!batch.hasNext()) {
                    return;
                }
                refresh(batch);
            }
        }
    }

    /**
     * Returns the rows found by {@link #innerGet(InternalRow)} for {@code key}.
     *
     * <p>If sequence fields were appended to the projected type, each row is wrapped in a
     * {@link PartialRow} created with the field count minus the number of appended fields.
     */
    @Override // org.apache.paimon.flink.lookup.LookupTable
    public final List<InternalRow> get(InternalRow key) throws IOException {
        List<InternalRow> rows = innerGet(key);
        if (this.appendUdsFieldNumber == 0) {
            return rows;
        }
        List<InternalRow> trimmed = new ArrayList<>(rows.size());
        for (InternalRow row : rows) {
            trimmed.add(new PartialRow(row.getFieldCount() - this.appendUdsFieldNumber, row));
        }
        return trimmed;
    }

    /** Subclass lookup; results may still carry the appended sequence fields. */
    public abstract List<InternalRow> innerGet(InternalRow key) throws IOException;

    /** Applies the rows of one incremental batch to the subclass's state. */
    public abstract void refresh(Iterator<InternalRow> input) throws IOException;

    /** Returns the filter applied to rows during bootstrap; {@code null} means no filtering. */
    @Nullable
    public Predicate projectedPredicate() {
        return this.context.projectedPredicate;
    }

    /** Serializes the key part of {@code row} for bulk loading. */
    public abstract byte[] toKeyBytes(InternalRow row) throws IOException;

    /** Serializes the value part of {@code row} for bulk loading. */
    public abstract byte[] toValueBytes(InternalRow row) throws IOException;

    /** Creates the loader that receives the sorted entries during bootstrap. */
    public abstract TableBulkLoader createBulkLoader();

    /**
     * Closes the state factory, if one was opened, and deletes the temporary directory.
     *
     * <p>The directory is deleted even when closing the state factory fails.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.stateFactory != null) {
                this.stateFactory.close();
            }
        } finally {
            FileIOUtils.deleteDirectory(this.context.tempPath);
        }
    }

    /**
     * Picks the implementation matching the table's keys: no primary keys, join key equal to the
     * primary keys (compared as sets), or anything else.
     *
     * <p>Kept public as found in this file; the decompiler reports the original visibility was
     * package-private.
     *
     * @param lruCacheSize forwarded unchanged to the chosen implementation's constructor.
     *     NOTE(review): presumably an LRU cache size - confirm against those constructors.
     */
    public static FullCacheLookupTable create(Context context, long lruCacheSize) {
        List<String> primaryKeys = context.table.primaryKeys();
        if (primaryKeys.isEmpty()) {
            return new NoPrimaryKeyLookupTable(context, lruCacheSize);
        }
        if (new HashSet<>(primaryKeys).equals(new HashSet<>(context.joinKey))) {
            return new PrimaryKeyLookupTable(context, lruCacheSize, context.joinKey);
        }
        return new SecondaryIndexLookupTable(context, lruCacheSize);
    }
}
