/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.hbase2.source;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Get;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Result;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.util.Threads;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class HBaseRowDataAsyncLookupFunction
extends AsyncLookupFunction {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
    private static final long serialVersionUID = 1L;
    private final String hTableName;
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;
    private transient AsyncConnection asyncConnection;
    private transient AsyncTable<ScanResultConsumer> table;
    private transient HBaseSerde serde;
    private final int maxRetryTimes;
    private static final int THREAD_POOL_SIZE = 16;

    public HBaseRowDataAsyncLookupFunction(Configuration configuration, String hTableName, HBaseTableSchema hbaseTableSchema, String nullStringLiteral, int maxRetryTimes) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = hTableName;
        this.hbaseTableSchema = hbaseTableSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.maxRetryTimes = maxRetryTimes;
    }

    public void open(FunctionContext context) {
        LOG.info("start open ...");
        ExecutorService threadPool = Executors.newFixedThreadPool(16, (ThreadFactory)new ExecutorThreadFactory("hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
        Configuration config = this.prepareRuntimeConfiguration();
        CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
        try {
            this.asyncConnection = asyncConnectionFuture.get();
            this.table = this.asyncConnection.getTable(TableName.valueOf(this.hTableName), threadPool);
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Exception while creating connection to HBase.", (Throwable)e);
            throw new RuntimeException("Cannot create connection to HBase.", e);
        }
        this.serde = new HBaseSerde(this.hbaseTableSchema, this.nullStringLiteral);
        LOG.info("end open.");
    }

    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        int currentRetry = 0;
        CompletableFuture<Collection<RowData>> future = new CompletableFuture<Collection<RowData>>();
        this.fetchResult(future, currentRetry, ((GenericRowData)keyRow).getField(0));
        return future;
    }

    private void fetchResult(CompletableFuture<Collection<RowData>> resultFuture, int currentRetry, Object rowKey) {
        Get get = this.serde.createGet(rowKey);
        CompletableFuture<Result> responseFuture = this.table.get(get);
        responseFuture.whenCompleteAsync((result, throwable) -> {
            if (throwable != null) {
                if (throwable instanceof TableNotFoundException) {
                    LOG.error("Table '{}' not found ", (Object)this.hTableName, throwable);
                    resultFuture.completeExceptionally(new RuntimeException("HBase table '" + this.hTableName + "' not found.", (Throwable)throwable));
                } else {
                    LOG.error(String.format("HBase asyncLookup error, retry times = %d", currentRetry), throwable);
                    if (currentRetry >= this.maxRetryTimes) {
                        resultFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        try {
                            Thread.sleep(1000 * currentRetry);
                        }
                        catch (InterruptedException e1) {
                            resultFuture.completeExceptionally(e1);
                        }
                        this.fetchResult(resultFuture, currentRetry + 1, rowKey);
                    }
                }
            } else if (result.isEmpty()) {
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singletonList(this.serde.convertToNewRow((Result)result)));
            }
        });
    }

    private Configuration prepareRuntimeConfiguration() {
        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(this.serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        if (StringUtils.isNullOrWhitespaceOnly((String)runtimeConfig.get("hbase.zookeeper.quorum"))) {
            LOG.error("can not connect to HBase without {} configuration", (Object)"hbase.zookeeper.quorum");
            throw new IllegalArgumentException("check HBase configuration failed, lost: 'hbase.zookeeper.quorum'!");
        }
        return runtimeConfig;
    }

    public void close() {
        LOG.info("start close ...");
        if (null != this.table) {
            this.table = null;
        }
        if (null != this.asyncConnection) {
            try {
                this.asyncConnection.close();
                this.asyncConnection = null;
            }
            catch (IOException e) {
                LOG.warn("exception when close connection", (Throwable)e);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    public String getHTableName() {
        return this.hTableName;
    }
}

