package co.cask.cdap.data2.transaction.queue.coprocessor.hbase10;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.transaction.coprocessor.DefaultTransactionStateCacheSupplier;
import co.cask.cdap.data2.transaction.queue.ConsumerEntryState;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerConfigCache;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerConfigCacheSupplier;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerInstance;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.QueueConsumerConfig;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.TableNameAwareCacheSupplier;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.CConfigurationReader;
import co.cask.cdap.data2.util.hbase.CoprocessorCConfigurationReader;
import co.cask.cdap.data2.util.hbase.HTableNameConverter;
import com.google.common.base.Supplier;
import com.google.common.io.InputSupplier;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.txprune.CompactionState;
import org.apache.tephra.persist.TransactionVisibilityState;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/coprocessor/hbase10/HBaseQueueRegionObserver.class */
public final class HBaseQueueRegionObserver extends BaseRegionObserver {
    // Logger shared by the observer and its nested EvictionInternalScanner.
    private static final Log LOG = LogFactory.getLog(HBaseQueueRegionObserver.class);
    // Table handed to the consumer-config cache supplier; computed in start() from the
    // "dataset.table.prefix" table property and the region's namespace.
    private TableName configTableName;
    // Reader passed to the consumer-config cache supplier; created in start().
    private CConfigurationReader cConfReader;
    // Supplies the latest transaction state held by txStateCache.
    private Supplier<TransactionVisibilityState> txSnapshotSupplier;
    // Supplier that txStateCache is obtained from in start() and that is released in stop().
    private CacheSupplier<TransactionStateCache> txStateCacheSupplier;
    // Supplier that configCache is obtained from in start() and that is released in stop().
    private ConsumerConfigCacheSupplier configCacheSupplier;
    // Non-null only after initializePruneState() found "data.tx.prune.enable" set to true;
    // stopped and cleared again by resetPruneState().
    private CompactionState compactionState;
    // Cache queried for the latest transaction state in preCompact().
    private TransactionStateCache txStateCache;
    // Cache queried for per-queue consumer configuration and for the CConfiguration.
    private ConsumerConfigCache configCache;
    // Last observed value of "data.tx.prune.enable"; stays null until a CConfiguration
    // could be read by initializePruneState().
    private Boolean pruneEnable;
    // Number of leading row-key bytes skipped before the queue-specific part of the key;
    // read from the "cdap.prefix.bytes" table property in start(), defaulting to 1.
    private int prefixBytes;
    // Namespace of the table this coprocessor is attached to; set in start().
    private String namespaceId;
    // Application name derived from the table name in start().
    private String appName;
    // Flow name derived from the table name in start().
    private String flowName;

    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/coprocessor/hbase10/HBaseQueueRegionObserver$EvictionInternalScanner.class */
    private final class EvictionInternalScanner implements InternalScanner {
        private final String triggeringAction;
        private final RegionCoprocessorEnvironment env;
        private final InternalScanner scanner;
        private final TransactionVisibilityState state;
        private final ConsumerInstance consumerInstance;
        private byte[] currentQueue;
        private byte[] currentQueueRowPrefix;
        private QueueConsumerConfig consumerConfig;
        private long totalRows;
        private long rowsEvicted;
        private long skippedIncomplete;
        private boolean invalidTxData;

        private EvictionInternalScanner(String str, RegionCoprocessorEnvironment regionCoprocessorEnvironment, InternalScanner internalScanner, @Nullable TransactionVisibilityState transactionVisibilityState) {
            this.totalRows = 0L;
            this.rowsEvicted = 0L;
            this.skippedIncomplete = 0L;
            this.triggeringAction = str;
            this.env = regionCoprocessorEnvironment;
            this.scanner = internalScanner;
            this.state = transactionVisibilityState;
            this.consumerInstance = new ConsumerInstance(0L, 0);
        }

        public boolean next(List<Cell> list) throws IOException {
            return next(list, -1);
        }

