package co.cask.cdap.messaging;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.CConfigurationReader;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HTableNameConverter;
import co.cask.cdap.data2.util.hbase.ScanBuilder;
import co.cask.cdap.messaging.MessagingUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

/* loaded from: input_file:co/cask/cdap/messaging/TopicMetadataCache.class */
public class TopicMetadataCache extends AbstractIdleService {
    private static final Log LOG = LogFactory.getLog(TopicMetadataCache.class);
    private static final Gson GSON = new Gson();
    private static final Type MAP_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.messaging.TopicMetadataCache.1
    }.getType();
    private static final byte[] COL_FAMILY = MessagingUtils.Constants.COLUMN_FAMILY;
    private static final byte[] COL = MessagingUtils.Constants.METADATA_COLUMN;
    private final RegionCoprocessorEnvironment env;
    private final CConfigurationReader cConfReader;
    private final String hbaseNamespacePrefix;
    private final String metadataTableNamespace;
    private final ScanBuilder scanBuilder;
    private volatile Thread refreshThread;
    private volatile boolean stopped;
    private volatile CConfiguration cConf;
    private long lastUpdated;
    private volatile Map<ByteBuffer, Map<String, String>> metadataCache = new HashMap();
    private long metadataCacheUpdateFreqInMillis = TimeUnit.SECONDS.toMillis(MessagingUtils.Constants.METADATA_CACHE_UPDATE_FREQUENCY_SECS);

    public TopicMetadataCache(RegionCoprocessorEnvironment regionCoprocessorEnvironment, CConfigurationReader cConfigurationReader, String str, String str2, ScanBuilder scanBuilder) {
        this.env = regionCoprocessorEnvironment;
        this.cConfReader = cConfigurationReader;
        this.hbaseNamespacePrefix = str;
        this.metadataTableNamespace = str2;
        this.scanBuilder = scanBuilder;
    }

    public boolean isAlive() {
        return this.refreshThread.isAlive();
    }

    protected void startUp() throws Exception {
        LOG.info("Starting TopicMetadataCache Refresh Thread.");
        startRefreshThread();
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping TopicMetadataCache Refresh Thread.");
        this.stopped = true;
        if (this.refreshThread != null) {
            this.refreshThread.interrupt();
            this.refreshThread.join(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    @Nullable
    public Map<String, String> getTopicMetadata(ByteBuffer byteBuffer) {
        return this.metadataCache.get(byteBuffer);
    }

    @Nullable
    public CConfiguration getCConfiguration() {
        return this.cConf;
    }

    @VisibleForTesting
    public synchronized void updateCache() throws IOException {
        HTableInterface hTableInterface = null;
        long currentTimeMillis = System.currentTimeMillis();
        long j = 0;
        try {
            CConfiguration read = this.cConfReader.read();
            if (read != null) {
                this.cConf = read;
                int i = read.getInt("messaging.hbase.scan.cache.rows");
                this.metadataCacheUpdateFreqInMillis = TimeUnit.SECONDS.toMillis(read.getLong("messaging.coprocessor.metadata.cache.update.frequency.seconds", MessagingUtils.Constants.METADATA_CACHE_UPDATE_FREQUENCY_SECS));
                String str = read.get("messaging.metadata.table.name");
                hTableInterface = getMetadataTable(str);
                if (hTableInterface == null) {
                    LOG.warn(String.format("Could not find HTableInterface of metadataTable %s:%s. Cannot update metadata cache", this.hbaseNamespacePrefix, str));
                    if (hTableInterface != null) {
                        try {
                            hTableInterface.close();
                            return;
                        } catch (IOException e) {
                            LOG.error("Error closing table. ", e);
                            return;
                        }
                    }
                    return;
                }
                HashMap hashMap = new HashMap();
                ResultScanner<Result> scanner = hTableInterface.getScanner(this.scanBuilder.setCaching(i).build());
                Throwable th = null;
                try {
                    try {
                        for (Result result : scanner) {
                            ByteBuffer wrap = ByteBuffer.wrap(result.getRow());
                            Map map = (Map) GSON.fromJson(Bytes.toString(result.getValue(COL_FAMILY, COL)), MAP_TYPE);
                            map.put(MessagingUtils.Constants.TTL_KEY, Long.toString(TimeUnit.SECONDS.toMillis(Long.parseLong((String) map.get(MessagingUtils.Constants.TTL_KEY)))));
                            hashMap.put(wrap, map);
                            j++;
                        }
                        if (scanner != null) {
                            if (0 != 0) {
                                try {
                                    scanner.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                scanner.close();
                            }
                        }
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        this.metadataCache = hashMap;
                        this.lastUpdated = currentTimeMillis;
                        LOG.debug(String.format("Updated consumer config cache with %d topics, took %d msec", Long.valueOf(j), Long.valueOf(currentTimeMillis2)));
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            }
            if (hTableInterface != null) {
                try {
                    hTableInterface.close();
                } catch (IOException e2) {
                    LOG.error("Error closing table. ", e2);
                }
            }
        } catch (Throwable th4) {
            if (hTableInterface != null) {
                try {
                    hTableInterface.close();
                } catch (IOException e3) {
                    LOG.error("Error closing table. ", e3);
                }
            }
            throw th4;
        }
    }

    private HTableInterface getMetadataTable(String str) throws IOException {
        return this.env.getTable(HTableNameConverter.toTableName(this.hbaseNamespacePrefix, TableId.from(this.metadataTableNamespace, str)));
    }

    private void startRefreshThread() {
        this.refreshThread = new Thread("tms-topic-metadata-cache-refresh") { // from class: co.cask.cdap.messaging.TopicMetadataCache.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!isInterrupted() && !TopicMetadataCache.this.stopped) {
                    if (System.currentTimeMillis() > TopicMetadataCache.this.lastUpdated + TopicMetadataCache.this.metadataCacheUpdateFreqInMillis) {
                        try {
                            TopicMetadataCache.this.updateCache();
                        } catch (IOException e) {
                            TopicMetadataCache.LOG.warn("Error updating metadata table cache", e);
                        } catch (TableNotFoundException e2) {
                            TopicMetadataCache.LOG.warn("Metadata table not found.", e2);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e3) {
                        interrupt();
                    }
                }
                TopicMetadataCache.LOG.info("Metadata cache update terminated.");
            }
        };
        this.refreshThread.setDaemon(true);
        this.refreshThread.start();
    }
}
