package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueProducer;
import co.cask.cdap.data2.transaction.ForwardingTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueClientFactory.class */
public class HBaseQueueClientFactory implements QueueClientFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseQueueClientFactory.class);
    private static final int DEFAULT_WRITE_BUFFER_SIZE = 4194304;
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final HBaseQueueAdmin queueAdmin;
    private final HBaseQueueUtil queueUtil = new HBaseQueueUtilFactory().get();
    private final HBaseTableUtil hBaseTableUtil;
    private final TransactionExecutorFactory txExecutorFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueClientFactory$2, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueClientFactory$2.class */
    public class AnonymousClass2 implements Callable<List<HBaseQueueConsumer>> {
        final /* synthetic */ HBaseQueueAdmin val$admin;
        final /* synthetic */ QueueName val$queueName;
        final /* synthetic */ long val$groupId;
        final /* synthetic */ ConsumerConfig val$consumerConfig;

        AnonymousClass2(HBaseQueueAdmin hBaseQueueAdmin, QueueName queueName, long j, ConsumerConfig consumerConfig) {
            this.val$admin = hBaseQueueAdmin;
            this.val$queueName = queueName;
            this.val$groupId = j;
            this.val$consumerConfig = consumerConfig;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public List<HBaseQueueConsumer> call() throws Exception {
            final HBaseConsumerStateStore consumerStateStore = this.val$admin.getConsumerStateStore(this.val$queueName);
            Throwable th = null;
            try {
                try {
                    List<HBaseConsumerState> list = (List) Transactions.createTransactionExecutor(HBaseQueueClientFactory.this.txExecutorFactory, consumerStateStore).execute(new Callable<List<HBaseConsumerState>>() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueClientFactory.2.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public List<HBaseConsumerState> call() throws Exception {
                            ArrayList newArrayList = Lists.newArrayList();
                            HBaseConsumerState state = consumerStateStore.getState(AnonymousClass2.this.val$groupId, AnonymousClass2.this.val$consumerConfig.getInstanceId());
                            if (state.getPreviousBarrier() == null) {
                                newArrayList.add(state);
                                return newArrayList;
                            }
                            List<QueueBarrier> allBarriers = consumerStateStore.getAllBarriers(AnonymousClass2.this.val$groupId);
                            if (allBarriers.isEmpty()) {
                                throw new IllegalStateException(String.format("No consumer information available. Queue: %s, GroupId: %d, InstanceId: %d", AnonymousClass2.this.val$queueName, Long.valueOf(AnonymousClass2.this.val$groupId), Integer.valueOf(AnonymousClass2.this.val$consumerConfig.getInstanceId())));
                            }
                            int groupSize = ((QueueBarrier) Iterables.find(Lists.reverse(allBarriers), new Predicate<QueueBarrier>() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueClientFactory.2.1.1
                                public boolean apply(QueueBarrier queueBarrier) {
                                    return queueBarrier.getGroupConfig().getGroupSize() > AnonymousClass2.this.val$consumerConfig.getInstanceId() && consumerStateStore.isAllConsumed(AnonymousClass2.this.val$consumerConfig, queueBarrier.getStartRow());
                                }
                            }, allBarriers.get(0))).getGroupConfig().getGroupSize();
                            int instanceId = AnonymousClass2.this.val$consumerConfig.getInstanceId();
                            while (true) {
                                int i = instanceId;
                                if (i >= groupSize) {
                                    return newArrayList;
                                }
                                newArrayList.add(consumerStateStore.getState(AnonymousClass2.this.val$groupId, i));
                                instanceId = i + AnonymousClass2.this.val$consumerConfig.getGroupSize();
                            }
                        }
                    });
                    if (consumerStateStore != null) {
                        if (0 != 0) {
                            try {
                                consumerStateStore.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            consumerStateStore.close();
                        }
                    }
                    ArrayList newArrayList = Lists.newArrayList();
                    for (HBaseConsumerState hBaseConsumerState : list) {
                        HTable createHTable = HBaseQueueClientFactory.this.createHTable(this.val$admin.getDataTableId(this.val$queueName, hBaseConsumerState.getPreviousBarrier() == null ? QueueConstants.QueueType.QUEUE : QueueConstants.QueueType.SHARDED_QUEUE));
                        int distributorBuckets = HBaseQueueClientFactory.this.getDistributorBuckets(createHTable.getTableDescriptor());
                        newArrayList.add(HBaseQueueClientFactory.this.queueUtil.getQueueConsumer(HBaseQueueClientFactory.this.cConf, createHTable, this.val$queueName, hBaseConsumerState, this.val$admin.getConsumerStateStore(this.val$queueName), hBaseConsumerState.getPreviousBarrier() == null ? new SaltedHBaseQueueStrategy(HBaseQueueClientFactory.this.hBaseTableUtil, distributorBuckets) : new ShardedHBaseQueueStrategy(HBaseQueueClientFactory.this.hBaseTableUtil, distributorBuckets)));
                    }
                    return newArrayList;
                } finally {
                }
            } catch (Throwable th3) {
                if (consumerStateStore != null) {
                    if (th != null) {
                        try {
                            consumerStateStore.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        consumerStateStore.close();
                    }
                }
                throw th3;
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueClientFactory$SmartQueueConsumer.class */
    /**
     * A {@link QueueConsumer} backed by a deque of {@link HBaseQueueConsumer}s obtained from a creator
     * callable.
     *
     * <p>All work is forwarded to the consumer at the head of the deque. After each committed transaction
     * the head is removed and re-appended to the tail unless it reports itself closed; when the deque runs
     * empty the creator callable is invoked again to refill it.
     */
    private final class SmartQueueConsumer extends ForwardingTransactionAware implements QueueConsumer {
        private final QueueName queueName;
        private final ConsumerConfig consumerConfig;
        private final Callable<? extends Iterable<HBaseQueueConsumer>> consumerCreator;
        private final Deque<HBaseQueueConsumer> consumers;

        /**
         * Creates the consumer and eagerly builds the initial set of underlying consumers.
         *
         * @throws Exception whatever the creator callable throws
         */
        private SmartQueueConsumer(QueueName queueName, ConsumerConfig consumerConfig, Callable<? extends Iterable<HBaseQueueConsumer>> consumerCreator) throws Exception {
            this.queueName = queueName;
            this.consumerConfig = consumerConfig;
            this.consumerCreator = consumerCreator;
            this.consumers = Lists.newLinkedList(consumerCreator.call());
        }

        /**
         * Returns the consumer at the head of the deque (or {@code null} when the deque is empty) as the
         * target of all forwarded transaction callbacks.
         */
        @Override
        /* renamed from: delegate */
        public TransactionAware mo172delegate() {
            return consumers.peek();
        }

        @Override
        public QueueName getQueueName() {
            return queueName;
        }

        @Override
        public ConsumerConfig getConfig() {
            return consumerConfig;
        }

        /** Equivalent to {@code dequeue(1)}. */
        @Override
        public DequeueResult<byte[]> dequeue() throws IOException {
            return dequeue(1);
        }

        /** Forwards the dequeue request to the consumer at the head of the deque. */
        @Override
        public DequeueResult<byte[]> dequeue(int i) throws IOException {
            HBaseQueueConsumer head = consumers.peek();
            return head.dequeue(i);
        }

        /** Closes every underlying consumer via {@code Closeables.closeQuietly}. */
        @Override
        public void close() throws IOException {
            for (HBaseQueueConsumer consumer : consumers) {
                Closeables.closeQuietly(consumer);
            }
        }

        /** Refills the deque first if it is empty, then starts the transaction on the head consumer. */
        @Override
        public void startTx(Transaction transaction) {
            if (consumers.isEmpty()) {
                updateConsumers();
            }
            super.startTx(transaction);
        }

        /**
         * Notifies the head consumer of the commit, then moves it to the tail if it is still open (or
         * drops it if it is closed) and refills the deque when nothing is left.
         */
        @Override
        public void postTxCommit() {
            super.postTxCommit();

            HBaseQueueConsumer head = consumers.poll();
            boolean stillOpen = !head.isClosed();
            if (stillOpen) {
                consumers.add(head);
            }

            if (consumers.isEmpty()) {
                updateConsumers();
            }
        }

        /** Appends the consumers produced by the creator callable; failures are logged and propagated. */
        private void updateConsumers() {
            try {
                Iterable<HBaseQueueConsumer> created = consumerCreator.call();
                Iterables.addAll(consumers, created);
            } catch (Exception e) {
                HBaseQueueClientFactory.LOG.error("Failed to update consumer", e);
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Creates the factory from its injected collaborators.
     *
     * @param queueAdmin must be an {@link HBaseQueueAdmin}; it is cast unconditionally, so any other
     *                   implementation fails with a {@link ClassCastException}
     */
    @Inject
    public HBaseQueueClientFactory(CConfiguration cConfiguration,
                                   Configuration configuration,
                                   HBaseTableUtil hBaseTableUtil,
                                   QueueAdmin queueAdmin,
                                   TransactionExecutorFactory transactionExecutorFactory) {
        // Configuration objects.
        this.cConf = cConfiguration;
        this.hConf = configuration;
        // Collaborators.
        this.hBaseTableUtil = hBaseTableUtil;
        this.txExecutorFactory = transactionExecutorFactory;
        this.queueAdmin = (HBaseQueueAdmin) queueAdmin;
    }

    /**
     * Creates a producer for the given queue that reports to {@code QueueMetrics.NOOP_QUEUE_METRICS}.
     *
     * @throws IOException if the producer cannot be created
     */
    @Override
    public QueueProducer createProducer(QueueName queueName) throws IOException {
        QueueProducer producer = createProducer(queueName, QueueMetrics.NOOP_QUEUE_METRICS);
        return producer;
    }

    /**
     * Creates a producer for the given queue.
     *
     * <p>The queue table is created if it does not exist yet. The latest consumer group configurations
     * are then read from the consumer state store through a transaction executor, and a producer using a
     * {@link ShardedHBaseQueueStrategy} is created on the queue's data table.
     *
     * @param queueName    the queue to produce to
     * @param queueMetrics the metrics object handed to the producer
     * @return a new producer
     * @throws IOException           if the table cannot be opened or the producer cannot be created;
     *                               checked non-I/O failures are wrapped as the cause
     * @throws IllegalStateException if no consumer group information exists for the queue
     */
    @Override
    public QueueProducer createProducer(QueueName queueName, QueueMetrics queueMetrics) throws IOException {
        HBaseQueueAdmin admin = ensureTableExists(queueName);
        try {
            final ArrayList groupConfigs = Lists.newArrayList();

            // try-with-resources guarantees the state store is closed even when the transactional read
            // fails, and records a failing close() as a suppressed exception.
            try (final HBaseConsumerStateStore stateStore = admin.getConsumerStateStore(queueName)) {
                Transactions.createTransactionExecutor(this.txExecutorFactory, stateStore).execute(new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        stateStore.getLatestConsumerGroups(groupConfigs);
                    }
                });
            }

            Preconditions.checkState(!groupConfigs.isEmpty(), "Missing consumer group information for queue %s", new Object[]{queueName});

            HTable hTable = createHTable(admin.getDataTableId(queueName, this.queueAdmin.getType()));
            try {
                return createProducer(hTable, queueName, queueMetrics, new ShardedHBaseQueueStrategy(this.hBaseTableUtil, getDistributorBuckets(hTable.getTableDescriptor())), groupConfigs);
            } catch (Exception e) {
                // The producer never took ownership of the table, so release it here.
                try {
                    hTable.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        } catch (Exception e) {
            // Rethrow unchecked exceptions and IOExceptions as they are; wrap everything else.
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    /**
     * Creates a consumer for the given queue, creating the queue table first if it does not exist.
     *
     * @param queueName      the queue to consume from
     * @param consumerConfig the configuration of the consumer instance; its group id selects the
     *                       consumer state to read
     * @param i              not used by this implementation
     * @return a consumer backed by one or more HBase queue consumers
     * @throws IOException if the table cannot be opened or the consumer cannot be created; checked
     *                     non-I/O failures are wrapped as the cause
     */
    @Override
    public QueueConsumer createConsumer(QueueName queueName, ConsumerConfig consumerConfig, int i) throws IOException {
        try {
            return new SmartQueueConsumer(queueName, consumerConfig, new AnonymousClass2(ensureTableExists(queueName), queueName, consumerConfig.getGroupId(), consumerConfig));
        } catch (Exception e) {
            // Rethrow unchecked exceptions and IOExceptions untouched so that an IOException is not
            // wrapped in a second IOException; only other checked exceptions are wrapped.
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    /**
     * Creates a producer on the data table that the given admin reports for the queue and queue type,
     * using an explicitly supplied strategy and set of consumer group configurations.
     *
     * @throws IOException if the table cannot be opened
     */
    @VisibleForTesting
    HBaseQueueProducer createProducer(HBaseQueueAdmin hBaseQueueAdmin,
                                      QueueName queueName,
                                      QueueConstants.QueueType queueType,
                                      QueueMetrics queueMetrics,
                                      HBaseQueueStrategy hBaseQueueStrategy,
                                      Iterable<? extends ConsumerGroupConfig> iterable) throws IOException {
        return createProducer(
            createHTable(hBaseQueueAdmin.getDataTableId(queueName, queueType)),
            queueName,
            queueMetrics,
            hBaseQueueStrategy,
            iterable);
    }

    /**
     * Wraps an already opened table in a new {@link HBaseQueueProducer}.
     */
    private HBaseQueueProducer createProducer(HTable hTable,
                                              QueueName queueName,
                                              QueueMetrics queueMetrics,
                                              HBaseQueueStrategy hBaseQueueStrategy,
                                              Iterable<? extends ConsumerGroupConfig> iterable) throws IOException {
        HBaseQueueProducer producer = new HBaseQueueProducer(hTable, queueName, queueMetrics, hBaseQueueStrategy, iterable);
        return producer;
    }

    /**
     * Creates the queue through the queue admin if the admin reports that it does not exist yet.
     *
     * @return the queue admin of this factory
     * @throws IOException if checking for or creating the queue fails; the failure is attached as cause
     */
    private HBaseQueueAdmin ensureTableExists(QueueName queueName) throws IOException {
        try {
            boolean alreadyPresent = this.queueAdmin.exists(queueName);
            if (!alreadyPresent) {
                this.queueAdmin.create(queueName);
            }
        } catch (Exception cause) {
            String message = "Failed to open table " + this.queueAdmin.getDataTableId(queueName);
            throw new IOException(message, cause);
        }
        return this.queueAdmin;
    }

    /**
     * Returns the queue admin this factory was constructed with.
     */
    public HBaseQueueAdmin getQueueAdmin() {
        return queueAdmin;
    }

    /**
     * Opens the given table through the HBase table util and configures it for buffered writes.
     *
     * @param tableId the table to open
     * @return the opened table, with auto flush disabled and a write buffer of 4194304 bytes (4 MiB)
     * @throws IOException if the table cannot be opened or configured
     */
    public HTable createHTable(TableId tableId) throws IOException {
        HTable table = this.hBaseTableUtil.createHTable(this.hConf, tableId);
        // 4 * 1024 * 1024 bytes.
        table.setWriteBufferSize(4194304L);
        table.setAutoFlush(false);
        return table;
    }

    /**
     * Reads the number of distributor buckets from the table descriptor.
     *
     * @param hTableDescriptor descriptor of a queue table
     * @return the value stored under {@code QueueConstants.DISTRIBUTOR_BUCKETS} parsed as an int, or
     *         {@code 16} when the descriptor carries no such value
     * @throws NumberFormatException if the stored value is not a valid integer
     */
    public int getDistributorBuckets(HTableDescriptor hTableDescriptor) {
        String configured = hTableDescriptor.getValue(QueueConstants.DISTRIBUTOR_BUCKETS);
        return configured != null ? Integer.parseInt(configured) : 16;
    }
}
