package co.cask.cdap.data2.transaction.messaging.coprocessor.hbase10cdh;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.transaction.coprocessor.DefaultTransactionStateCacheSupplier;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.CConfigurationReader;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HTable10CDHNameConverter;
import co.cask.cdap.messaging.MessagingUtils;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.persist.TransactionVisibilityState;

/* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase10cdh/MessageTableRegionObserver.class */
public class MessageTableRegionObserver extends BaseRegionObserver {
    // Logger for this coprocessor; also used by the nested MessageDataFilter.
    private static final Log LOG = LogFactory.getLog(MessageTableRegionObserver.class);
    // Gson instance used to decode the JSON topic properties read from the metadata table.
    private static final Gson GSON = new Gson();
    // Gson target type for the decoded topic properties: Map<String, String>.
    private static final Type MAP_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.data2.transaction.messaging.coprocessor.hbase10cdh.MessageTableRegionObserver.1
    }.getType();
    // Column family and qualifier of the metadata-table cell that holds the topic properties.
    private static final byte[] COL_FAMILY = MessagingUtils.Constants.COLUMN_FAMILY;
    private static final byte[] COL = MessagingUtils.Constants.METADATA_COLUMN;
    // Parsed in start() from the table descriptor value "cdap.messaging.table.prefix.num.bytes";
    // MessageDataFilter skips this many leading bytes of each row key.
    private int prefixLength;
    // Topic properties keyed by topic id bytes; populated on demand from metadataTable (see start()).
    private LoadingCache<ByteBuffer, Map<String, String>> topicCache;
    // Provides the latest transaction visibility state handed to the flush/compaction filters.
    private TransactionStateCache txStateCache;
    // Handle to the messaging metadata table, opened in start().
    // NOTE(review): no code visible in this class closes this table - confirm whether a stop() hook is needed.
    private HTableInterface metadataTable;
    // Reader used in start() to load the CConfiguration.
    private CConfigurationReader cConfReader;

    /* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase10cdh/MessageTableRegionObserver$LoggingInternalScanner.class */
    private static final class LoggingInternalScanner implements InternalScanner {
        private static final Log LOG = LogFactory.getLog(LoggingInternalScanner.class);
        private final String filterName;
        private final String operation;
        private final InternalScanner delegate;
        private final TransactionVisibilityState state;

        LoggingInternalScanner(String str, String str2, InternalScanner internalScanner, @Nullable TransactionVisibilityState transactionVisibilityState) {
            this.filterName = str;
            this.operation = str2;
            this.delegate = internalScanner;
            this.state = transactionVisibilityState;
        }

        public boolean next(List<Cell> list) throws IOException {
            return this.delegate.next(list);
        }

        public boolean next(List<Cell> list, int i) throws IOException {
            return this.delegate.next(list, i);
        }

        public void close() throws IOException {
            if (this.state != null) {
                LOG.info("During " + this.operation + ", Filter " + this.filterName + " completed using tx.snapshot of timestamp " + this.state.getTimestamp() + " which had the visibility bound of " + this.state.getVisibilityUpperBound());
            }
            this.delegate.close();
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase10cdh/MessageTableRegionObserver$MessageDataFilter.class */
    private static final class MessageDataFilter extends FilterBase {
        private final RegionCoprocessorEnvironment env;
        private final long timestamp;
        private final TransactionVisibilityState state;
        private final int prefixLength;
        private final LoadingCache<ByteBuffer, Map<String, String>> topicCache;
        private byte[] prevTopicIdBytes;
        private Long currentTTL;
        private Integer currentGen;

        MessageDataFilter(RegionCoprocessorEnvironment regionCoprocessorEnvironment, long j, int i, LoadingCache<ByteBuffer, Map<String, String>> loadingCache, @Nullable TransactionVisibilityState transactionVisibilityState) {
            this.env = regionCoprocessorEnvironment;
            this.timestamp = j;
            this.prefixLength = i;
            this.topicCache = loadingCache;
            this.state = transactionVisibilityState;
        }

        public Filter.ReturnCode filterKeyValue(Cell cell) throws IOException {
            int rowOffset = cell.getRowOffset() + this.prefixLength;
            int rowLength = cell.getRowLength() - this.prefixLength;
            long publishTimestamp = MessagingUtils.getPublishTimestamp(cell.getRowArray(), rowOffset, rowLength);
            int topicLengthMessageEntry = MessagingUtils.getTopicLengthMessageEntry(rowLength) - 4;
            int i = Bytes.toInt(cell.getRowArray(), rowOffset + topicLengthMessageEntry);
            try {
                if (this.prevTopicIdBytes == null || this.currentTTL == null || this.currentGen == null || !Bytes.equals(this.prevTopicIdBytes, 0, this.prevTopicIdBytes.length, cell.getRowArray(), rowOffset, topicLengthMessageEntry)) {
                    this.prevTopicIdBytes = Arrays.copyOfRange(cell.getRowArray(), rowOffset, rowOffset + topicLengthMessageEntry);
                    Map map = (Map) this.topicCache.get(ByteBuffer.wrap(this.prevTopicIdBytes));
                    this.currentTTL = Long.valueOf(Long.parseLong((String) map.get("ttl")));
                    this.currentGen = Integer.valueOf(Integer.parseInt((String) map.get("generation")));
                }
            } catch (ExecutionException e) {
                MessageTableRegionObserver.LOG.info("Region " + this.env.getRegion().getRegionNameAsString() + ", exception whiletrying to fetch properties of topicId " + MessagingUtils.toTopicId(this.prevTopicIdBytes) + "\n" + e.getMessage());
                MessageTableRegionObserver.LOG.debug("StackTrace: ", e);
            }
            if (MessagingUtils.isOlderGeneration(i, this.currentGen.intValue())) {
                return Filter.ReturnCode.SKIP;
            }
            if (i == this.currentGen.intValue() && this.timestamp - publishTimestamp > this.currentTTL.longValue()) {
                return Filter.ReturnCode.SKIP;
            }
            return Filter.ReturnCode.INCLUDE;
        }

        public Cell transformCell(Cell cell) throws IOException {
            byte[] bArr = MessagingUtils.Constants.TX_COL;
            if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), bArr, 0, bArr.length)) {
                long j = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
                if (j > 0 && this.state != null && this.state.getInvalid().contains(Long.valueOf(j))) {
                    Bytes.putLong(cell.getRowArray(), cell.getValueOffset(), (-1) * j);
                    return cell;
                }
            }
            return super.transformCell(cell);
        }
    }

    /**
     * Initializes the coprocessor when it is loaded into a region. Does nothing unless the environment
     * is a {@link RegionCoprocessorEnvironment}.
     *
     * <p>Reads the metadata namespace, table prefix and row-key prefix length from the table descriptor,
     * loads the CConfiguration, opens the messaging metadata table, builds the topic property cache
     * (at most 1000 entries, expiring after the configured number of seconds) and obtains the
     * transaction state cache.
     *
     * @throws IOException if reading the configuration or opening the metadata table fails
     */
    @Override
    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (!(coprocessorEnvironment instanceof RegionCoprocessorEnvironment)) {
            return;
        }

        RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) coprocessorEnvironment;
        HTableDescriptor descriptor = regionEnv.getRegion().getTableDesc();
        String metadataNamespace = descriptor.getValue("cdap.messaging.metadata.hbase.namespace");
        String tablePrefix = descriptor.getValue("dataset.table.prefix");
        HTable10CDHNameConverter nameConverter = new HTable10CDHNameConverter();

        cConfReader = new CConfigurationReader(coprocessorEnvironment.getConfiguration(), nameConverter.getSysConfigTablePrefix(tablePrefix));
        CConfiguration cConf = cConfReader.read();
        String metadataTableName = cConf.get("messaging.metadata.table.name");

        prefixLength = Integer.parseInt(descriptor.getValue("cdap.messaging.table.prefix.num.bytes"));
        metadataTable = coprocessorEnvironment.getTable(nameConverter.toTableName(tablePrefix, TableId.from(metadataNamespace, metadataTableName)));

        long expirySeconds = cConf.getLong("messaging.coprocessor.metadata.cache.expiration.seconds");
        topicCache = CacheBuilder.newBuilder()
            .expireAfterWrite(expirySeconds, TimeUnit.SECONDS)
            .maximumSize(1000L)
            .build(new CacheLoader<ByteBuffer, Map<String, String>>() {
                /**
                 * Reads the topic's JSON properties from the metadata table and converts the "ttl"
                 * entry from seconds to milliseconds before the map is cached.
                 */
                @Override
                public Map<String, String> load(ByteBuffer topicKey) throws Exception {
                    // Reuse the backing array when the buffer spans it completely; otherwise copy.
                    byte[] backing = topicKey.array();
                    byte[] rowKey = backing.length == topicKey.remaining() ? backing : Bytes.toBytes(topicKey);
                    byte[] rawProperties = metadataTable.get(new Get(rowKey)).getValue(COL_FAMILY, COL);
                    Map<String, String> properties = GSON.fromJson(Bytes.toString(rawProperties), MAP_TYPE);
                    long ttlMillis = TimeUnit.SECONDS.toMillis(Long.parseLong(properties.get("ttl")));
                    properties.put("ttl", Long.toString(ttlMillis));
                    return properties;
                }
            });

        txStateCache = getTransactionStateCacheSupplier(tablePrefix, coprocessorEnvironment.getConfiguration()).get();
    }

    /**
     * Replaces the flush scanner with a {@link StoreScanner} that applies {@code MessageDataFilter},
     * using the current wall-clock time and the latest transaction snapshot, wrapped so that the
     * snapshot used is logged when the scanner is closed.
     *
     * @return the logging scanner wrapping the filtered store scanner
     */
    @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, KeyValueScanner keyValueScanner, InternalScanner internalScanner) throws IOException {
        LOG.info("preFlush, filter using MessageDataFilter");
        TransactionVisibilityState txSnapshot = txStateCache.getLatestState();

        Scan flushScan = new Scan();
        Filter dataFilter = new MessageDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), prefixLength, topicCache, txSnapshot);
        flushScan.setFilter(dataFilter);

        StoreScanner filteredScanner = new StoreScanner(
            store,
            store.getScanInfo(),
            flushScan,
            Collections.singletonList(keyValueScanner),
            ScanType.COMPACT_DROP_DELETES,
            store.getSmallestReadPoint(),
            Long.MIN_VALUE);
        return new LoggingInternalScanner("MessageDataFilter", "preFlush", filteredScanner, txSnapshot);
    }

    /**
     * Replaces the compaction scanner with a {@link StoreScanner} over the supplied scanners that applies
     * {@code MessageDataFilter}, using the current wall-clock time and the latest transaction snapshot,
     * wrapped so that the snapshot used is logged when the scanner is closed.
     *
     * @return the logging scanner wrapping the filtered store scanner
     */
    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long j, InternalScanner internalScanner, CompactionRequest compactionRequest) throws IOException {
        LOG.info("preCompact, filter using MessageDataFilter");
        TransactionVisibilityState txSnapshot = txStateCache.getLatestState();

        Scan compactionScan = new Scan();
        Filter dataFilter = new MessageDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), prefixLength, topicCache, txSnapshot);
        compactionScan.setFilter(dataFilter);

        StoreScanner filteredScanner = new StoreScanner(
            store,
            store.getScanInfo(),
            compactionScan,
            list,
            scanType,
            store.getSmallestReadPoint(),
            j);
        return new LoggingInternalScanner("MessageDataFilter", "preCompact", filteredScanner, txSnapshot);
    }

    /**
     * Builds the supplier of the transaction state cache.
     *
     * @param str           table prefix from which the system config table prefix is derived
     * @param configuration Hadoop configuration passed through to the supplier
     * @return a {@link DefaultTransactionStateCacheSupplier} for the derived prefix
     */
    private Supplier<TransactionStateCache> getTransactionStateCacheSupplier(String str, Configuration configuration) {
        HTable10CDHNameConverter nameConverter = new HTable10CDHNameConverter();
        String sysConfigTablePrefix = nameConverter.getSysConfigTablePrefix(str);
        return new DefaultTransactionStateCacheSupplier(sysConfigTablePrefix, configuration);
    }
}
