package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.collect.AllCollector;
import co.cask.cdap.common.collect.AllPairCollector;
import co.cask.cdap.common.collect.Collector;
import co.cask.cdap.common.collect.PairCollector;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.transaction.queue.QueueConfigurer;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.tephra.Transaction;
import org.apache.tephra.TxConstants;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseConsumerStateStore.class */
public final class HBaseConsumerStateStore extends AbstractDataset implements QueueConfigurer {
    private static final Gson GSON = new Gson();
    private final QueueName queueName;
    private final Table table;
    private final byte[] barrierScanStartRow;
    private final byte[] barrierScanEndRow;
    private Transaction transaction;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseConsumerStateStore$BarrierInfoCollector.class */
    public static final class BarrierInfoCollector implements Collector<QueueBarrier> {
        private final byte[] startRow;
        private final Deque<QueueBarrier> infos;

        private BarrierInfoCollector(byte[] bArr) {
            this.infos = Lists.newLinkedList();
            this.startRow = bArr;
        }

        @Override // co.cask.cdap.common.collect.Collector
        public boolean addElement(QueueBarrier queueBarrier) {
            if (Bytes.compareTo(this.startRow, queueBarrier.getStartRow()) < 0) {
                this.infos.add(queueBarrier);
                return false;
            }
            this.infos.poll();
            this.infos.add(queueBarrier);
            return true;
        }

