package co.cask.cdap.data2.transaction.messaging.coprocessor.hbase12cdh570;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.data2.transaction.coprocessor.DefaultTransactionStateCacheSupplier;
import co.cask.cdap.data2.util.hbase.CoprocessorCConfigurationReader;
import co.cask.cdap.data2.util.hbase.HBase12CDH570ScanBuilder;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.TopicMetadataCache;
import co.cask.cdap.messaging.TopicMetadataCacheSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
import org.apache.tephra.persist.TransactionVisibilityState;

/* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/MessageTableRegionObserver.class */
public class MessageTableRegionObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(MessageTableRegionObserver.class);
    private int prefixLength;
    private TransactionStateCacheSupplier txStateCacheSupplier;
    private TransactionStateCache txStateCache;
    private TopicMetadataCache topicMetadataCache;
    private TopicMetadataCacheSupplier topicMetadataCacheSupplier;
    private CompactionState compactionState;
    private Boolean pruneEnable;

    /* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/MessageTableRegionObserver$LoggingInternalScanner.class */
    private static final class LoggingInternalScanner implements InternalScanner {
        private static final Log LOG = LogFactory.getLog(LoggingInternalScanner.class);
        private final String filterName;
        private final String operation;
        private final InternalScanner delegate;
        private final TransactionVisibilityState state;

        LoggingInternalScanner(String str, String str2, InternalScanner internalScanner, @Nullable TransactionVisibilityState transactionVisibilityState) {
            this.filterName = str;
            this.operation = str2;
            this.delegate = internalScanner;
            this.state = transactionVisibilityState;
        }

        public boolean next(List<Cell> list) throws IOException {
            return this.delegate.next(list);
        }

        public boolean next(List<Cell> list, ScannerContext scannerContext) throws IOException {
            return this.delegate.next(list, scannerContext);
        }

        public void close() throws IOException {
            if (this.state != null) {
                LOG.info("During " + this.operation + ", Filter " + this.filterName + " completed using tx.snapshot of timestamp " + this.state.getTimestamp() + " which had the visibility bound of " + this.state.getVisibilityUpperBound());
            }
            this.delegate.close();
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/MessageTableRegionObserver$MessageDataFilter.class */
    private static final class MessageDataFilter extends FilterBase {
        private final RegionCoprocessorEnvironment env;
        private final long timestamp;
        private final TransactionVisibilityState state;
        private final int prefixLength;
        private final TopicMetadataCache metadataCache;
        private byte[] prevTopicIdBytes;
        private Long currentTTL;
        private Integer currentGen;

        MessageDataFilter(RegionCoprocessorEnvironment regionCoprocessorEnvironment, long j, int i, TopicMetadataCache topicMetadataCache, @Nullable TransactionVisibilityState transactionVisibilityState) {
            this.env = regionCoprocessorEnvironment;
            this.timestamp = j;
            this.prefixLength = i;
            this.metadataCache = topicMetadataCache;
            this.state = transactionVisibilityState;
        }

        public Filter.ReturnCode filterKeyValue(Cell cell) throws IOException {
            int rowOffset = cell.getRowOffset() + this.prefixLength;
            int rowLength = cell.getRowLength() - this.prefixLength;
            long publishTimestamp = MessagingUtils.getPublishTimestamp(cell.getRowArray(), rowOffset, rowLength);
            int topicLengthMessageEntry = MessagingUtils.getTopicLengthMessageEntry(rowLength) - 4;
            int i = Bytes.toInt(cell.getRowArray(), rowOffset + topicLengthMessageEntry);
            if (this.prevTopicIdBytes == null || this.currentTTL == null || this.currentGen == null || !Bytes.equals(this.prevTopicIdBytes, 0, this.prevTopicIdBytes.length, cell.getRowArray(), rowOffset, topicLengthMessageEntry)) {
                this.prevTopicIdBytes = Arrays.copyOfRange(cell.getRowArray(), rowOffset, rowOffset + topicLengthMessageEntry);
                Map<String, String> topicMetadata = this.metadataCache.getTopicMetadata(ByteBuffer.wrap(this.prevTopicIdBytes));
                if (topicMetadata == null) {
                    MessageTableRegionObserver.LOG.debug("Region " + this.env.getRegionInfo().getRegionNameAsString() + ", could not get properties of topicId " + MessagingUtils.toTopicId(this.prevTopicIdBytes));
                    return Filter.ReturnCode.INCLUDE;
                }
                this.currentTTL = Long.valueOf(Long.parseLong(topicMetadata.get(MessagingUtils.Constants.TTL_KEY)));
                this.currentGen = Integer.valueOf(Integer.parseInt(topicMetadata.get(MessagingUtils.Constants.GENERATION_KEY)));
            }
            return MessagingUtils.isOlderGeneration(i, this.currentGen.intValue()) ? Filter.ReturnCode.SKIP : (i != this.currentGen.intValue() || this.timestamp - publishTimestamp <= this.currentTTL.longValue()) ? Filter.ReturnCode.INCLUDE : Filter.ReturnCode.SKIP;
        }

        public Cell transformCell(Cell cell) throws IOException {
            byte[] bArr = MessagingUtils.Constants.TX_COL;
            if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), bArr, 0, bArr.length)) {
                long j = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
                if (j > 0 && this.state != null && this.state.getInvalid().contains(Long.valueOf(j))) {
                    Bytes.putLong(cell.getRowArray(), cell.getValueOffset(), (-1) * j);
                    return cell;
                }
            }
            return super.transformCell(cell);
        }
    }

    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
            RegionCoprocessorEnvironment regionCoprocessorEnvironment = (RegionCoprocessorEnvironment) coprocessorEnvironment;
            HTableDescriptor tableDesc = regionCoprocessorEnvironment.getRegion().getTableDesc();
            String value = tableDesc.getValue(Constants.MessagingSystem.HBASE_METADATA_TABLE_NAMESPACE);
            this.prefixLength = Integer.valueOf(tableDesc.getValue(Constants.MessagingSystem.HBASE_MESSAGING_TABLE_PREFIX_NUM_BYTES)).intValue();
            String value2 = tableDesc.getValue(Constants.Dataset.TABLE_PREFIX);
            CoprocessorCConfigurationReader coprocessorCConfigurationReader = new CoprocessorCConfigurationReader(regionCoprocessorEnvironment, value2);
            this.txStateCacheSupplier = new DefaultTransactionStateCacheSupplier(value2, regionCoprocessorEnvironment);
            this.txStateCache = this.txStateCacheSupplier.m232get();
            this.topicMetadataCacheSupplier = new TopicMetadataCacheSupplier(regionCoprocessorEnvironment, coprocessorCConfigurationReader, value2, value, new HBase12CDH570ScanBuilder());
            this.topicMetadataCache = this.topicMetadataCacheSupplier.m97get();
        }
    }

    public void stop(CoprocessorEnvironment coprocessorEnvironment) {
        if (this.compactionState != null) {
            this.compactionState.stop();
        }
        this.topicMetadataCacheSupplier.release();
        this.txStateCacheSupplier.release();
    }

    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, KeyValueScanner keyValueScanner, InternalScanner internalScanner) throws IOException {
        LOG.info("preFlush, filter using MessageDataFilter");
        TransactionVisibilityState latestState = this.txStateCache.getLatestState();
        Scan scan = new Scan();
        scan.setFilter(new MessageDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), this.prefixLength, this.topicMetadataCache, latestState));
        return new LoggingInternalScanner("MessageDataFilter", "preFlush", new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(keyValueScanner), ScanType.COMPACT_DROP_DELETES, store.getSmallestReadPoint(), Long.MIN_VALUE), latestState);
    }

    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
        Region region = observerContext.getEnvironment().getRegion();
        long numStoreFilesForRegion = numStoreFilesForRegion(observerContext);
        long memstoreSize = region.getMemstoreSize();
        LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), Long.valueOf(memstoreSize), Long.valueOf(numStoreFilesForRegion)));
        if (memstoreSize == 0 && numStoreFilesForRegion == 0 && this.compactionState != null) {
            this.compactionState.persistRegionEmpty(System.currentTimeMillis());
        }
    }

    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long j, InternalScanner internalScanner, CompactionRequest compactionRequest) throws IOException {
        LOG.info("preCompact, filter using MessageDataFilter");
        TransactionVisibilityState latestState = this.txStateCache.getLatestState();
        reloadPruneState((RegionCoprocessorEnvironment) observerContext.getEnvironment());
        if (this.compactionState != null) {
            this.compactionState.record(compactionRequest, latestState);
        }
        Scan scan = new Scan();
        scan.setFilter(new MessageDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), this.prefixLength, this.topicMetadataCache, latestState));
        return new LoggingInternalScanner("MessageDataFilter", "preCompact", new StoreScanner(store, store.getScanInfo(), scan, list, scanType, store.getSmallestReadPoint(), j), latestState);
    }

    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, StoreFile storeFile, CompactionRequest compactionRequest) throws IOException {
        if (this.compactionState != null) {
            this.compactionState.persist();
        }
    }

    private void reloadPruneState(RegionCoprocessorEnvironment regionCoprocessorEnvironment) {
        boolean z;
        if (this.pruneEnable == null) {
            initializePruneState(regionCoprocessorEnvironment);
            return;
        }
        CConfiguration cConfiguration = this.topicMetadataCache.getCConfiguration();
        if (cConfiguration == null || (z = cConfiguration.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, false)) == this.pruneEnable.booleanValue()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Transaction Invalid List pruning feature is set to %s now for region %s.", Boolean.valueOf(z), regionCoprocessorEnvironment.getRegionInfo().getRegionNameAsString()));
        }
        resetPruneState();
        initializePruneState(regionCoprocessorEnvironment);
    }

    private void initializePruneState(RegionCoprocessorEnvironment regionCoprocessorEnvironment) {
        CConfiguration cConfiguration = this.topicMetadataCache.getCConfiguration();
        if (cConfiguration != null) {
            this.pruneEnable = Boolean.valueOf(cConfiguration.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, false));
            if (Boolean.TRUE.equals(this.pruneEnable)) {
                String str = cConfiguration.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
                this.compactionState = new CompactionState(regionCoprocessorEnvironment, TableName.valueOf(str), TimeUnit.SECONDS.toMillis(cConfiguration.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)));
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state will be recorded in table %s", regionCoprocessorEnvironment.getRegionInfo().getTable().getNameWithNamespaceInclAsString(), str));
                }
            }
        }
    }

    private void resetPruneState() {
        this.pruneEnable = false;
        if (this.compactionState != null) {
            this.compactionState.stop();
            this.compactionState = null;
        }
    }

    private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
        long j = 0;
        while (observerContext.getEnvironment().getRegion().getStores().iterator().hasNext()) {
            j += ((Store) r0.next()).getStorefiles().size();
        }
        return j;
    }
}
