package org.apache.flink.connector.hbase2.source;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.class */
public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
    private static final long serialVersionUID = 1;
    private final String hTableName;
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;
    private transient AsyncConnection asyncConnection;
    private transient AsyncTable<ScanResultConsumer> table;
    private transient HBaseSerde serde;
    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private transient Cache<Object, RowData> cache;
    private static final int THREAD_POOL_SIZE = 16;

    public HBaseRowDataAsyncLookupFunction(Configuration configuration, String str, HBaseTableSchema hBaseTableSchema, String str2, HBaseLookupOptions hBaseLookupOptions) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = str;
        this.hbaseTableSchema = hBaseTableSchema;
        this.nullStringLiteral = str2;
        this.cacheMaxSize = hBaseLookupOptions.getCacheMaxSize();
        this.cacheExpireMs = hBaseLookupOptions.getCacheExpireMs();
        this.maxRetryTimes = hBaseLookupOptions.getMaxRetryTimes();
    }

    public void open(FunctionContext functionContext) {
        LOG.info("start open ...");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE, new ExecutorThreadFactory("hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
        try {
            this.asyncConnection = (AsyncConnection) ConnectionFactory.createAsyncConnection(prepareRuntimeConfiguration()).get();
            this.table = this.asyncConnection.getTable(TableName.valueOf(this.hTableName), newFixedThreadPool);
            this.cache = (this.cacheMaxSize <= 0 || this.cacheExpireMs <= 0) ? null : CacheBuilder.newBuilder().recordStats().expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(this.cacheMaxSize).build();
            if (this.cache != null && functionContext != null) {
                functionContext.getMetricGroup().gauge("lookupCacheHitRate", () -> {
                    return Double.valueOf(this.cache.stats().hitRate());
                });
            }
            this.serde = new HBaseSerde(this.hbaseTableSchema, this.nullStringLiteral);
            LOG.info("end open.");
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Exception while creating connection to HBase.", e);
            throw new RuntimeException("Cannot create connection to HBase.", e);
        }
    }

    public void eval(CompletableFuture<Collection<RowData>> completableFuture, Object obj) {
        RowData rowData;
        if (this.cache == null || (rowData = (RowData) this.cache.getIfPresent(obj)) == null) {
            fetchResult(completableFuture, 0, obj);
        } else if (rowData.getArity() == 0) {
            completableFuture.complete(Collections.emptyList());
        } else {
            completableFuture.complete(Collections.singletonList(rowData));
        }
    }

    private void fetchResult(CompletableFuture<Collection<RowData>> completableFuture, int i, Object obj) {
        this.table.get(this.serde.createGet(obj)).whenCompleteAsync((result, th) -> {
            if (th != null) {
                if (th instanceof TableNotFoundException) {
                    LOG.error("Table '{}' not found ", this.hTableName, th);
                    completableFuture.completeExceptionally(new RuntimeException("HBase table '" + this.hTableName + "' not found.", th));
                    return;
                }
                LOG.error(String.format("HBase asyncLookup error, retry times = %d", Integer.valueOf(i)), th);
                if (i >= this.maxRetryTimes) {
                    completableFuture.completeExceptionally(th);
                    return;
                }
                try {
                    Thread.sleep(1000 * i);
                } catch (InterruptedException e) {
                    completableFuture.completeExceptionally(e);
                }
                fetchResult(completableFuture, i + 1, obj);
                return;
            }
            if (result.isEmpty()) {
                completableFuture.complete(Collections.emptyList());
                if (this.cache != null) {
                    this.cache.put(obj, new GenericRowData(0));
                    return;
                }
                return;
            }
            if (this.cache == null) {
                completableFuture.complete(Collections.singletonList(this.serde.convertToNewRow(result)));
                return;
            }
            RowData convertToNewRow = this.serde.convertToNewRow(result);
            completableFuture.complete(Collections.singletonList(convertToNewRow));
            this.cache.put(obj, convertToNewRow);
        });
    }

    private Configuration prepareRuntimeConfiguration() {
        Configuration deserializeConfiguration = HBaseConfigurationUtil.deserializeConfiguration(this.serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        if (!StringUtils.isNullOrWhitespaceOnly(deserializeConfiguration.get("hbase.zookeeper.quorum"))) {
            return deserializeConfiguration;
        }
        LOG.error("can not connect to HBase without {} configuration", "hbase.zookeeper.quorum");
        throw new IllegalArgumentException("check HBase configuration failed, lost: 'hbase.zookeeper.quorum'!");
    }

    public void close() {
        LOG.info("start close ...");
        if (null != this.table) {
            this.table = null;
        }
        if (null != this.asyncConnection) {
            try {
                this.asyncConnection.close();
                this.asyncConnection = null;
            } catch (IOException e) {
                LOG.warn("exception when close connection", e);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    public String getHTableName() {
        return this.hTableName;
    }
}
