package co.cask.cdap.data2.transaction.messaging.coprocessor.hbase12cdh570;

import co.cask.cdap.data2.util.hbase.CoprocessorCConfigurationReader;
import co.cask.cdap.data2.util.hbase.HBase12CDH570ScanBuilder;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.TopicMetadataCache;
import co.cask.cdap.messaging.TopicMetadataCacheSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;

/* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/PayloadTableRegionObserver.class */
/**
 * Region observer that installs a {@link PayloadDataFilter} on the scanners used for
 * flushes and compactions, so that payload cells belonging to an older topic generation,
 * or whose age exceeds the topic's "ttl" property, are skipped while store files are written.
 *
 * <p>Topic properties are looked up through a {@link TopicMetadataCache} obtained in
 * {@link #start(CoprocessorEnvironment)} and released in {@link #stop(CoprocessorEnvironment)}.
 */
public class PayloadTableRegionObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(PayloadTableRegionObserver.class);

    /** Number of leading row-key bytes to skip; read from the table descriptor in start(). */
    private int prefixLength;
    private TopicMetadataCacheSupplier topicMetadataCacheSupplier;
    private TopicMetadataCache topicMetadataCache;

    /**
     * Filter deciding, cell by cell, whether a payload entry is kept.
     *
     * <p>The TTL and generation of the most recently seen topic are cached so that
     * consecutive cells of the same topic do not trigger a metadata lookup each time.
     */
    private static final class PayloadDataFilter extends FilterBase {
        private final RegionCoprocessorEnvironment env;
        /** Reference "now" (from System.currentTimeMillis()) against which cell age is measured. */
        private final long timestamp;
        private final int prefixLength;
        private final TopicMetadataCache metadataCache;
        /** Topic id bytes of the last cell for which a metadata lookup was attempted. */
        private byte[] prevTopicIdBytes;
        /** Cached "ttl" of the topic in prevTopicIdBytes; null when its metadata is unknown. */
        private Long currentTTL;
        /** Cached "generation" of the topic in prevTopicIdBytes; null when its metadata is unknown. */
        private Integer currentGen;

        PayloadDataFilter(RegionCoprocessorEnvironment regionCoprocessorEnvironment, long j, int i, TopicMetadataCache topicMetadataCache) {
            this.env = regionCoprocessorEnvironment;
            this.timestamp = j;
            this.prefixLength = i;
            this.metadataCache = topicMetadataCache;
        }

        /**
         * Decides whether the given cell is kept.
         *
         * @param cell the cell being scanned; its row key is parsed after skipping the table prefix
         * @return {@code SKIP} when the cell's generation is older than the topic's current
         *         generation, or when it is of the current generation and older than the TTL;
         *         {@code INCLUDE} otherwise, including when the topic metadata is unavailable
         * @throws IOException if the metadata lookup fails
         */
        @Override
        public Filter.ReturnCode filterKeyValue(Cell cell) throws IOException {
            byte[] rowArray = cell.getRowArray();
            int rowOffset = cell.getRowOffset() + this.prefixLength;
            int rowLength = cell.getRowLength() - this.prefixLength;
            long writeTimestamp = MessagingUtils.getWriteTimestamp(rowArray, rowOffset, rowLength);
            // The generation id is the int stored right after the topic id bytes.
            int topicIdLength = MessagingUtils.getTopicLengthPayloadEntry(rowLength) - Bytes.SIZEOF_INT;
            int generation = Bytes.toInt(rowArray, rowOffset + topicIdLength);

            boolean cacheUsable = this.prevTopicIdBytes != null
                && this.currentTTL != null
                && this.currentGen != null
                && Bytes.equals(this.prevTopicIdBytes, 0, this.prevTopicIdBytes.length, rowArray, rowOffset, topicIdLength);

            if (!cacheUsable) {
                this.prevTopicIdBytes = Arrays.copyOfRange(rowArray, rowOffset, rowOffset + topicIdLength);
                Map<?, ?> topicMetadata = this.metadataCache.getTopicMetadata(ByteBuffer.wrap(this.prevTopicIdBytes));
                if (topicMetadata == null) {
                    // Drop the values cached for the previous topic. Otherwise the next cell of this
                    // topic would match prevTopicIdBytes and be judged with another topic's
                    // TTL/generation, potentially discarding live data.
                    this.currentTTL = null;
                    this.currentGen = null;
                    if (PayloadTableRegionObserver.LOG.isDebugEnabled()) {
                        PayloadTableRegionObserver.LOG.debug("Region " + this.env.getRegionInfo().getRegionNameAsString() + ", could not get properties of topicId " + MessagingUtils.toTopicId(this.prevTopicIdBytes));
                    }
                    // Without metadata nothing can be proven expired, so keep the cell.
                    return Filter.ReturnCode.INCLUDE;
                }
                this.currentTTL = Long.parseLong((String) topicMetadata.get("ttl"));
                this.currentGen = Integer.parseInt((String) topicMetadata.get("generation"));
            }

            int topicGeneration = this.currentGen;
            if (MessagingUtils.isOlderGeneration(generation, topicGeneration)) {
                return Filter.ReturnCode.SKIP;
            }
            // NOTE(review): this.timestamp is in milliseconds; confirm the "ttl" property and the
            // write timestamp are expressed in the same unit.
            if (generation == topicGeneration && this.timestamp - writeTimestamp > this.currentTTL) {
                return Filter.ReturnCode.SKIP;
            }
            return Filter.ReturnCode.INCLUDE;
        }
    }

    /**
     * Reads the messaging settings from the table descriptor and obtains the topic metadata cache.
     * Does nothing when the environment is not a {@link RegionCoprocessorEnvironment}.
     *
     * @throws NumberFormatException if the "cdap.messaging.table.prefix.num.bytes" table value
     *         is missing or not an integer
     */
    @Override
    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (!(coprocessorEnvironment instanceof RegionCoprocessorEnvironment)) {
            return;
        }
        RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) coprocessorEnvironment;
        HTableDescriptor tableDesc = regionEnv.getRegion().getTableDesc();
        String metadataNamespace = tableDesc.getValue("cdap.messaging.metadata.hbase.namespace");
        String tablePrefix = tableDesc.getValue("dataset.table.prefix");
        this.prefixLength = Integer.parseInt(tableDesc.getValue("cdap.messaging.table.prefix.num.bytes"));
        this.topicMetadataCacheSupplier = new TopicMetadataCacheSupplier(
            regionEnv,
            new CoprocessorCConfigurationReader(regionEnv, tablePrefix),
            tablePrefix,
            metadataNamespace,
            new HBase12CDH570ScanBuilder());
        this.topicMetadataCache = this.topicMetadataCacheSupplier.get();
    }

    /**
     * Releases the topic metadata cache supplier acquired in {@link #start(CoprocessorEnvironment)}.
     */
    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        // start() leaves the supplier unset for non-region environments; nothing to release then.
        if (this.topicMetadataCacheSupplier != null) {
            this.topicMetadataCacheSupplier.release();
        }
    }

    /**
     * Replaces the flush scanner with a {@link StoreScanner} that applies {@link PayloadDataFilter}.
     */
    @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, KeyValueScanner keyValueScanner, InternalScanner internalScanner) throws IOException {
        LOG.info("preFlush, filter using PayloadDataFilter");
        Scan scan = newFilteredScan(observerContext.getEnvironment());
        return new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(keyValueScanner), ScanType.COMPACT_DROP_DELETES, store.getSmallestReadPoint(), Long.MIN_VALUE);
    }

    /**
     * Replaces the compaction scanner with a {@link StoreScanner} that applies
     * {@link PayloadDataFilter}, keeping the scan type and the timestamp argument {@code j}
     * supplied by HBase.
     */
    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long j, InternalScanner internalScanner, CompactionRequest compactionRequest) throws IOException {
        LOG.info("preCompact, filter using PayloadDataFilter");
        Scan scan = newFilteredScan(observerContext.getEnvironment());
        return new StoreScanner(store, store.getScanInfo(), scan, list, scanType, store.getSmallestReadPoint(), j);
    }

    /** Builds a Scan carrying a fresh PayloadDataFilter that uses the current time as reference. */
    private Scan newFilteredScan(RegionCoprocessorEnvironment env) {
        Scan scan = new Scan();
        scan.setFilter(new PayloadDataFilter(env, System.currentTimeMillis(), this.prefixLength, this.topicMetadataCache));
        return scan;
    }
}
