package co.cask.cdap.data2.transaction.queue.hbase.coprocessor;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.util.hbase.CConfigurationReader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.AbstractIdleService;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.persist.TransactionVisibilityState;
import org.apache.tephra.util.TxUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/coprocessor/ConsumerConfigCache.class */
public class ConsumerConfigCache extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfigCache.class);
    private static final int STATE_COLUMN_SIZE = 12;
    private static final long CONFIG_UPDATE_FREQUENCY = 300000;
    private final TableName queueConfigTableName;
    private final CConfigurationReader cConfReader;
    private final Supplier<TransactionVisibilityState> transactionSnapshotSupplier;
    private final InputSupplier<HTableInterface> hTableSupplier;
    private volatile Thread refreshThread;
    private volatile boolean stopped;
    private volatile CConfiguration conf;
    private long lastUpdated;
    private long lastConfigUpdate;
    private volatile Map<byte[], QueueConsumerConfig> configCache = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    private long configCacheUpdateFrequency = QueueConstants.DEFAULT_QUEUE_CONFIG_UPDATE_FREQUENCY.longValue();
    private final TransactionCodec txCodec = new TransactionCodec();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerConfigCache(TableName tableName, CConfigurationReader cConfigurationReader, Supplier<TransactionVisibilityState> supplier, InputSupplier<HTableInterface> inputSupplier) {
        this.queueConfigTableName = tableName;
        this.cConfReader = cConfigurationReader;
        this.transactionSnapshotSupplier = supplier;
        this.hTableSupplier = inputSupplier;
    }

    protected void startUp() throws Exception {
        LOG.info("Starting ConsumerConfigCache Refresh Thread for Table : {}", this.queueConfigTableName);
        startRefreshThread();
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping ConsumerConfigCache Refresh Thread for Table : {}", this.queueConfigTableName);
        this.stopped = true;
        if (this.refreshThread != null) {
            this.refreshThread.interrupt();
            this.refreshThread.join(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    public boolean isAlive() {
        return this.refreshThread.isAlive();
    }

    @Nullable
    public CConfiguration getCConf() {
        if (this.conf != null) {
            return CConfiguration.copy(this.conf);
        }
        return null;
    }

    @Nullable
    public QueueConsumerConfig getConsumerConfig(byte[] bArr) {
        return this.configCache.get(bArr);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateConfig() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.conf == null || currentTimeMillis > this.lastConfigUpdate + CONFIG_UPDATE_FREQUENCY) {
            try {
                CConfiguration read = this.cConfReader.read();
                if (read != null) {
                    this.conf = read;
                    LOG.info("Reloaded CConfiguration at {}", Long.valueOf(currentTimeMillis));
                    this.lastConfigUpdate = currentTimeMillis;
                    long j = read.getLong(QueueConstants.QUEUE_CONFIG_UPDATE_FREQUENCY, QueueConstants.DEFAULT_QUEUE_CONFIG_UPDATE_FREQUENCY.longValue());
                    LOG.info("Will reload consumer config cache every {} seconds", Long.valueOf(j));
                    this.configCacheUpdateFrequency = j * 1000;
                }
            } catch (IOException e) {
                LOG.error("Error reading default configuration table", e);
            }
        }
    }

    @VisibleForTesting
    public synchronized void updateCache() throws IOException {
        NavigableMap familyMap;
        TreeMap newTreeMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        long currentTimeMillis = System.currentTimeMillis();
        TransactionVisibilityState transactionVisibilityState = (TransactionVisibilityState) this.transactionSnapshotSupplier.get();
        if (transactionVisibilityState == null) {
            LOG.debug("No transaction snapshot is available. Not updating the consumer config cache.");
            return;
        }
        HTableInterface hTableInterface = (HTableInterface) this.hTableSupplier.getInput();
        try {
            Scan scan = new Scan();
            scan.addFamily(QueueEntryRow.COLUMN_FAMILY);
            setScanAttribute(scan, "tephra.tx", this.txCodec.encode(TxUtils.createDummyTransaction(transactionVisibilityState)));
            int i = 0;
            for (Result result : hTableInterface.getScanner(scan)) {
                if (!result.isEmpty() && (familyMap = result.getFamilyMap(QueueEntryRow.COLUMN_FAMILY)) != null) {
                    i++;
                    HashMap hashMap = new HashMap();
                    int i2 = 0;
                    Long l = null;
                    for (Map.Entry entry : familyMap.entrySet()) {
                        if (((byte[]) entry.getKey()).length == 12) {
                            long j = Bytes.toLong((byte[]) entry.getKey());
                            hashMap.put(new ConsumerInstance(j, Bytes.toInt((byte[]) entry.getKey(), 8)), entry.getValue());
                            if (l == null || l.longValue() != j) {
                                i2++;
                                l = Long.valueOf(j);
                            }
                        }
                    }
                    newTreeMap.put(result.getRow(), new QueueConsumerConfig(hashMap, i2));
                }
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            this.configCache = newTreeMap;
            this.lastUpdated = currentTimeMillis;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Updated consumer config cache with {} entries, took {} msec", Integer.valueOf(i), Long.valueOf(currentTimeMillis2));
            }
        } finally {
            try {
                hTableInterface.close();
            } catch (IOException e) {
                LOG.error("Error closing table {}", this.queueConfigTableName, e);
            }
        }
    }

    private void startRefreshThread() {
        this.refreshThread = new Thread("queue-cache-refresh") { // from class: co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerConfigCache.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!isInterrupted() && !ConsumerConfigCache.this.stopped) {
                    ConsumerConfigCache.this.updateConfig();
                    if (System.currentTimeMillis() > ConsumerConfigCache.this.lastUpdated + ConsumerConfigCache.this.configCacheUpdateFrequency) {
                        try {
                            ConsumerConfigCache.this.updateCache();
                        } catch (TableNotFoundException e) {
                            ConsumerConfigCache.LOG.warn("Queue config table not found: {}", ConsumerConfigCache.this.queueConfigTableName, e);
                        } catch (IOException e2) {
                            ConsumerConfigCache.LOG.warn("Error updating queue consumer config cache", e2);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e3) {
                        interrupt();
                    }
                }
                ConsumerConfigCache.LOG.info("Config cache update for {} terminated.", ConsumerConfigCache.this.queueConfigTableName);
            }
        };
        this.refreshThread.setDaemon(true);
        this.refreshThread.start();
    }

    private void setScanAttribute(Scan scan, String str, byte[] bArr) {
        try {
            scan.getClass().getMethod("setAttribute", String.class, byte[].class).invoke(scan, str, bArr);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            LOG.error("Failed to call Scan.setAttribute", e);
            throw Throwables.propagate(e);
        }
    }
}