        public boolean next(List<Cell> list, int i) throws IOException {
            boolean z;
            boolean next = this.scanner.next(list, i);
            while (true) {
                z = next;
                if (list.isEmpty()) {
                    break;
                }
                this.totalRows++;
                Cell cell = list.get(0);
                if (this.currentQueue == null || !QueueEntryRow.isQueueEntry(this.currentQueueRowPrefix, HBaseQueueRegionObserver.this.prefixBytes, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
                    this.currentQueue = null;
                }
                if (this.currentQueue == null) {
                    QueueName queueName = QueueEntryRow.getQueueName(HBaseQueueRegionObserver.this.namespaceId, HBaseQueueRegionObserver.this.appName, HBaseQueueRegionObserver.this.flowName, HBaseQueueRegionObserver.this.prefixBytes, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    this.currentQueue = queueName.toBytes();
                    this.currentQueueRowPrefix = QueueEntryRow.getQueueRowPrefix(queueName);
                    this.consumerConfig = HBaseQueueRegionObserver.this.configCache.getConsumerConfig(this.currentQueue);
                }
                this.invalidTxData = false;
                if (this.state != null) {
                    long writePointer = QueueEntryRow.getWritePointer(cell.getRowArray(), cell.getRowOffset() + HBaseQueueRegionObserver.this.prefixBytes + this.currentQueueRowPrefix.length);
                    if (writePointer > 0 && this.state.getInvalid().contains(Long.valueOf(writePointer))) {
                        this.invalidTxData = true;
                    }
                }
                if (this.consumerConfig == null && !this.invalidTxData) {
                    return z;
                }
                if (!this.invalidTxData && !canEvict(this.consumerConfig, list)) {
                    break;
                }
                this.rowsEvicted++;
                list.clear();
                next = this.scanner.next(list, i);
            }
            return z;
        }

        public void close() throws IOException {
            HBaseQueueRegionObserver.LOG.info("Region " + this.env.getRegionInfo().getRegionNameAsString() + " " + this.triggeringAction + ", rows evicted: " + this.rowsEvicted + " / " + this.totalRows + ", skipped incomplete: " + this.skippedIncomplete);
            this.scanner.close();
        }

        private boolean canEvict(QueueConsumerConfig queueConsumerConfig, List<Cell> list) {
            if (queueConsumerConfig.getNumGroups() == 0) {
                return true;
            }
            if (queueConsumerConfig.getNumGroups() < 0) {
                return false;
            }
            if (list.size() <= 2) {
                this.skippedIncomplete++;
                return false;
            }
            Iterator<Cell> it = list.iterator();
            Cell next = it.next();
            if (!QueueEntryRow.isDataColumn(next.getQualifierArray(), next.getQualifierOffset())) {
                this.skippedIncomplete++;
                return false;
            }
            Cell next2 = it.next();
            if (!QueueEntryRow.isMetaColumn(next2.getQualifierArray(), next2.getQualifierOffset())) {
                this.skippedIncomplete++;
                return false;
            }
            int i = 0;
            while (it.hasNext()) {
                Cell next3 = it.next();
                if (QueueEntryRow.isStateColumn(next3.getQualifierArray(), next3.getQualifierOffset())) {
                    if (!isProcessed(next3, this.consumerInstance)) {
                        break;
                    }
                    byte[] startRow = queueConsumerConfig.getStartRow(this.consumerInstance);
                    if (startRow != null && compareRowKey(next3, startRow) < 0) {
                        i++;
                    }
                }
            }
            return i == queueConsumerConfig.getNumGroups() || compareRowKey(list.get(0), queueConsumerConfig.getSmallestStartRow()) < 0;
        }

        private int compareRowKey(Cell cell, byte[] bArr) {
            return Bytes.compareTo(cell.getRowArray(), cell.getRowOffset() + HBaseQueueRegionObserver.this.prefixBytes, cell.getRowLength() - HBaseQueueRegionObserver.this.prefixBytes, bArr, 0, bArr.length);
        }

        private boolean isProcessed(Cell cell, ConsumerInstance consumerInstance) {
            boolean z = cell.getValueArray()[(cell.getValueOffset() + cell.getValueLength()) - 1] == ConsumerEntryState.PROCESSED.getState();
            if (z) {
                consumerInstance.setGroupInstance(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset() + 1), Bytes.toInt(cell.getValueArray(), cell.getValueOffset() + 8));
            }
            return z;
        }
    }

    /**
     * Initialises the observer from the table descriptor of the hosting region. Does nothing
     * when the environment is not a {@link RegionCoprocessorEnvironment}.
     *
     * @param coprocessorEnvironment environment supplied by the coprocessor host
     */
    @Override
    public void start(CoprocessorEnvironment coprocessorEnvironment) {
        if (!(coprocessorEnvironment instanceof RegionCoprocessorEnvironment)) {
            return;
        }
        HTableDescriptor tableDesc = ((RegionCoprocessorEnvironment) coprocessorEnvironment).getRegion().getTableDesc();
        String nameAsString = tableDesc.getNameAsString();

        this.prefixBytes = parsePrefixBytes(tableDesc.getValue("cdap.prefix.bytes"));
        this.namespaceId = HTableNameConverter.from(tableDesc).getNamespace();
        this.appName = HBaseQueueAdmin.getApplicationName(nameAsString);
        this.flowName = HBaseQueueAdmin.getFlowName(nameAsString);

        String tablePrefix = tableDesc.getValue("dataset.table.prefix");
        this.txStateCacheSupplier = new DefaultTransactionStateCacheSupplier(tablePrefix, coprocessorEnvironment);
        this.txStateCache = this.txStateCacheSupplier.get();
        this.txSnapshotSupplier = new Supplier<TransactionVisibilityState>() {
            // Must be named get() to actually implement Supplier; reads the field on every call.
            @Override
            public TransactionVisibilityState get() {
                return HBaseQueueRegionObserver.this.txStateCache.getLatestState();
            }
        };

        // createConfigCache() reads configTableName, cConfReader and txSnapshotSupplier,
        // so all three have to be assigned before it is called.
        this.configTableName = HTableNameConverter.toTableName(tablePrefix, TableId.from(this.namespaceId, HBaseQueueAdmin.getConfigTableName()));
        this.cConfReader = new CoprocessorCConfigurationReader(coprocessorEnvironment, tablePrefix);
        this.configCacheSupplier = createConfigCache(coprocessorEnvironment);
        this.configCache = this.configCacheSupplier.get();
    }

