package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/lookup/FileStoreLookupFunction.class */
public class FileStoreLookupFunction implements Serializable, Closeable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
    private final Table table;

    @Nullable
    private final DynamicPartitionLoader partitionLoader;
    private final List<String> projectFields;
    private final List<String> joinKeys;

    @Nullable
    private final Predicate predicate;
    private transient Duration refreshInterval;
    private transient File path;
    private transient LookupTable lookupTable;
    private transient long nextLoadTime;

    public FileStoreLookupFunction(Table table, int[] iArr, int[] iArr2, @Nullable Predicate predicate) {
        TableScanUtils.streamingReadingValidate(table);
        this.table = table;
        this.partitionLoader = DynamicPartitionLoader.of(table);
        this.joinKeys = (List) Arrays.stream(iArr2).mapToObj(i -> {
            return table.rowType().getFieldNames().get(iArr[i]);
        }).collect(Collectors.toList());
        if (this.partitionLoader != null) {
            this.partitionLoader.addJoinKeys(this.joinKeys);
        }
        this.projectFields = (List) Arrays.stream(iArr).mapToObj(i2 -> {
            return table.rowType().getFieldNames().get(i2);
        }).collect(Collectors.toList());
        for (String str : table.primaryKeys()) {
            if (!this.projectFields.contains(str)) {
                this.projectFields.add(str);
            }
        }
        this.predicate = predicate;
    }

    public void open(FunctionContext functionContext) throws Exception {
        open(getTmpDirectory(functionContext));
    }

    void open(String str) throws Exception {
        this.path = new File(str, "lookup-" + UUID.randomUUID());
        if (!this.path.mkdirs()) {
            throw new RuntimeException("Failed to create dir: " + this.path);
        }
        open();
    }

    private void open() throws Exception {
        if (this.partitionLoader != null) {
            this.partitionLoader.open();
        }
        this.nextLoadTime = -1L;
        Options fromMap = Options.fromMap(this.table.options());
        this.refreshInterval = (Duration) fromMap.getOptional(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL).orElse(fromMap.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL));
        List<String> fieldNames = this.table.rowType().getFieldNames();
        Stream<String> stream = this.projectFields.stream();
        fieldNames.getClass();
        int[] array = stream.mapToInt((v1) -> {
            return r1.indexOf(v1);
        }).toArray();
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        if (fromMap.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE) == FlinkConnectorOptions.LookupCacheMode.AUTO && new HashSet(this.table.primaryKeys()).equals(new HashSet(this.joinKeys))) {
            if (RemoteTableQuery.isRemoteServiceAvailable(fileStoreTable)) {
                this.lookupTable = PrimaryKeyPartialLookupTable.createRemoteTable(fileStoreTable, array, this.joinKeys);
            } else {
                try {
                    this.lookupTable = PrimaryKeyPartialLookupTable.createLocalTable(fileStoreTable, array, this.path, this.joinKeys);
                } catch (UnsupportedOperationException e) {
                }
            }
        }
        if (this.lookupTable == null) {
            this.lookupTable = FullCacheLookupTable.create(new FullCacheLookupTable.Context(fileStoreTable, array, this.predicate, createProjectedPredicate(array), this.path, this.joinKeys), ((Long) fromMap.get(RocksDBOptions.LOOKUP_CACHE_ROWS)).longValue());
        }
        refreshDynamicPartition(false);
        this.lookupTable.open();
    }

    @Nullable
    private Predicate createProjectedPredicate(int[] iArr) {
        Predicate predicate = null;
        if (this.predicate != null) {
            predicate = PredicateBuilder.transformFieldMapping(this.predicate, IntStream.range(0, this.table.rowType().getFieldCount()).map(i -> {
                return Ints.indexOf(iArr, i);
            }).toArray()).orElse(null);
        }
        return predicate;
    }

    public Collection<RowData> lookup(RowData rowData) {
        try {
            checkRefresh();
            InternalRow flinkRowWrapper = new FlinkRowWrapper(rowData);
            if (this.partitionLoader != null) {
                BinaryRow refreshDynamicPartition = refreshDynamicPartition(true);
                if (refreshDynamicPartition == null) {
                    return Collections.emptyList();
                }
                flinkRowWrapper = JoinedRow.join(flinkRowWrapper, refreshDynamicPartition);
            }
            List<InternalRow> list = this.lookupTable.get(flinkRowWrapper);
            ArrayList arrayList = new ArrayList(list.size());
            Iterator<InternalRow> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(new FlinkRowData(it.next()));
            }
            return arrayList;
        } catch (OutOfRangeException e) {
            reopen();
            return lookup(rowData);
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Nullable
    private BinaryRow refreshDynamicPartition(boolean z) throws Exception {
        if (this.partitionLoader == null) {
            return null;
        }
        boolean checkRefresh = this.partitionLoader.checkRefresh();
        BinaryRow partition = this.partitionLoader.partition();
        if (partition == null) {
            return null;
        }
        this.lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
        if (checkRefresh && z) {
            this.lookupTable.close();
            this.lookupTable.open();
        }
        return partition;
    }

    private Predicate createSpecificPartFilter(BinaryRow binaryRow) {
        RowType rowType = this.table.rowType();
        List<String> partitionKeys = this.table.partitionKeys();
        Object[] convert = new RowDataToObjectArrayConverter(rowType.project(partitionKeys)).convert(binaryRow);
        HashMap hashMap = new HashMap(convert.length);
        for (int i = 0; i < convert.length; i++) {
            hashMap.put(partitionKeys.get(i), convert[i]);
        }
        return PartitionPredicate.createPartitionPredicate(rowType, hashMap);
    }

    private void reopen() {
        try {
            close();
            open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void checkRefresh() throws Exception {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0) {
            LOG.info("Lookup table {} has refreshed after {} second(s), refreshing", this.table.name(), Long.valueOf(this.refreshInterval.toMillis() / 1000));
        }
        refresh();
        this.nextLoadTime = System.currentTimeMillis() + this.refreshInterval.toMillis();
    }

    @VisibleForTesting
    LookupTable lookupTable() {
        return this.lookupTable;
    }

    private void refresh() throws Exception {
        this.lookupTable.refresh();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.lookupTable != null) {
            this.lookupTable.close();
            this.lookupTable = null;
        }
        if (this.path != null) {
            FileIOUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private static String getTmpDirectory(FunctionContext functionContext) {
        try {
            Field declaredField = functionContext.getClass().getDeclaredField("context");
            declaredField.setAccessible(true);
            String[] tmpDirectories = extractStreamingRuntimeContext(declaredField.get(functionContext)).getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    private static StreamingRuntimeContext extractStreamingRuntimeContext(Object obj) throws NoSuchFieldException, IllegalAccessException {
        if (obj instanceof StreamingRuntimeContext) {
            return (StreamingRuntimeContext) obj;
        }
        Field declaredField = obj.getClass().getDeclaredField("runtimeContext");
        declaredField.setAccessible(true);
        return extractStreamingRuntimeContext(declaredField.get(obj));
    }
}
