package org.apache.flink.connectors.kudu.table.function.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.class */
public class KuduRowDataLookupFunction extends TableFunction<RowData> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KuduRowDataLookupFunction.class);
    private final KuduTableInfo tableInfo;
    private final KuduReaderConfig kuduReaderConfig;
    private final String[] keyNames;
    private final String[] projectedFields;
    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private final RowResultConvertor<RowData> convertor;
    private transient Cache<RowData, List<RowData>> cache;
    private transient KuduReader<RowData> kuduReader;

    /* loaded from: input_file:org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction$Builder.class */
    public static class Builder {
        private KuduTableInfo tableInfo;
        private KuduReaderConfig kuduReaderConfig;
        private String[] keyNames;
        private String[] projectedFields;
        private KuduLookupOptions kuduLookupOptions;

        public static Builder options() {
            return new Builder();
        }

        public Builder tableInfo(KuduTableInfo kuduTableInfo) {
            this.tableInfo = kuduTableInfo;
            return this;
        }

        public Builder kuduReaderConfig(KuduReaderConfig kuduReaderConfig) {
            this.kuduReaderConfig = kuduReaderConfig;
            return this;
        }

        public Builder keyNames(String[] strArr) {
            this.keyNames = strArr;
            return this;
        }

        public Builder projectedFields(String[] strArr) {
            this.projectedFields = strArr;
            return this;
        }

        public Builder kuduLookupOptions(KuduLookupOptions kuduLookupOptions) {
            this.kuduLookupOptions = kuduLookupOptions;
            return this;
        }

        public KuduRowDataLookupFunction build() {
            return new KuduRowDataLookupFunction(this.keyNames, this.tableInfo, this.kuduReaderConfig, this.projectedFields, this.kuduLookupOptions);
        }
    }

    private KuduRowDataLookupFunction(String[] strArr, KuduTableInfo kuduTableInfo, KuduReaderConfig kuduReaderConfig, String[] strArr2, KuduLookupOptions kuduLookupOptions) {
        this.tableInfo = kuduTableInfo;
        this.convertor = new RowResultRowDataConvertor();
        this.projectedFields = strArr2;
        this.keyNames = strArr;
        this.kuduReaderConfig = kuduReaderConfig;
        this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
        this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
        this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
    }

    public RowData buildCacheKey(Object... objArr) {
        return GenericRowData.of(objArr);
    }

    public void eval(Object... objArr) {
        if (objArr.length != this.keyNames.length) {
            throw new RuntimeException("The join keys are of unequal lengths");
        }
        RowData buildCacheKey = buildCacheKey(objArr);
        if (this.cache != null) {
            List list = (List) this.cache.getIfPresent(buildCacheKey);
            if (CollectionUtils.isNotEmpty(list)) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    collect((RowData) it.next());
                }
                return;
            }
        }
        for (int i = 1; i <= this.maxRetryTimes; i++) {
            try {
                this.kuduReader.setTableFilters(buildKuduFilterInfo(objArr));
                KuduInputSplit[] createInputSplits = this.kuduReader.createInputSplits(1);
                ArrayList arrayList = new ArrayList();
                for (KuduInputSplit kuduInputSplit : createInputSplits) {
                    KuduReaderIterator<RowData> scanner = this.kuduReader.scanner(kuduInputSplit.getScanToken());
                    if (this.cache == null) {
                        while (scanner.hasNext()) {
                            collect(scanner.next());
                        }
                    } else {
                        while (scanner.hasNext()) {
                            RowData next = scanner.next();
                            arrayList.add(next);
                            collect(next);
                        }
                        arrayList.trimToSize();
                    }
                }
                if (this.cache != null) {
                    this.cache.put(buildCacheKey, arrayList);
                }
                return;
            } catch (Exception e) {
                LOG.error(String.format("Kudu scan error, retry times = %d", Integer.valueOf(i)), e);
                if (i >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of Kudu scan failed.", e);
                }
                try {
                    Thread.sleep(1000 * i);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }
    }

    private List<KuduFilterInfo> buildKuduFilterInfo(Object... objArr) {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < this.keyNames.length; i++) {
            newArrayList.add(KuduFilterInfo.Builder.create(this.keyNames[i]).equalTo(objArr[i]).build());
        }
        return newArrayList;
    }

    public void open(FunctionContext functionContext) {
        try {
            super.open(functionContext);
            this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
            this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(this.projectedFields) ? Arrays.asList(this.projectedFields) : null);
            this.cache = (this.cacheMaxSize == -1 || this.cacheExpireMs == -1) ? null : CacheBuilder.newBuilder().expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(this.cacheMaxSize).build();
        } catch (Exception e) {
            LOG.error("Exception while creating connection to Kudu.", e);
            throw new RuntimeException("Cannot create connection to Kudu.", e);
        }
    }

    public void close() {
        if (null != this.kuduReader) {
            try {
                this.kuduReader.close();
                this.cache.cleanUp();
                this.cache = null;
                this.kuduReader = null;
            } catch (IOException e) {
                LOG.warn("exception when close table", e);
            }
        }
    }
}