    /**
     * Parses the value of the 'cdap.prefix.bytes' table property.
     *
     * @param value raw property value, or null when the property is absent
     * @return the parsed number, or 1 when the value is absent or not a valid integer
     */
    private static int parsePrefixBytes(@Nullable String value) {
        if (value == null) {
            return 1;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            LOG.error("Unable to parse value of 'cdap.prefix.bytes' property. Default to 1", e);
            return 1;
        }
    }

    /**
     * Stops the compaction state, if one was created, and releases both cache suppliers.
     * Each resource is checked for null because {@code start} leaves them unset when it is
     * given an environment that is not a region environment.
     *
     * @param coprocessorEnvironment environment supplied by the coprocessor host (unused)
     */
    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) {
        if (this.compactionState != null) {
            this.compactionState.stop();
        }
        if (this.configCacheSupplier != null) {
            this.configCacheSupplier.release();
        }
        if (this.txStateCacheSupplier != null) {
            this.txStateCacheSupplier.release();
        }
    }

    /**
     * Wraps the flush scanner in an {@link EvictionInternalScanner} without a transaction state.
     * The scanner is returned untouched while the region is not available.
     *
     * @return the scanner to use for the flush
     */
    @Override
    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, InternalScanner internalScanner) throws IOException {
        boolean regionAvailable = observerContext.getEnvironment().getRegion().isAvailable();
        if (regionAvailable) {
            LOG.info("preFlush, creates EvictionInternalScanner");
            RegionCoprocessorEnvironment regionEnv = observerContext.getEnvironment();
            return new EvictionInternalScanner("flush", regionEnv, internalScanner, null);
        }
        return internalScanner;
    }

    /**
     * Wraps the compaction scanner in an {@link EvictionInternalScanner} that uses the latest
     * transaction state. Before wrapping, the prune state is reloaded and, when a compaction
     * state exists, the request is recorded with it. The scanner is returned untouched while
     * the region is not available.
     *
     * @return the scanner to use for the compaction
     */
    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, InternalScanner internalScanner, ScanType scanType, CompactionRequest compactionRequest) throws IOException {
        boolean regionAvailable = observerContext.getEnvironment().getRegion().isAvailable();
        if (regionAvailable) {
            LOG.info("preCompact, creates EvictionInternalScanner");
            // Snapshot the transaction state first so the same state is recorded and used for eviction.
            TransactionVisibilityState txState = txStateCache.getLatestState();
            reloadPruneState(observerContext.getEnvironment());
            if (compactionState != null) {
                compactionState.record(compactionRequest, txState);
            }
            return new EvictionInternalScanner("compaction", observerContext.getEnvironment(), internalScanner, txState);
        }
        return internalScanner;
    }

    /** Persists the compaction state after a compaction; a no-op when no compaction state exists. */
    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, StoreFile storeFile, CompactionRequest compactionRequest) throws IOException {
        if (compactionState == null) {
            return;
        }
        compactionState.persist();
    }

    /**
     * After a flush, logs the region's memstore size and store file count. When both are zero
     * and a compaction state exists, the region is persisted as empty with the current time.
     */
    @Override
    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
        HRegion flushedRegion = observerContext.getEnvironment().getRegion();
        long storeFileCount = numStoreFilesForRegion(observerContext);
        long memstoreSize = flushedRegion.getMemstoreSize().get();

        String regionName = flushedRegion.getRegionInfo().getRegionNameAsString();
        LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                                regionName, Long.valueOf(memstoreSize), Long.valueOf(storeFileCount)));

        boolean regionEmpty = memstoreSize == 0 && storeFileCount == 0;
        if (regionEmpty && compactionState != null) {
            compactionState.persistRegionEmpty(System.currentTimeMillis());
        }
    }

    /**
     * Counts the store files across all stores of the region behind the given context.
     *
     * @param observerContext context whose environment provides the region
     * @return total number of store files in the region
     */
    private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
        long storeFileCount = 0;
        // Iterate each store exactly once; a single iteration over values() replaces the
        // previous loop, which requested a new iterator on every pass.
        for (Store store : observerContext.getEnvironment().getRegion().getStores().values()) {
            storeFileCount += store.getStorefiles().size();
        }
        return storeFileCount;
    }

    /**
     * Brings the prune state in line with the current "data.tx.prune.enable" setting. When the
     * setting has never been read, the prune state is initialised. Otherwise it is reset and
     * re-initialised only if a configuration is available and the setting differs from the
     * last observed value.
     */
    private void reloadPruneState(RegionCoprocessorEnvironment regionCoprocessorEnvironment) {
        if (pruneEnable == null) {
            // Nothing observed yet, so there is nothing to compare against.
            initializePruneState(regionCoprocessorEnvironment);
            return;
        }

        CConfiguration latestConf = configCache.getCConf();
        if (latestConf == null) {
            return;
        }
        boolean enabledNow = latestConf.getBoolean("data.tx.prune.enable", false);
        if (enabledNow == pruneEnable.booleanValue()) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            String regionName = regionCoprocessorEnvironment.getRegionInfo().getRegionNameAsString();
            LOG.debug(String.format("Transaction Invalid List pruning feature is set to %s now for region %s.",
                                    Boolean.valueOf(enabledNow), regionName));
        }
        resetPruneState();
        initializePruneState(regionCoprocessorEnvironment);
    }

    /**
     * Reads "data.tx.prune.enable" from the cached configuration and, when it is true, creates
     * the compaction state. The state is built from the "data.tx.prune.state.table" name and
     * the "data.tx.prune.flush.interval" value, which is converted from seconds to milliseconds.
     * Leaves everything untouched when no configuration is available.
     */
    private void initializePruneState(RegionCoprocessorEnvironment regionCoprocessorEnvironment) {
        CConfiguration conf = configCache.getCConf();
        if (conf == null) {
            return;
        }

        boolean enabled = conf.getBoolean("data.tx.prune.enable", false);
        this.pruneEnable = Boolean.valueOf(enabled);
        if (!enabled) {
            return;
        }

        String stateTableName = conf.get("data.tx.prune.state.table", "tephra.state");
        TableName stateTable = TableName.valueOf(stateTableName);
        long flushIntervalSeconds = conf.getLong("data.tx.prune.flush.interval", TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL);
        this.compactionState = new CompactionState(regionCoprocessorEnvironment, stateTable, TimeUnit.SECONDS.toMillis(flushIntervalSeconds));

        if (LOG.isDebugEnabled()) {
            TableName regionTable = regionCoprocessorEnvironment.getRegionInfo().getTable();
            LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will be recorded in table %s",
                                    regionTable.getNamespaceAsString(), regionTable.getNameAsString(), stateTableName));
        }
    }

    /** Marks pruning as disabled and, if a compaction state exists, stops and discards it. */
    private void resetPruneState() {
        this.pruneEnable = Boolean.FALSE;
        CompactionState active = this.compactionState;
        if (active == null) {
            return;
        }
        active.stop();
        this.compactionState = null;
    }

    /**
     * Asks the consumer-config cache to update itself; does nothing when the cache is unset.
     *
     * @throws IOException if the cache update fails
     */
    private void updateCache() throws IOException {
        ConsumerConfigCache cache = this.configCache;
        if (cache == null) {
            return;
        }
        cache.updateCache();
    }

    /**
     * Builds the supplier of the consumer-config cache for {@code configTableName}. The fields
     * {@code configTableName}, {@code cConfReader} and {@code txSnapshotSupplier} are read here,
     * so they must be assigned before this method is called.
     *
     * @param coprocessorEnvironment environment used to open the config table on demand
     * @return the cache supplier returned by {@link TableNameAwareCacheSupplier#getSupplier}
     */
    private ConsumerConfigCacheSupplier createConfigCache(final CoprocessorEnvironment coprocessorEnvironment) {
        InputSupplier<HTableInterface> configTableSupplier = new InputSupplier<HTableInterface>() {
            // Must be named getInput() to actually implement InputSupplier.
            @Override
            public HTableInterface getInput() throws IOException {
                return coprocessorEnvironment.getTable(HBaseQueueRegionObserver.this.configTableName);
            }
        };
        return TableNameAwareCacheSupplier.getSupplier(this.configTableName, this.cConfReader, this.txSnapshotSupplier, configTableSupplier);
    }

    /**
     * Exposes the transaction state cache held by this observer.
     *
     * @return the current value of the {@code txStateCache} field, which may be null
     */
    private TransactionStateCache getTxStateCache() {
        return txStateCache;
    }
}