        @Override // co.cask.cdap.common.collect.Collector
        public <T extends Collection<? super QueueBarrier>> T finish(T t) {
            t.addAll(this.infos);
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseConsumerStateStore$ConsumerState.class */
    public static final class ConsumerState {
        private final byte[] consumerStartRow;
        private final QueueBarrier previousBarrier;
        private final QueueBarrier nextBarrier;

        private ConsumerState(byte[] bArr, @Nullable QueueBarrier queueBarrier, @Nullable QueueBarrier queueBarrier2) {
            this.consumerStartRow = bArr;
            this.previousBarrier = queueBarrier;
            this.nextBarrier = queueBarrier2;
        }

        public byte[] getConsumerStartRow() {
            return this.consumerStartRow;
        }

        @Nullable
        public QueueBarrier getNextBarrier() {
            return this.nextBarrier;
        }

        @Nullable
        public QueueBarrier getPreviousBarrier() {
            return this.previousBarrier;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HBaseConsumerStateStore(String str, QueueName queueName, Table table) {
        super(str, table, new Dataset[0]);
        this.queueName = queueName;
        this.table = table;
        this.barrierScanStartRow = Bytes.add(queueName.toBytes(), QueueEntryRow.getQueueEntryRowKey(queueName, 0L, 0));
        this.barrierScanEndRow = Bytes.stopKeyForPrefix(Bytes.add(queueName.toBytes(), QueueEntryRow.getQueueEntryRowKey(queueName, Transaction.NO_TX_IN_PROGRESS, 0)));
    }

    @Override // co.cask.cdap.api.dataset.lib.AbstractDataset, org.apache.tephra.TransactionAware
    public void startTx(Transaction transaction) {
        super.startTx(transaction);
        this.transaction = transaction;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Table getInternalTable() {
        return this.table;
    }

    HBaseConsumerState getState(long j, int i) {
        ConsumerState consumerState = getConsumerState(j, i);
        QueueBarrier previousBarrier = consumerState.getPreviousBarrier();
        QueueBarrier nextBarrier = consumerState.getNextBarrier();
        if (previousBarrier == null && nextBarrier == null) {
            throw new IllegalStateException(String.format("Unable to find barrier information for consumer. Queue: %s, GroupId: %d, InstanceId:%d", this.queueName, Long.valueOf(j), Integer.valueOf(i)));
        }
        return new HBaseConsumerState(new ConsumerConfig(previousBarrier != null ? previousBarrier.getGroupConfig() : nextBarrier.getGroupConfig(), i), consumerState.getConsumerStartRow(), previousBarrier == null ? null : previousBarrier.getStartRow(), nextBarrier == null ? null : nextBarrier.getStartRow());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isAllConsumed(long j, byte[] bArr) {
        Iterator<byte[]> it2 = fetchStartRows(j, TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT).values().iterator();
        while (it2.hasNext()) {
            if (Bytes.compareTo(it2.next(), bArr) < 0) {
                return false;
            }
        }
        return true;
    }

    boolean isAllConsumed(ConsumerConfig consumerConfig, byte[] bArr) {
        for (Map.Entry<Integer, byte[]> entry : fetchStartRows(consumerConfig.getGroupId(), TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT).entrySet()) {
            if (entry.getKey().intValue() % consumerConfig.getGroupSize() == consumerConfig.getInstanceId() && Bytes.compareTo(entry.getValue(), bArr) < 0) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateState(long j, int i, byte[] bArr) {
        this.table.put(this.queueName.toBytes(), getConsumerStateColumn(j, i), bArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completed(long j, int i) {
        QueueBarrier nextBarrier = getConsumerState(j, i).getNextBarrier();
        if (nextBarrier == null) {
            throw new IllegalArgumentException(String.format("No end barrier information for consumer. Queue: %s, GroupId: %d, InstanceId: %d", this.queueName, Long.valueOf(j), Integer.valueOf(i)));
        }
        byte[] consumerStateColumn = getConsumerStateColumn(j, i);
        if (i < nextBarrier.getGroupConfig().getGroupSize()) {
            this.table.put(this.queueName.toBytes(), consumerStateColumn, nextBarrier.getStartRow());
            return;
        }
        Scanner scan = this.table.scan(Bytes.add(this.queueName.toBytes(), nextBarrier.getStartRow()), this.barrierScanEndRow);
        Throwable th = null;
        try {
            boolean z = false;
            while (!z) {
                Row next = scan.next();
                if (next == null) {
                    break;
                }
                QueueBarrier decodeBarrierInfo = decodeBarrierInfo(next, j);
                if (decodeBarrierInfo != null && i < decodeBarrierInfo.getGroupConfig().getGroupSize()) {
                    this.table.put(this.queueName.toBytes(), consumerStateColumn, decodeBarrierInfo.getStartRow());
                    z = true;
                }
            }
            if (!z) {
                this.table.delete(this.queueName.toBytes(), consumerStateColumn);
            }
            if (scan != null) {
                if (0 == 0) {
                    scan.close();
                    return;
                }
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (0 != 0) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clear() {
        Scanner scan = this.table.scan(this.barrierScanStartRow, this.barrierScanEndRow);
        Throwable th = null;
        try {
            for (Row next = scan.next(); next != null; next = scan.next()) {
                this.table.delete(next.getRow());
            }
            this.table.delete(this.queueName.toBytes());
            if (scan != null) {
                if (0 == 0) {
                    scan.close();
                    return;
                }
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (0 != 0) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueConfigurer
    public void configureInstances(long j, int i) {
        List<QueueBarrier> list = (List) scanBarriers(j, new AllCollector()).finish(new ArrayList());
        Preconditions.checkState(!list.isEmpty(), "No queue configuration found for group %s", new Object[]{Long.valueOf(j)});
        ConsumerGroupConfig groupConfig = ((QueueBarrier) list.get(list.size() - 1)).getGroupConfig();
        ConsumerGroupConfig consumerGroupConfig = new ConsumerGroupConfig(j, i, groupConfig.getDequeueStrategy(), groupConfig.getHashKey());
        byte[] queueEntryRowKey = QueueEntryRow.getQueueEntryRowKey(this.queueName, this.transaction.getWritePointer(), 0);
        Put put = new Put(Bytes.add(this.queueName.toBytes(), queueEntryRowKey));
        put.add(Bytes.toBytes(consumerGroupConfig.getGroupId()), GSON.toJson(consumerGroupConfig));
        this.table.put(put);
        Map<Integer, byte[]> fetchStartRows = fetchStartRows(j, TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT);
        for (int i2 = 0; i2 < i; i2++) {
            if (!fetchStartRows.containsKey(Integer.valueOf(i2))) {
                this.table.put(this.queueName.toBytes(), getConsumerStateColumn(j, i2), queueEntryRowKey);
            }
        }
        LinkedList newLinkedList = Lists.newLinkedList();
        for (QueueBarrier queueBarrier : list) {
            boolean z = true;
            Iterator<byte[]> it2 = fetchStartRows.values().iterator();
            while (true) {
                if (it2.hasNext()) {
                    if (Bytes.compareTo(it2.next(), queueBarrier.getStartRow()) <= 0) {
                        z = false;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (!z) {
                break;
            } else {
                newLinkedList.add(Bytes.add(this.queueName.toBytes(), queueBarrier.getStartRow()));
            }
        }
        if (newLinkedList.size() > 1) {
            newLinkedList.removeLast();
            byte[] bytes = Bytes.toBytes(j);
            Iterator it3 = newLinkedList.iterator();
            while (it3.hasNext()) {
                this.table.delete((byte[]) it3.next(), bytes);
            }
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueConfigurer
    public void configureGroups(Iterable<? extends ConsumerGroupConfig> iterable) {
        com.google.common.collect.Table<Long, Integer, byte[]> fetchAllStartRows = fetchAllStartRows();
        byte[] queueEntryRowKey = QueueEntryRow.getQueueEntryRowKey(this.queueName, this.transaction.getWritePointer(), 0);
        Put put = new Put(Bytes.add(this.queueName.toBytes(), queueEntryRowKey));
        HashSet newHashSet = Sets.newHashSet();
        for (ConsumerGroupConfig consumerGroupConfig : iterable) {
            long groupId = consumerGroupConfig.getGroupId();
            if (!newHashSet.add(Long.valueOf(groupId))) {
                throw new IllegalArgumentException("Same consumer group is provided multiple times");
            }
            put.add(Bytes.toBytes(groupId), GSON.toJson(consumerGroupConfig));
            for (int i = 0; i < consumerGroupConfig.getGroupSize(); i++) {
                if (!fetchAllStartRows.contains(Long.valueOf(groupId), Integer.valueOf(i))) {
                    this.table.put(this.queueName.toBytes(), getConsumerStateColumn(groupId, i), queueEntryRowKey);
                }
            }
        }
        deleteRemovedGroups(this.table.get(this.queueName.toBytes()), newHashSet);
        LinkedHashMultimap create = LinkedHashMultimap.create();
        Scanner scan = this.table.scan(this.barrierScanStartRow, this.barrierScanEndRow);
        Throwable th = null;
        try {
            try {
                for (Row next = scan.next(); next != null; next = scan.next()) {
                    deleteRemovedGroups(next, newHashSet);
                    Iterator<Map.Entry<byte[], byte[]>> it2 = next.getColumns().entrySet().iterator();
                    while (it2.hasNext()) {
                        QueueBarrier decodeBarrierInfo = decodeBarrierInfo(next.getRow(), it2.next().getValue());
                        if (decodeBarrierInfo != null) {
                            long groupId2 = decodeBarrierInfo.getGroupConfig().getGroupId();
                            boolean z = true;
                            for (int i2 = 0; i2 < decodeBarrierInfo.getGroupConfig().getGroupSize(); i2++) {
                                byte[] bArr = (byte[]) fetchAllStartRows.get(Long.valueOf(groupId2), Integer.valueOf(i2));
                                if (bArr == null || Bytes.compareTo(bArr, decodeBarrierInfo.getStartRow()) < 0) {
                                    z = false;
                                    break;
                                }
                            }
                            if (z) {
                                create.put(Long.valueOf(groupId2), next.getRow());
                            }
                        }
                    }
                }
                if (scan != null) {
                    if (0 != 0) {
                        try {
                            scan.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        scan.close();
                    }
                }
                for (Map.Entry entry : create.asMap().entrySet()) {
                    if (((Collection) entry.getValue()).size() > 1) {
                        LinkedList newLinkedList = Lists.newLinkedList((Iterable) entry.getValue());
                        newLinkedList.removeLast();
                        byte[] bytes = Bytes.toBytes(((Long) entry.getKey()).longValue());
                        Iterator it3 = newLinkedList.iterator();
                        while (it3.hasNext()) {
                            this.table.delete((byte[]) it3.next(), bytes);
                        }
                    }
                }
                this.table.put(put);
            } finally {
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (th != null) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    private void deleteRemovedGroups(Row row, Set<Long> set) {
        for (byte[] bArr : row.getColumns().keySet()) {
            if (!set.contains(Long.valueOf(getGroupIdFromColumn(bArr)))) {
                this.table.delete(row.getRow(), bArr);
            }
        }
    }

    public Transaction getTransaction() {
        return this.transaction;
    }

    public List<QueueBarrier> getAllBarriers(long j) {
        return (List) scanBarriers(j, new AllCollector()).finish(new ArrayList());
    }

    public Multimap<Long, QueueBarrier> getAllBarriers() {
        return scanBarriers(new AllPairCollector()).finishMultimap(LinkedHashMultimap.create());
    }

    void getLatestConsumerGroups(Collection<? super ConsumerGroupConfig> collection) {
        Scanner scan = this.table.scan(this.barrierScanStartRow, this.barrierScanEndRow);
        Throwable th = null;
        try {
            Row row = null;
            for (Row next = scan.next(); next != null; next = scan.next()) {
                row = next;
            }
            if (row == null) {
                throw new IllegalStateException("No consumer group information. Queue: " + this.queueName);
            }
            Iterator<Map.Entry<byte[], byte[]>> it2 = row.getColumns().entrySet().iterator();
            while (it2.hasNext()) {
                collection.add(GSON.fromJson(new String(it2.next().getValue(), Charsets.UTF_8), ConsumerGroupConfig.class));
            }
            if (scan != null) {
                if (0 == 0) {
                    scan.close();
                    return;
                }
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (0 != 0) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    private PairCollector<Long, QueueBarrier> scanBarriers(PairCollector<Long, QueueBarrier> pairCollector) {
        Scanner scan = this.table.scan(this.barrierScanStartRow, this.barrierScanEndRow);
        Throwable th = null;
        while (true) {
            try {
                try {
                    Row next = scan.next();
                    if (next == null) {
                        if (scan != null) {
                            if (0 != 0) {
                                try {
                                    scan.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                scan.close();
                            }
                        }
                        return pairCollector;
                    }
                    Map<Long, QueueBarrier> decodeBarrierInfo = decodeBarrierInfo(next);
                    if (decodeBarrierInfo != null) {
                        Iterator<Map.Entry<Long, QueueBarrier>> it2 = decodeBarrierInfo.entrySet().iterator();
                        while (it2.hasNext()) {
                            if (!pairCollector.addElement(it2.next())) {
                                if (scan != null) {
                                    if (0 != 0) {
                                        try {
                                            scan.close();
                                        } catch (Throwable th3) {
                                            th.addSuppressed(th3);
                                        }
                                    } else {
                                        scan.close();
                                    }
                                }
                                return pairCollector;
                            }
                        }
                    }
                } finally {
                }
            } catch (Throwable th4) {
                if (scan != null) {
                    if (th != null) {
                        try {
                            scan.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        scan.close();
                    }
                }
                throw th4;
            }
        }
    }

    private Collector<QueueBarrier> scanBarriers(long j, Collector<QueueBarrier> collector) {
        QueueBarrier decodeBarrierInfo;
        Scanner scan = this.table.scan(this.barrierScanStartRow, this.barrierScanEndRow);
        Throwable th = null;
        while (true) {
            try {
                try {
                    Row next = scan.next();
                    if (next == null || ((decodeBarrierInfo = decodeBarrierInfo(next, j)) != null && !collector.addElement(decodeBarrierInfo))) {
                        break;
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (scan != null) {
                    if (th != null) {
                        try {
                            scan.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        scan.close();
                    }
                }
                throw th2;
            }
        }
        if (scan != null) {
            if (0 != 0) {
                try {
                    scan.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                scan.close();
            }
        }
        return collector;
    }

    private Map<Integer, byte[]> fetchStartRows(long j, int i) {
        return getStartRowsFromColumns(this.table.get(this.queueName.toBytes(), getConsumerStateColumn(j, 0), getConsumerStateColumn(j, i == Integer.MAX_VALUE ? TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT : i + 1), i).getColumns());
    }

    private com.google.common.collect.Table<Long, Integer, byte[]> fetchAllStartRows() {
        HashBasedTable create = HashBasedTable.create();
        for (Map.Entry<byte[], byte[]> entry : this.table.get(this.queueName.toBytes()).getColumns().entrySet()) {
            create.put(Long.valueOf(getGroupIdFromColumn(entry.getKey())), Integer.valueOf(getInstanceIdFromColumn(entry.getKey())), entry.getValue());
        }
        return create;
    }

    private Map<Integer, byte[]> getStartRowsFromColumns(Map<byte[], byte[]> map) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
            builder.put(Integer.valueOf(getInstanceIdFromColumn(entry.getKey())), entry.getValue());
        }
        return builder.build();
    }

    private ConsumerState getConsumerState(long j, int i) {
        byte[] bArr = this.table.get(this.queueName.toBytes(), getConsumerStateColumn(j, i));
        if (bArr == null) {
            throw new IllegalStateException(String.format("Unable to find consumer state. Queue: %s, GroupId: %d, InstanceId: %d", this.queueName, Long.valueOf(j), Integer.valueOf(i)));
        }
        Queue queue = (Queue) scanBarriers(j, new BarrierInfoCollector(bArr)).finish(new LinkedList());
        if (queue.isEmpty()) {
            throw new IllegalStateException(String.format("Unable to barrier info for consumer. Queue: %s, GroupId: %d, InstanceId: %d, StartRow: %s", this.queueName, Long.valueOf(j), Integer.valueOf(i), Bytes.toStringBinary(bArr)));
        }
        QueueBarrier queueBarrier = (QueueBarrier) queue.poll();
        return Bytes.compareTo(queueBarrier.getStartRow(), bArr) > 0 ? new ConsumerState(bArr, null, queueBarrier) : new ConsumerState(bArr, queueBarrier, (QueueBarrier) queue.poll());
    }

    private long getGroupIdFromColumn(byte[] bArr) {
        return Bytes.toLong(bArr);
    }

    private int getInstanceIdFromColumn(byte[] bArr) {
        return Bytes.toInt(bArr, 8);
    }

    private byte[] getConsumerStateColumn(long j, int i) {
        byte[] bArr = new byte[12];
        Bytes.putLong(bArr, 0, j);
        Bytes.putInt(bArr, 8, i);
        return bArr;
    }

    private Map<Long, QueueBarrier> decodeBarrierInfo(Row row) {
        HashMap newHashMap = Maps.newHashMap();
        Iterator<byte[]> it2 = row.getColumns().keySet().iterator();
        while (it2.hasNext()) {
            QueueBarrier decodeBarrierInfo = decodeBarrierInfo(row.getRow(), row.get(it2.next()));
            if (decodeBarrierInfo != null) {
                newHashMap.put(Long.valueOf(decodeBarrierInfo.getGroupConfig().getGroupId()), decodeBarrierInfo);
            }
        }
        return newHashMap;
    }

    @Nullable
    private QueueBarrier decodeBarrierInfo(Row row, long j) {
        return decodeBarrierInfo(row.getRow(), row.get(Bytes.toBytes(j)));
    }

    @Nullable
    private QueueBarrier decodeBarrierInfo(byte[] bArr, @Nullable byte[] bArr2) {
        if (bArr2 == null) {
            return null;
        }
        return new QueueBarrier((ConsumerGroupConfig) GSON.fromJson(new String(bArr2, Charsets.UTF_8), ConsumerGroupConfig.class), Arrays.copyOfRange(bArr, this.queueName.toBytes().length, bArr.length));
    }
}
