package org.apache.kylin.stream.core.dict;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-stream-core-3.0.1.jar:org/apache/kylin/stream/core/dict/RemoteDictionaryStore.class */
public class RemoteDictionaryStore {
    private static Logger logger = LoggerFactory.getLogger((Class<?>) RemoteDictionaryStore.class);
    private final byte[] hbaseTableName;
    private final String tableName;
    private Table table;
    private final byte[] encodeQualifierName = "encode_value".getBytes(StandardCharsets.UTF_8);
    private final byte[] tsQualifierName = "ts".getBytes(StandardCharsets.UTF_8);
    private boolean printValue = KylinConfig.getInstanceFromEnv().isPrintRealtimeDictEnabled();

    public RemoteDictionaryStore(String str) {
        this.hbaseTableName = str.getBytes(StandardCharsets.UTF_8);
        this.tableName = str;
    }

    public void init(String[] strArr) throws IOException {
        logger.debug("Checking streaming remote store for {} at {}.", this.tableName, String.join(", ", strArr));
        Connection connection = getConnection();
        Admin admin = connection.getAdmin();
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(this.hbaseTableName));
        for (String str : strArr) {
            hTableDescriptor.addFamily(new HColumnDescriptor(str));
        }
        DistributedLock lockForCurrentProcess = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess();
        try {
            if (!lockForCurrentProcess.lock(lockPath()) || admin.tableExists(TableName.valueOf(this.hbaseTableName))) {
                logger.info("Table exists or cannot fetch lock {}", hTableDescriptor);
            } else {
                logger.info("Create htable with {}.", hTableDescriptor);
                admin.createTable(hTableDescriptor);
            }
            this.table = connection.getTable(TableName.valueOf(this.hbaseTableName));
        } finally {
            admin.close();
            if (lockForCurrentProcess != null && lockForCurrentProcess.isLockedByMe(lockPath())) {
                lockForCurrentProcess.unlock(lockPath());
            }
        }
    }

    public int checkAndPutWithRetry(ByteArray byteArray, String str, int i, int i2, boolean z) {
        IOException iOException;
        int i3 = 0;
        int i4 = -1;
        do {
            try {
                i4 = checkAndPut(byteArray, str, i, i2, z);
                iOException = null;
            } catch (IOException e) {
                logger.error("CheckAndPut failed at " + str + ", columnFamily " + new String(byteArray.array(), StandardCharsets.UTF_8), (Throwable) e);
                iOException = e;
                i3++;
                try {
                    long j = 1000 * (i3 <= 10 ? i3 : 10);
                    logger.debug("Sleep to wait set succeed for {} ms.", Long.valueOf(j));
                    Thread.sleep(j);
                } catch (InterruptedException e2) {
                }
            }
        } while (iOException != null);
        return i4;
    }

    int checkAndPut(ByteArray byteArray, String str, int i, int i2, boolean z) throws IOException {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        if (bytes.length == 0) {
            return 0;
        }
        byte[] bytes2 = Integer.toString(i2).getBytes(StandardCharsets.UTF_8);
        Put put = new Put(bytes);
        put.addColumn(byteArray.array(), this.encodeQualifierName, bytes2);
        put.addColumn(byteArray.array(), this.tsQualifierName, Bytes.toBytes(System.currentTimeMillis()));
        if (!this.table.checkAndPut(bytes, byteArray.array(), this.encodeQualifierName, z ? Integer.toString(i).getBytes(StandardCharsets.UTF_8) : null, put)) {
            return -2;
        }
        if (this.printValue) {
            logger.debug("Encode {} to {}", str, Integer.valueOf(i2));
        }
        return i2;
    }

    public int encodeWithRetry(ByteArray byteArray, String str) {
        IOException iOException;
        int i = 0;
        int i2 = -2;
        do {
            try {
                i2 = encode(byteArray, str);
                iOException = null;
            } catch (IOException e) {
                logger.error("Encode failed at " + str + ", column " + new String(byteArray.array(), StandardCharsets.UTF_8), (Throwable) e);
                iOException = e;
                i++;
                try {
                    long j = 1000 * (i <= 10 ? i : 10);
                    logger.debug("Sleep to wait set succeed for {} ms.", Long.valueOf(j));
                    Thread.sleep(j);
                } catch (InterruptedException e2) {
                }
            }
        } while (iOException != null);
        return i2;
    }

    int encode(ByteArray byteArray, String str) throws IOException {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        if (bytes.length == 0) {
            return 0;
        }
        Result result = this.table.get(new Get(bytes));
        byte[] value = result.getValue(byteArray.array(), this.encodeQualifierName);
        byte[] value2 = result.getValue(byteArray.array(), this.tsQualifierName);
        String str2 = new String(value, StandardCharsets.UTF_8);
        String str3 = new String(value2, StandardCharsets.UTF_8);
        if (this.printValue) {
            logger.debug("Encode {} to {} [{}]", str, str2, str3);
        }
        return Integer.parseInt(str2);
    }

    static Connection getConnection() {
        try {
            return ConnectionFactory.createConnection(HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot connect to HBase.", e);
        }
    }

    private String lockPath() {
        return "/realtime/create_global_dict_table/" + this.tableName;
    }
}
