package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.connector.file.table.PartitionFetcher;
import org.apache.flink.connector.file.table.PartitionReader;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/hive/FileSystemLookupFunction.class */
public class FileSystemLookupFunction<P> extends TableFunction<RowData> {
    private static final int MAX_RETRIES = 3;
    private final PartitionFetcher<P> partitionFetcher;
    private final PartitionFetcher.Context<P> fetcherContext;
    private final PartitionReader<P, RowData> partitionReader;
    private final RowData.FieldGetter[] lookupFieldGetters;
    private final Duration reloadInterval;
    private final TypeSerializer<RowData> serializer;
    private final RowType rowType;
    private transient Map<RowData, List<RowData>> cache;
    private transient long nextLoadTime;
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);

    public FileSystemLookupFunction(PartitionFetcher<P> partitionFetcher, PartitionFetcher.Context<P> context, PartitionReader<P, RowData> partitionReader, RowType rowType, int[] iArr, Duration duration) {
        this.fetcherContext = context;
        this.partitionFetcher = partitionFetcher;
        this.partitionReader = partitionReader;
        this.rowType = rowType;
        this.lookupFieldGetters = new RowData.FieldGetter[iArr.length];
        for (int i = 0; i < iArr.length; i++) {
            this.lookupFieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(iArr[i]), iArr[i]);
        }
        this.reloadInterval = duration;
        this.serializer = InternalSerializers.create(rowType);
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        this.cache = new HashMap();
        this.nextLoadTime = -1L;
        this.fetcherContext.open();
    }

    public TypeInformation<RowData> getResultType() {
        return InternalTypeInfo.of(this.rowType);
    }

    public void eval(Object... objArr) {
        checkCacheReload();
        List<RowData> list = this.cache.get(GenericRowData.of(objArr));
        if (list != null) {
            Iterator<RowData> it = list.iterator();
            while (it.hasNext()) {
                collect(it.next());
            }
        }
    }

    private void checkCacheReload() {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0) {
            LOG.info("Lookup join cache has expired after {} minute(s), reloading", Long.valueOf(this.reloadInterval.toMinutes()));
        } else {
            LOG.info("Populating lookup join cache");
        }
        int i = 0;
        while (true) {
            this.cache.clear();
            try {
                long j = 0;
                GenericRowData genericRowData = new GenericRowData(this.rowType.getFieldCount());
                this.partitionReader.open(this.partitionFetcher.fetch(this.fetcherContext));
                while (true) {
                    RowData rowData = (RowData) this.partitionReader.read(genericRowData);
                    if (rowData == null) {
                        this.partitionReader.close();
                        this.nextLoadTime = System.currentTimeMillis() + this.reloadInterval.toMillis();
                        LOG.info("Loaded {} row(s) into lookup join cache", Long.valueOf(j));
                        return;
                    } else {
                        j++;
                        RowData rowData2 = (RowData) this.serializer.copy(rowData);
                        this.cache.computeIfAbsent(extractLookupKey(rowData2), rowData3 -> {
                            return new ArrayList();
                        }).add(rowData2);
                    }
                }
            } catch (Exception e) {
                if (i >= 3) {
                    throw new FlinkRuntimeException(String.format("Failed to load table into cache after %d retries", Integer.valueOf(i)), e);
                }
                i++;
                long millis = i * RETRY_INTERVAL.toMillis();
                LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", Long.valueOf(millis / 1000)), e);
                try {
                    Thread.sleep(millis);
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
                    throw new FlinkRuntimeException(e2);
                }
            }
        }
    }

    private RowData extractLookupKey(RowData rowData) {
        GenericRowData genericRowData = new GenericRowData(this.lookupFieldGetters.length);
        for (int i = 0; i < this.lookupFieldGetters.length; i++) {
            genericRowData.setField(i, this.lookupFieldGetters[i].getFieldOrNull(rowData));
        }
        return genericRowData;
    }

    public void close() throws Exception {
        this.fetcherContext.close();
    }

    @VisibleForTesting
    public Duration getReloadInterval() {
        return this.reloadInterval;
    }

    @VisibleForTesting
    public PartitionFetcher<P> getPartitionFetcher() {
        return this.partitionFetcher;
    }

    @VisibleForTesting
    public PartitionFetcher.Context<P> getFetcherContext() {
        return this.fetcherContext;
    }

    @VisibleForTesting
    public PartitionReader<P, RowData> getPartitionReader() {
        return this.partitionReader;
    }
}
