package org.apache.flink.table.store.connector.lookup;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.RocksDBOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.TableStreamingReader;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.class */
public class FileStoreLookupFunction extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
    private final FileStoreTable table;
    private final List<String> projectFields;
    private final List<String> joinKeys;

    @Nullable
    private final Predicate predicate;
    private transient Duration refreshInterval;
    private transient File path;
    private transient RocksDBStateFactory stateFactory;
    private transient LookupTable lookupTable;
    private transient long nextLoadTime;
    private transient TableStreamingReader streamingReader;

    public FileStoreLookupFunction(FileStoreTable fileStoreTable, int[] iArr, int[] iArr2, @Nullable Predicate predicate) {
        TableSchema schema = fileStoreTable.schema();
        Preconditions.checkArgument(schema.partitionKeys().isEmpty(), "Currently only support non-partitioned table.");
        Preconditions.checkArgument(schema.primaryKeys().size() > 0, "Currently only support primary key table.");
        this.table = fileStoreTable;
        this.joinKeys = (List) Arrays.stream(iArr2).mapToObj(i -> {
            return schema.fieldNames().get(iArr[i]);
        }).collect(Collectors.toList());
        this.projectFields = (List) Arrays.stream(iArr).mapToObj(i2 -> {
            return schema.fieldNames().get(i2);
        }).collect(Collectors.toList());
        for (String str : schema.primaryKeys()) {
            if (!this.projectFields.contains(str)) {
                this.projectFields.add(str);
            }
        }
        this.predicate = predicate;
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        this.path = new File(getTmpDirectory(functionContext), "lookup-" + UUID.randomUUID());
        Configuration fromMap = Configuration.fromMap(this.table.schema().options());
        this.refreshInterval = (Duration) fromMap.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
        this.stateFactory = new RocksDBStateFactory(this.path.toString(), fromMap);
        List fieldNames = this.table.schema().logicalRowType().getFieldNames();
        Stream<String> stream = this.projectFields.stream();
        fieldNames.getClass();
        int[] array = stream.mapToInt((v1) -> {
            return r1.indexOf(v1);
        }).toArray();
        this.lookupTable = LookupTable.create(this.stateFactory, TypeUtils.project(this.table.schema().logicalRowType(), array), this.table.schema().primaryKeys(), this.joinKeys, createRecordFilter(array), fromMap.getLong(RocksDBOptions.LOOKUP_CACHE_ROWS));
        this.nextLoadTime = -1L;
        this.streamingReader = new TableStreamingReader(this.table, array, this.predicate);
        refresh();
    }

    private PredicateFilter createRecordFilter(int[] iArr) {
        Predicate predicate = null;
        if (this.predicate != null) {
            predicate = PredicateBuilder.transformFieldMapping(this.predicate, IntStream.range(0, this.table.schema().fields().size()).map(i -> {
                return Ints.indexOf(iArr, i);
            }).toArray()).orElse(null);
        }
        return new PredicateFilter(TypeUtils.project(this.table.schema().logicalRowType(), iArr), predicate);
    }

    public void eval(Object... objArr) throws IOException {
        checkRefresh();
        Iterator<RowData> it = this.lookupTable.get(GenericRowData.of(objArr)).iterator();
        while (it.hasNext()) {
            collect(it.next());
        }
    }

    private void checkRefresh() throws IOException {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0) {
            LOG.info("Lookup table has refreshed after {} minute(s), refreshing", Long.valueOf(this.refreshInterval.toMinutes()));
        }
        refresh();
        this.nextLoadTime = System.currentTimeMillis() + this.refreshInterval.toMillis();
    }

    private void refresh() throws IOException {
        while (true) {
            Iterator<RowData> nextBatch = this.streamingReader.nextBatch();
            if (nextBatch == null) {
                return;
            } else {
                this.lookupTable.refresh(nextBatch);
            }
        }
    }

    public void close() throws IOException {
        if (this.stateFactory != null) {
            this.stateFactory.close();
            this.stateFactory = null;
        }
        if (this.path != null) {
            FileUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private static String getTmpDirectory(FunctionContext functionContext) {
        try {
            Field declaredField = functionContext.getClass().getDeclaredField("context");
            declaredField.setAccessible(true);
            String[] tmpDirectories = ((StreamingRuntimeContext) declaredField.get(functionContext)).getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
}
