/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.hbase.source;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Connection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Get;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.HTable;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Result;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table function that looks up a single HBase row by its row key and emits it as
 * {@link RowData}.
 *
 * <p>The HBase connection and table handle are created in {@link #open(FunctionContext)} and
 * released in {@link #close()}. When both the configured cache size and cache expiry are
 * positive, non-empty lookup results are additionally kept in an in-memory cache; empty results
 * are never cached.
 */
@Internal
public class HBaseRowDataLookupFunction
extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;

    /** Configuration key that must be set before a connection is attempted. */
    private static final String ZOOKEEPER_QUORUM_KEY = "hbase.zookeeper.quorum";

    private final String hTableName;
    /** Configuration in serialized form, so that it can be shipped together with this function. */
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;

    // Runtime-only state, (re)created in open().
    private transient Connection hConnection;
    private transient HTable table;
    private transient HBaseSerde serde;

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    /** Lookup cache keyed by row key; {@code null} when caching is disabled. */
    private transient Cache<Object, RowData> cache;

    /**
     * Creates the lookup function.
     *
     * @param configuration configuration to connect with; stored in serialized form
     * @param hTableName name of the HBase table to look up
     * @param hbaseTableSchema schema handed to the {@link HBaseSerde} created in {@code open}
     * @param nullStringLiteral null literal handed to the {@link HBaseSerde} created in
     *     {@code open}
     * @param lookupOptions supplies cache max size, cache expiry (ms) and max retry count
     */
    public HBaseRowDataLookupFunction(Configuration configuration, String hTableName, HBaseTableSchema hbaseTableSchema, String nullStringLiteral, HBaseLookupOptions lookupOptions) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = hTableName;
        this.hbaseTableSchema = hbaseTableSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
    }

    /**
     * Looks up the row for {@code rowKey} and collects it if it exists.
     *
     * <p>A cached row is emitted directly. Otherwise the table is queried; on
     * {@link IOException} the lookup is retried up to {@code maxRetryTimes} times, sleeping
     * {@code 1000 * retry} milliseconds before the next attempt. Nothing is collected when the
     * serde produces no {@link Get} for the key or when the result is empty.
     *
     * @param rowKey the row key to look up
     * @throws IOException declared for callers; lookup failures are reported as
     *     {@link RuntimeException}
     * @throws RuntimeException if the last attempt fails, or if the thread is interrupted while
     *     waiting to retry (the interrupt status is restored in that case)
     */
    public void eval(Object rowKey) throws IOException {
        if (this.cache != null) {
            RowData cachedRow = this.cache.getIfPresent(rowKey);
            if (cachedRow != null) {
                this.collect(cachedRow);
                return;
            }
        }
        for (int retry = 0; retry <= this.maxRetryTimes; ++retry) {
            try {
                Get get = this.serde.createGet(rowKey);
                if (get != null) {
                    Result result = this.table.get(get);
                    if (!result.isEmpty()) {
                        if (this.cache != null) {
                            // A fresh row is created for caching; presumably the reused row
                            // instance is overwritten by later lookups - TODO confirm.
                            RowData rowData = this.serde.convertToNewRow(result);
                            this.collect(rowData);
                            this.cache.put(rowKey, rowData);
                        } else {
                            this.collect(this.serde.convertToReusedRow(result));
                        }
                    }
                }
                // The attempt completed without an I/O error: no further retries.
                break;
            } catch (IOException e) {
                LOG.error(String.format("HBase lookup error, retry times = %d", retry), e);
                if (retry >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of HBase lookup failed.", e);
                }
                try {
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    // Restore the interrupt status so code further up the stack can still
                    // observe that the thread was interrupted.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    /**
     * Rebuilds the runtime configuration from the serialized configuration, layered over the
     * configuration returned by {@code HBaseConfigurationUtil.getHBaseConfiguration()}.
     *
     * @return the configuration to connect with
     * @throws IllegalArgumentException if {@code hbase.zookeeper.quorum} is null or blank
     */
    private Configuration prepareRuntimeConfiguration() {
        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(this.serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(ZOOKEEPER_QUORUM_KEY))) {
            LOG.error("can not connect to HBase without {} configuration", ZOOKEEPER_QUORUM_KEY);
            throw new IllegalArgumentException("check HBase configuration failed, lost: '" + ZOOKEEPER_QUORUM_KEY + "'!");
        }
        return runtimeConfig;
    }

    /**
     * Opens the HBase connection and table, sets up the optional lookup cache (registering a
     * {@code lookupCacheHitRate} gauge when the cache is enabled) and creates the serde.
     *
     * @param context function context used to obtain the metric group
     * @throws RuntimeException if the table is not found or the connection cannot be created
     * @throws IllegalArgumentException if the ZooKeeper quorum is not configured
     */
    public void open(FunctionContext context) {
        LOG.info("start open ...");
        Configuration config = this.prepareRuntimeConfiguration();
        try {
            this.hConnection = ConnectionFactory.createConnection(config);
            this.table = (HTable) this.hConnection.getTable(TableName.valueOf(this.hTableName));
            if (this.cacheMaxSize <= 0L || this.cacheExpireMs <= 0L) {
                // Caching is only enabled when both size and expiry are positive.
                this.cache = null;
            } else {
                this.cache = CacheBuilder.newBuilder()
                        .recordStats()
                        .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
                        .maximumSize(this.cacheMaxSize)
                        .build();
                // NOTE(review): depending on the gauge(...) signature, the lambda may need an
                // explicit Gauge<Double> target type to compile - confirm against the API in use.
                context.getMetricGroup().gauge("lookupCacheHitRate", () -> this.cache.stats().hitRate());
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("Table '{}' not found ", this.hTableName, tnfe);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to HBase.", ioe);
            throw new RuntimeException("Cannot create connection to HBase.", ioe);
        }
        this.serde = new HBaseSerde(this.hbaseTableSchema, this.nullStringLiteral);
        LOG.info("end open.");
    }

    /**
     * Closes the table and then the connection. Failures are logged as warnings and do not
     * prevent the remaining resource from being closed; a reference is only cleared after its
     * resource was closed successfully.
     */
    public void close() {
        LOG.info("start close ...");
        if (this.table != null) {
            try {
                this.table.close();
                this.table = null;
            } catch (IOException e) {
                LOG.warn("exception when close table", e);
            }
        }
        if (this.hConnection != null) {
            try {
                this.hConnection.close();
                this.hConnection = null;
            } catch (IOException e) {
                LOG.warn("exception when close connection", e);
            }
        }
        LOG.info("end close.");
    }

    /**
     * Returns the name of the HBase table this function looks up.
     *
     * @return the HBase table name passed to the constructor
     */
    @VisibleForTesting
    public String getHTableName() {
        return this.hTableName;
    }
}

