package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.SystemColumns;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;

/* loaded from: input_file:org/apache/paimon/flink/lookup/FullCacheLookupTable.class */
public abstract class FullCacheLookupTable implements LookupTable {
    protected final Context context;
    protected final RocksDBStateFactory stateFactory;
    protected final RowType projectedType;
    private final TableStreamingReader reader;
    private final boolean sequenceFieldEnabled;

    /* loaded from: input_file:org/apache/paimon/flink/lookup/FullCacheLookupTable$Context.class */
    public static class Context {
        public final FileStoreTable table;
        public final int[] projection;

        @Nullable
        public final Predicate predicate;
        public final File tempPath;
        public final Filter<InternalRow> recordFilter;
        public final List<String> joinKey;

        public Context(FileStoreTable fileStoreTable, int[] iArr, @Nullable Predicate predicate, File file, Filter<InternalRow> filter, List<String> list) {
            this.table = fileStoreTable;
            this.projection = iArr;
            this.predicate = predicate;
            this.tempPath = file;
            this.recordFilter = filter;
            this.joinKey = list;
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/lookup/FullCacheLookupTable$TableBulkLoader.class */
    public interface TableBulkLoader {
        void write(byte[] bArr, byte[] bArr2) throws BulkLoader.WriteException, IOException;

        void finish() throws IOException;
    }

    public FullCacheLookupTable(Context context) throws IOException {
        this.context = context;
        this.stateFactory = new RocksDBStateFactory(context.tempPath.toString(), context.table.coreOptions().toConfiguration(), null);
        FileStoreTable fileStoreTable = context.table;
        this.reader = new TableStreamingReader(fileStoreTable, context.projection, context.predicate);
        this.sequenceFieldEnabled = fileStoreTable.primaryKeys().size() > 0 && new CoreOptions(fileStoreTable.options()).sequenceField().isPresent();
        RowType project = TypeUtils.project(fileStoreTable.rowType(), context.projection);
        this.projectedType = this.sequenceFieldEnabled ? project.appendDataField(SystemColumns.SEQUENCE_NUMBER, DataTypes.BIGINT()) : project;
    }

    @Override // org.apache.paimon.flink.lookup.LookupTable
    public void open() throws Exception {
        BinaryExternalSortBuffer createBulkLoadSorter = RocksDBState.createBulkLoadSorter(IOManager.create(this.context.tempPath.toString()), this.context.table.coreOptions());
        RecordReaderIterator recordReaderIterator = new RecordReaderIterator(this.reader.nextBatch(true, this.sequenceFieldEnabled));
        Throwable th = null;
        while (recordReaderIterator.hasNext()) {
            try {
                try {
                    InternalRow internalRow = (InternalRow) recordReaderIterator.next();
                    if (recordFilter().test(internalRow)) {
                        createBulkLoadSorter.write(GenericRow.of(toKeyBytes(internalRow), toValueBytes(internalRow)));
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (recordReaderIterator != null) {
                    if (th != null) {
                        try {
                            recordReaderIterator.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        recordReaderIterator.close();
                    }
                }
                throw th2;
            }
        }
        if (recordReaderIterator != null) {
            if (0 != 0) {
                try {
                    recordReaderIterator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                recordReaderIterator.close();
            }
        }
        MutableObjectIterator<BinaryRow> sortedIterator = createBulkLoadSorter.sortedIterator();
        BinaryRow binaryRow = new BinaryRow(2);
        TableBulkLoader createBulkLoader = createBulkLoader();
        while (true) {
            try {
                BinaryRow next = sortedIterator.next(binaryRow);
                binaryRow = next;
                if (next == null) {
                    createBulkLoader.finish();
                    createBulkLoadSorter.clear();
                    return;
                }
                createBulkLoader.write(binaryRow.getBinary(0), binaryRow.getBinary(1));
            } catch (BulkLoader.WriteException e) {
                throw new RuntimeException("Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your lookup table. ", e.getCause());
            }
        }
    }

    @Override // org.apache.paimon.flink.lookup.LookupTable
    public void refresh() throws Exception {
        RecordReaderIterator recordReaderIterator;
        Throwable th;
        while (true) {
            recordReaderIterator = new RecordReaderIterator(this.reader.nextBatch(false, this.sequenceFieldEnabled));
            th = null;
            try {
                try {
                    if (!recordReaderIterator.hasNext()) {
                        break;
                    }
                    refresh(recordReaderIterator, this.sequenceFieldEnabled);
                    if (recordReaderIterator != null) {
                        if (0 != 0) {
                            try {
                                recordReaderIterator.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            recordReaderIterator.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (recordReaderIterator != null) {
                        if (th != null) {
                            try {
                                recordReaderIterator.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            recordReaderIterator.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                th = th5;
                throw th5;
            }
        }
        if (recordReaderIterator != null) {
            if (0 == 0) {
                recordReaderIterator.close();
                return;
            }
            try {
                recordReaderIterator.close();
            } catch (Throwable th6) {
                th.addSuppressed(th6);
            }
        }
    }

    @Override // org.apache.paimon.flink.lookup.LookupTable
    public final List<InternalRow> get(InternalRow internalRow) throws IOException {
        List<InternalRow> innerGet = innerGet(internalRow);
        if (!this.sequenceFieldEnabled) {
            return innerGet;
        }
        ArrayList arrayList = new ArrayList(innerGet.size());
        for (InternalRow internalRow2 : innerGet) {
            arrayList.add(new PartialRow(internalRow2.getFieldCount() - 1, internalRow2));
        }
        return arrayList;
    }

    public abstract List<InternalRow> innerGet(InternalRow internalRow) throws IOException;

    public abstract void refresh(Iterator<InternalRow> it, boolean z) throws IOException;

    public Filter<InternalRow> recordFilter() {
        return this.context.recordFilter;
    }

    public abstract byte[] toKeyBytes(InternalRow internalRow) throws IOException;

    public abstract byte[] toValueBytes(InternalRow internalRow) throws IOException;

    public abstract TableBulkLoader createBulkLoader();

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.stateFactory.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static FullCacheLookupTable create(Context context, long j) throws IOException {
        List<String> primaryKeys = context.table.primaryKeys();
        return primaryKeys.isEmpty() ? new NoPrimaryKeyLookupTable(context, j) : new HashSet(primaryKeys).equals(new HashSet(context.joinKey)) ? new PrimaryKeyLookupTable(context, j, context.joinKey) : new SecondaryIndexLookupTable(context, j);
    }
}
