package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.utils.Networks;
import co.cask.cdap.data.hbase.HBaseTestBase;
import co.cask.cdap.data.hbase.HBaseTestFactory;
import co.cask.cdap.data.runtime.DataFabricDistributedModule;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data.runtime.TransactionMetricsModule;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConfigurer;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.cdap.data2.transaction.queue.QueueTest;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.CConfigurationReader;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerConfigCache;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.ConfigurationTable;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HTableNameConverter;
import co.cask.cdap.data2.util.hbase.HTableNameConverterFactory;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.notifications.feeds.service.NoOpNotificationFeedManager;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.distributed.TransactionService;
import co.cask.tephra.persist.TransactionSnapshot;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Scopes;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueTest.class */
public abstract class HBaseQueueTest extends QueueTest {
    // NOTE(review): the logger is registered under QueueTest (the parent class), not HBaseQueueTest;
    // confirm whether that is intentional before changing it, since it affects the logger name in output.
    private static final Logger LOG = LoggerFactory.getLogger(QueueTest.class);

    // JUnit class-level temporary folder; it is not referenced by any code in this class.
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    // Transaction service obtained from the injector in init(), started on a background thread there
    // and stopped in finish().
    private static TransactionService txService;
    // CDAP configuration built in init(); also written to the HBase ConfigurationTable there.
    private static CConfiguration cConf;
    // Hadoop configuration taken from the running HBase test instance.
    private static Configuration hConf;
    // Guice injector created in init(); the source of all service instances used by the tests.
    private static Injector injector;
    // HBase test instance started in init() and stopped in finish().
    protected static HBaseTestBase testHBase;
    // HBase table helper obtained from the injector; used for namespace and table management.
    protected static HBaseTableUtil tableUtil;
    // ZooKeeper client service started in init() and stopped in finish().
    private static ZKClientService zkClientService;

    /**
     * One-time fixture setup for the whole test class.
     *
     * <p>Starts the HBase test instance, builds the CDAP configuration and the Guice injector on top of it,
     * creates the namespaces used by the tests, writes the configuration to the HBase configuration table,
     * starts the ZooKeeper client and the transaction service, and finally populates the static collaborators
     * ({@code transactionManager}, {@code txSystemClient}, {@code queueClientFactory}, {@code queueAdmin},
     * {@code executorFactory}) that are not declared in this class.
     *
     * @throws Exception if any part of the environment fails to start
     */
    @BeforeClass
    public static void init() throws Exception {
        // HBase comes first: its configuration and ZooKeeper connection string feed everything below.
        testHBase = (HBaseTestBase) new HBaseTestFactory().get();
        testHBase.startHBase();
        hConf = testHBase.getConfiguration();

        cConf = CConfiguration.create();
        cConf.set("zookeeper.quorum", testHBase.getZkConnectionString());
        cConf.set("data.tx.bind.port", Integer.toString(Networks.getRandomPort()));
        cConf.set("dataset.table.prefix", "test");
        cConf.set("hdfs.user", System.getProperty("user.name"));
        cConf.setLong("data.queue.config.update.interval", 1L);
        cConf.setLong("data.tx.timeout", 100000000L);

        injector = Guice.createInjector(
            new DataFabricDistributedModule(),
            new ConfigModule(cConf, hConf),
            new ZKClientModule(),
            new LocationRuntimeModule().getDistributedModules(),
            new DiscoveryRuntimeModule().getDistributedModules(),
            new TransactionMetricsModule(),
            new DataSetsModules().getInMemoryModules(),
            new SystemDatasetRuntimeModule().getDistributedModules(),
            new AbstractModule() {
                @Override
                protected void configure() {
                    // Notification feeds are bound to the no-op implementation for these tests.
                    bind(NotificationFeedManager.class).to(NoOpNotificationFeedManager.class).in(Scopes.SINGLETON);
                }
            });

        tableUtil = (HBaseTableUtil) injector.getInstance(HBaseTableUtil.class);
        tableUtil.createNamespaceIfNotExists(testHBase.getHBaseAdmin(), Constants.SYSTEM_NAMESPACE_ID);
        tableUtil.createNamespaceIfNotExists(testHBase.getHBaseAdmin(), NAMESPACE_ID);
        tableUtil.createNamespaceIfNotExists(testHBase.getHBaseAdmin(), NAMESPACE_ID1);

        ConfigurationTable configurationTable = new ConfigurationTable(hConf);
        configurationTable.write(ConfigurationTable.Type.DEFAULT, cConf);

        zkClientService = (ZKClientService) injector.getInstance(ZKClientService.class);
        zkClientService.startAndWait();

        // The transaction service is started on its own thread; this method does not wait for start() to return.
        txService = (TransactionService) injector.getInstance(TransactionService.class);
        Thread txServiceStarter = new Thread(new Runnable() {
            @Override
            public void run() {
                txService.start();
            }
        });
        txServiceStarter.start();

        transactionManager = (TransactionManager) injector.getInstance(TransactionManager.class);
        txSystemClient = (TransactionSystemClient) injector.getInstance(TransactionSystemClient.class);
        queueClientFactory = (QueueClientFactory) injector.getInstance(QueueClientFactory.class);
        queueAdmin = (QueueAdmin) injector.getInstance(QueueAdmin.class);
        executorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
    }

    /**
     * Checks how a flowlet queue name maps to an HBase data table: the table id carries the queue's namespace,
     * the table name is {@code system.<queue type>.<app>.<flow>}, and the application and flow names can be
     * recovered from the resulting HBase table name. Verified for the "default" namespace and for a custom one.
     */
    @Test
    public void testQueueTableNameFormat() throws Exception {
        // The shared field is typed as QueueAdmin; the HBase-specific accessors need the concrete type.
        HBaseQueueAdmin admin = (HBaseQueueAdmin) queueAdmin;
        String expectedTableName = "system." + admin.getType() + ".application1.flow1";

        // Queue in the default namespace.
        QueueName defaultQueue = QueueName.fromFlowlet("default", "application1", "flow1", "flowlet1", "output1");
        TableId defaultTableId = admin.getDataTableId(defaultQueue);
        Assert.assertEquals(Constants.DEFAULT_NAMESPACE_ID, defaultTableId.getNamespace());
        Assert.assertEquals(expectedTableName, defaultTableId.getTableName());
        String defaultHTableName = tableUtil.createHTableDescriptor(defaultTableId).getNameAsString();
        Assert.assertEquals("application1", HBaseQueueAdmin.getApplicationName(defaultHTableName));
        Assert.assertEquals("flow1", HBaseQueueAdmin.getFlowName(defaultHTableName));

        // Same application and flow, but in a non-default namespace.
        QueueName customQueue = QueueName.fromFlowlet("testNamespace", "application1", "flow1", "flowlet1", "output1");
        TableId customTableId = admin.getDataTableId(customQueue);
        Assert.assertEquals(Id.Namespace.from("testNamespace"), customTableId.getNamespace());
        Assert.assertEquals(expectedTableName, customTableId.getTableName());
        String customHTableName = tableUtil.createHTableDescriptor(customTableId).getNameAsString();
        Assert.assertEquals("application1", HBaseQueueAdmin.getApplicationName(customHTableName));
        Assert.assertEquals("flow1", HBaseQueueAdmin.getFlowName(customHTableName));
    }

    /**
     * Runs the pre-split check against the shared queue admin using a queue in the "default" namespace.
     */
    @Test
    public void testHTablePreSplitted() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("default", "app", "flow", "flowlet", "out");
        HBaseQueueAdmin admin = (HBaseQueueAdmin) queueAdmin;
        testHTablePreSplitted(admin, queueName);
    }

    /**
     * Asserts that the data table of the given queue has as many regions as configured by
     * {@code data.queue.table.presplits}. The queue is created first if it does not exist yet.
     *
     * @param hBaseQueueAdmin admin used to resolve, and if necessary create, the queue table
     * @param queueName queue whose data table is inspected
     * @throws Exception if the table cannot be created, opened or inspected
     */
    void testHTablePreSplitted(HBaseQueueAdmin hBaseQueueAdmin, QueueName queueName) throws Exception {
        TableId tableId = hBaseQueueAdmin.getDataTableId(queueName);
        if (!hBaseQueueAdmin.exists(queueName)) {
            hBaseQueueAdmin.create(queueName);
        }

        HTable hTable = tableUtil.createHTable(testHBase.getConfiguration(), tableId);
        try {
            int expectedRegions = cConf.getInt("data.queue.table.presplits");
            // Range from {0} to {-1} (0xFF) is the span of row keys the regions are counted over.
            int actualRegions = hTable.getRegionsInRange(new byte[]{0}, new byte[]{-1}).size();
            Assert.assertEquals("Failed for " + hBaseQueueAdmin.getClass().getName(), expectedRegions, actualRegions);
        } finally {
            // The table handle was previously never released.
            hTable.close();
        }
    }

    /**
     * Exercises consumer group configuration against the HBase consumer state store.
     *
     * <p>The test walks through these steps, each in its own transaction:
     * <ol>
     *   <li>pre-sets a start row for group 2 / instance 0, then configures groups 1, 2 and 3;</li>
     *   <li>checks every group has one barrier and that only the pre-set instance keeps its own start row;</li>
     *   <li>checks {@code isAllConsumed} per group and completes group 2 / instance 0;</li>
     *   <li>grows group 2 to three instances and checks barriers and states;</li>
     *   <li>reconfigures to groups 2 and 4 and checks that removed groups and instances have no state.</li>
     * </ol>
     * The state store is closed and all queues in the "default" namespace are dropped afterwards, whether or not
     * the test body succeeded.
     */
    @Test
    public void configTest() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("default", "app", "flow", "flowlet", "configure");
        queueAdmin.create(queueName);

        final List<ConsumerGroupConfig> groupConfigs = ImmutableList.of(
            new ConsumerGroupConfig(1L, 1, DequeueStrategy.FIFO, (String) null),
            new ConsumerGroupConfig(2L, 2, DequeueStrategy.FIFO, (String) null),
            new ConsumerGroupConfig(3L, 3, DequeueStrategy.FIFO, (String) null));

        // The shared field is typed as QueueAdmin; the state store accessor is HBase-specific.
        final HBaseConsumerStateStore stateStore = ((HBaseQueueAdmin) queueAdmin).getConsumerStateStore(queueName);
        try {
            TransactionExecutor executor = Transactions.createTransactionExecutor(executorFactory, stateStore);

            // Step 1: give group 2 / instance 0 an explicit start row before the groups are configured.
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    stateStore.updateState(2L, 0, QueueEntryRow.getQueueEntryRowKey(queueName, 10L, 0));
                }
            });

            configureGroups(queueName, groupConfigs);

            // Step 2: one barrier per group; only the pre-set instance differs from the barrier start row.
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    for (ConsumerGroupConfig groupConfig : groupConfigs) {
                        long groupId = groupConfig.getGroupId();
                        List<QueueBarrier> barriers = stateStore.getAllBarriers(groupId);
                        Assert.assertEquals(1, barriers.size());

                        for (int instance = 0; instance < groupConfig.getGroupSize(); instance++) {
                            HBaseConsumerState state = stateStore.getState(groupId, instance);
                            if (groupId == 2L && instance == 0) {
                                Assert.assertEquals(0, Bytes.compareTo(state.getStartRow(),
                                                                        QueueEntryRow.getQueueEntryRowKey(queueName, 10L, 0)));
                                Assert.assertEquals(0, Bytes.compareTo(state.getNextBarrier(),
                                                                        barriers.get(0).getStartRow()));
                            } else {
                                Assert.assertEquals(0, Bytes.compareTo(state.getStartRow(),
                                                                        barriers.get(0).getStartRow()));
                            }
                        }
                    }
                }
            });

            // Step 3: groups 1 and 3 are fully consumed up to their barrier, group 2 is not until completed.
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    for (long groupId = 1L; groupId <= 3L; groupId++) {
                        List<QueueBarrier> barriers = stateStore.getAllBarriers(groupId);
                        boolean allConsumed = stateStore.isAllConsumed(groupId, barriers.get(0).getStartRow());
                        if (groupId == 2L) {
                            Assert.assertFalse(allConsumed);
                            stateStore.completed(groupId, 0);
                        } else {
                            Assert.assertTrue(allConsumed);
                        }
                    }
                }
            });

            // After completion, group 2 / instance 0 starts at the barrier and everything before it is consumed.
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    List<QueueBarrier> barriers = stateStore.getAllBarriers(2L);
                    byte[] startRow = stateStore.getState(2L, 0).getStartRow();
                    Assert.assertEquals(0, Bytes.compareTo(startRow, barriers.get(0).getStartRow()));
                    Assert.assertTrue(stateStore.isAllConsumed(2L, startRow));
                }
            });

            // Step 4: grow group 2 from two to three instances.
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    stateStore.configureInstances(2L, 3);
                }
            });

            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    List<QueueBarrier> barriers = stateStore.getAllBarriers(2L);
                    Assert.assertEquals(2, barriers.size());

                    // The two pre-existing instances sit between the first and the second barrier.
                    for (int instance = 0; instance < 2; instance++) {
                        HBaseConsumerState state = stateStore.getState(2L, instance);
                        Assert.assertEquals(0, Bytes.compareTo(state.getStartRow(), barriers.get(0).getStartRow()));
                        Assert.assertEquals(0, Bytes.compareTo(state.getNextBarrier(), barriers.get(1).getStartRow()));
                        stateStore.completed(2L, instance);
                    }

                    // The new instance starts at the second barrier and has no barrier after it.
                    HBaseConsumerState newInstanceState = stateStore.getState(2L, 2);
                    Assert.assertEquals(0, Bytes.compareTo(newInstanceState.getStartRow(),
                                                            barriers.get(1).getStartRow()));
                    Assert.assertNull(newInstanceState.getNextBarrier());
                    Assert.assertTrue(stateStore.isAllConsumed(2L, barriers.get(1).getStartRow()));
                }
            });

            // Step 5: drop groups 1 and 3, shrink group 2 to one instance, add group 4.
            configureGroups(queueName, ImmutableList.of(
                new ConsumerGroupConfig(2L, 1, DequeueStrategy.FIFO, (String) null),
                new ConsumerGroupConfig(4L, 1, DequeueStrategy.FIFO, (String) null)));

            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    for (long removedGroup : new long[]{1L, 3L}) {
                        try {
                            Assert.assertTrue(stateStore.getAllBarriers(removedGroup).isEmpty());
                            stateStore.getState(removedGroup, 0);
                            Assert.fail("Not expected to get state for group " + removedGroup);
                        } catch (Exception expected) {
                            // Expected: the group no longer exists. Assert.fail raises an Error, so it is not caught here.
                        }
                    }

                    List<QueueBarrier> barriers = stateStore.getAllBarriers(2L);
                    Assert.assertEquals(2, barriers.size());
                    for (int instance = 0; instance < 3; instance++) {
                        stateStore.completed(2L, instance);
                    }

                    HBaseConsumerState state = stateStore.getState(2L, 0);
                    Assert.assertEquals(0, Bytes.compareTo(state.getStartRow(), barriers.get(1).getStartRow()));
                    Assert.assertNull(state.getNextBarrier());

                    for (int instance = 1; instance < 3; instance++) {
                        try {
                            stateStore.getState(2L, instance);
                            Assert.fail("Not expected to get state for group 2, instance " + instance);
                        } catch (Exception expected) {
                            // Expected: the instance was removed when the group shrank.
                        }
                    }
                }
            });
        } finally {
            // Cleanup runs exactly once; the namespace is dropped even if closing the store fails.
            try {
                stateStore.close();
            } finally {
                queueAdmin.dropAllInNamespace("default");
            }
        }
    }

    /**
     * Verifies that entries written to a queue table of type {@code QUEUE} and entries written through the
     * regular enqueue path (table type {@code SHARDED_QUEUE}) are both delivered to a single consumer.
     *
     * <p>Ten entries are enqueued through a producer built on an admin of type {@code QUEUE}, the consumer group is
     * configured, the consumer state is pointed at row key 0, and ten more entries are enqueued through
     * {@code createEnqueueRunnable}. Each table must then hold ten rows and the consumer must dequeue all twenty.
     *
     * <p>Every resource (producer, configurer, state store, consumer) is closed exactly once in a
     * {@code finally} block.
     */
    @Test(timeout = 30000)
    public void testQueueUpgrade() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("default", "app", "flow", "flowlet", "upgrade");
        // The shared fields are typed by interface; the HBase-specific calls below need the concrete types.
        HBaseQueueAdmin admin = (HBaseQueueAdmin) queueAdmin;
        HBaseQueueClientFactory clientFactory = (HBaseQueueClientFactory) queueClientFactory;

        // A separate admin bound to the QUEUE table type creates the table the first producer writes to.
        HBaseQueueAdmin oldQueueAdmin = new HBaseQueueAdmin(
            hConf, cConf,
            (LocationFactory) injector.getInstance(LocationFactory.class),
            (HBaseTableUtil) injector.getInstance(HBaseTableUtil.class),
            (DatasetFramework) injector.getInstance(DatasetFramework.class),
            (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class),
            QueueConstants.QueueType.QUEUE);
        oldQueueAdmin.create(queueName);

        final HBaseQueueProducer oldProducer = clientFactory.createProducer(
            oldQueueAdmin, queueName, QueueConstants.QueueType.QUEUE, QueueMetrics.NOOP_QUEUE_METRICS,
            new SaltedHBaseQueueStrategy(cConf.getInt("data.queue.table.presplits")), ImmutableList.of());
        try {
            Transactions.createTransactionExecutor(executorFactory, oldProducer)
                .execute(new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        for (int i = 0; i < 10; i++) {
                            oldProducer.enqueue(new QueueEntry("key", i, Bytes.toBytes("Message " + i)));
                        }
                    }
                });
        } finally {
            oldProducer.close();
        }

        final ConsumerConfig consumerConfig = new ConsumerConfig(0L, 0, 1, DequeueStrategy.HASH, "key");
        final QueueConfigurer configurer = queueAdmin.getQueueConfigurer(queueName);
        try {
            Transactions.createTransactionExecutor(executorFactory, configurer)
                .execute(new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        configurer.configureGroups(ImmutableList.of(consumerConfig));
                    }
                });
        } finally {
            configurer.close();
        }

        // Point the consumer at row key 0 of the queue.
        final HBaseConsumerStateStore stateStore = admin.getConsumerStateStore(queueName);
        try {
            Transactions.createTransactionExecutor(executorFactory, stateStore)
                .execute(new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        stateStore.updateState(consumerConfig.getGroupId(), consumerConfig.getInstanceId(),
                                               QueueEntryRow.getQueueEntryRowKey(queueName, 0L, 0));
                    }
                });
        } finally {
            stateStore.close();
        }

        // Second batch of ten entries goes through the regular enqueue path.
        createEnqueueRunnable(queueName, 10, 1, null).run();

        Assert.assertEquals(10L, countRows(admin.getDataTableId(queueName, QueueConstants.QueueType.QUEUE)));
        Assert.assertEquals(10L, countRows(admin.getDataTableId(queueName, QueueConstants.QueueType.SHARDED_QUEUE)));

        final List<String> messages = Lists.newArrayList();
        final QueueConsumer consumer = queueClientFactory.createConsumer(queueName, consumerConfig, 1);
        try {
            // Keep dequeuing until all twenty entries arrive; the test timeout bounds this loop.
            while (messages.size() != 20) {
                Transactions.createTransactionExecutor(executorFactory, (TransactionAware) consumer)
                    .execute(new TransactionExecutor.Subroutine() {
                        @Override
                        public void apply() throws Exception {
                            for (byte[] data : consumer.dequeue(20)) {
                                messages.add(Bytes.toString(data));
                            }
                        }
                    });
            }
        } finally {
            consumer.close();
        }

        verifyQueueIsEmpty(queueName, ImmutableList.of(consumerConfig));
    }

    /**
     * Verifies that changing the number of consumer instances of a HASH-partitioned group redistributes entries
     * as expected.
     *
     * <p>Sequence: two instances each dequeue two of ten entries; the group grows to three instances and ten more
     * entries are enqueued; all instances drain the queue and the per-instance sequences are compared with the
     * expected ones; the queue is confirmed empty; six more entries are enqueued, the group shrinks to one
     * instance, and that instance must see entries 0..5 in order.
     *
     * <p>Every consumer is closed exactly once in a {@code finally} block.
     */
    @Test(timeout = 30000)
    public void testReconfigure() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("default", "app", "flow", "flowlet", "changeinstances");
        ConsumerGroupConfig twoInstances = new ConsumerGroupConfig(0L, 2, DequeueStrategy.HASH, "key");
        configureGroups(queueName, ImmutableList.of(twoInstances));

        createEnqueueRunnable(queueName, 10, 1, null).run();

        // instance id -> values dequeued by that instance, in dequeue order
        final ArrayListMultimap<Integer, Integer> dequeued = ArrayListMultimap.create();

        // Each of the two instances takes exactly two entries before the group is resized.
        for (int instance = 0; instance < twoInstances.getGroupSize(); instance++) {
            final ConsumerConfig consumerConfig = new ConsumerConfig(twoInstances, instance);
            final QueueConsumer consumer = queueClientFactory.createConsumer(queueName, consumerConfig, 1);
            try {
                Transactions.createTransactionExecutor(executorFactory, (TransactionAware) consumer)
                    .execute(new TransactionExecutor.Subroutine() {
                        @Override
                        public void apply() throws Exception {
                            DequeueResult<byte[]> result = consumer.dequeue(2);
                            Assert.assertEquals(2, result.size());
                            for (byte[] data : result) {
                                dequeued.put(consumerConfig.getInstanceId(), Bytes.toInt(data));
                            }
                        }
                    });
            } finally {
                consumer.close();
            }
        }

        // Grow to three instances and add ten more entries.
        changeInstances(queueName, 0L, 3);
        createEnqueueRunnable(queueName, 10, 1, null).run();

        ConsumerGroupConfig threeInstances = new ConsumerGroupConfig(0L, 3, DequeueStrategy.HASH, "key");
        // Drain until all twenty entries have been seen; the test timeout bounds this loop.
        while (dequeued.size() != 20) {
            for (int instance = 0; instance < threeInstances.getGroupSize(); instance++) {
                final ConsumerConfig consumerConfig = new ConsumerConfig(threeInstances, instance);
                final QueueConsumer consumer = queueClientFactory.createConsumer(queueName, consumerConfig, 1);
                try {
                    Transactions.createTransactionExecutor(executorFactory, (TransactionAware) consumer)
                        .execute(new TransactionExecutor.Subroutine() {
                            @Override
                            public void apply() throws Exception {
                                for (byte[] data : consumer.dequeue(20)) {
                                    dequeued.put(consumerConfig.getInstanceId(), Bytes.toInt(data));
                                }
                            }
                        });
                } finally {
                    consumer.close();
                }
            }
        }

        Assert.assertEquals(ImmutableList.of(0, 2, 4, 6, 8, 0, 3, 6, 9), dequeued.get(0));
        Assert.assertEquals(ImmutableList.of(1, 3, 5, 7, 9, 1, 4, 7), dequeued.get(1));
        Assert.assertEquals(ImmutableList.of(2, 5, 8), dequeued.get(2));

        // Nothing may be left for any of the three instances.
        for (int instance = 0; instance < threeInstances.getGroupSize(); instance++) {
            final QueueConsumer consumer =
                queueClientFactory.createConsumer(queueName, new ConsumerConfig(threeInstances, instance), 1);
            try {
                Transactions.createTransactionExecutor(executorFactory, (TransactionAware) consumer)
                    .execute(new TransactionExecutor.Subroutine() {
                        @Override
                        public void apply() throws Exception {
                            Assert.assertTrue(consumer.dequeue(20).isEmpty());
                        }
                    });
            } finally {
                consumer.close();
            }
        }

        // Enqueue six entries, shrink to a single instance, and expect it to see all of them in order.
        createEnqueueRunnable(queueName, 6, 1, null).run();
        changeInstances(queueName, 0L, 1);
        dequeued.clear();

        final ConsumerConfig singleConfig = new ConsumerConfig(0L, 0, 1, DequeueStrategy.HASH, "key");
        final QueueConsumer singleConsumer = queueClientFactory.createConsumer(queueName, singleConfig, 1);
        try {
            while (dequeued.size() != 6) {
                Transactions.createTransactionExecutor(executorFactory, (TransactionAware) singleConsumer)
                    .execute(new TransactionExecutor.Subroutine() {
                        @Override
                        public void apply() throws Exception {
                            for (byte[] data : singleConsumer.dequeue(1)) {
                                dequeued.put(singleConfig.getInstanceId(), Bytes.toInt(data));
                            }
                        }
                    });
            }
        } finally {
            // This consumer was previously never closed.
            singleConsumer.close();
        }

        Assert.assertEquals(ImmutableList.of(0, 1, 2, 3, 4, 5), dequeued.get(0));
    }

    /**
     * Asserts that, after a cache refresh, a consumer configuration is present for each given queue.
     *
     * @param queueNameArr queues whose consumer configuration must exist
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void verifyConsumerConfigExists(QueueName... queueNameArr) throws Exception {
        for (QueueName name : queueNameArr) {
            ConsumerConfigCache cache = getConsumerConfigCache(name);
            // Refresh explicitly so the assertion sees the current contents of the config table.
            cache.updateCache();
            Object config = cache.getConsumerConfig(name.toBytes());
            Assert.assertNotNull("for " + name, config);
        }
    }

    /**
     * Asserts that, after a cache refresh, no consumer configuration remains for any of the given queues.
     * A missing config table is accepted as well.
     *
     * @param queueNameArr queues whose consumer configuration must be gone
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void verifyConsumerConfigIsDeleted(QueueName... queueNameArr) throws Exception {
        for (QueueName name : queueNameArr) {
            try {
                ConsumerConfigCache cache = getConsumerConfigCache(name);
                cache.updateCache();
                Object config = cache.getConsumerConfig(name.toBytes());
                Assert.assertNull("for " + name, config);
            } catch (TableNotFoundException ignored) {
                // The config table itself is gone, so there cannot be a configuration left for this queue.
            }
        }
    }

    /**
     * Counts the rows of the given table by scanning the queue entry column family.
     *
     * <p>The scanner and the table are each closed exactly once, on success and on failure.
     *
     * @param tableId table to scan
     * @return number of rows returned by the scan
     * @throws Exception if the table cannot be opened, scanned or closed
     */
    private int countRows(TableId tableId) throws Exception {
        HTable hTable = tableUtil.createHTable(hConf, tableId);
        try {
            ResultScanner scanner = hTable.getScanner(QueueEntryRow.COLUMN_FAMILY);
            try {
                return Iterables.size(scanner);
            } finally {
                scanner.close();
            }
        } finally {
            hTable.close();
        }
    }

    /**
     * Obtains the {@link ConsumerConfigCache} for the config table that belongs to the given queue.
     *
     * <p>The table is opened only long enough to read its descriptor and is closed again before the cache is
     * requested. The cache is given a snapshot supplier backed by the shared transaction manager.
     *
     * @param queueName queue whose config table is looked up
     * @return the cache instance for that config table
     * @throws Exception if the config table cannot be opened or its descriptor cannot be read
     */
    private ConsumerConfigCache getConsumerConfigCache(QueueName queueName) throws Exception {
        HTableDescriptor descriptor;
        HTable configTable = tableUtil.createHTable(hConf, HBaseQueueAdmin.getConfigTableId(queueName));
        try {
            descriptor = configTable.getTableDescriptor();
        } finally {
            // Only the descriptor is needed; release the table handle right away.
            configTable.close();
        }

        HTableNameConverter nameConverter = (HTableNameConverter) new HTableNameConverterFactory().get();
        String sysConfigTablePrefix = nameConverter.getSysConfigTablePrefix(descriptor);

        return ConsumerConfigCache.getInstance(
            hConf,
            Bytes.toBytes(descriptor.getNameAsString()),
            new CConfigurationReader(hConf, sysConfigTablePrefix),
            new Supplier<TransactionSnapshot>() {
                @Override
                public TransactionSnapshot get() {
                    try {
                        return transactionManager.getSnapshot();
                    } catch (IOException e) {
                        throw Throwables.propagate(e);
                    }
                }
            });
    }

    /**
     * Triggers a snapshot on the transaction manager by reflectively invoking its non-public
     * {@code doSnapshot(boolean)} method with {@code false}, then logs the current and the snapshot read pointers.
     *
     * @throws Exception if the method cannot be found or invoked, or the state cannot be read
     */
    private void takeTxSnapshot() throws Exception {
        Method snapshotMethod = transactionManager.getClass().getDeclaredMethod("doSnapshot", boolean.class);
        snapshotMethod.setAccessible(true);
        snapshotMethod.invoke(transactionManager, false);

        long currentReadPointer = transactionManager.getCurrentState().getReadPointer();
        LOG.info("Read pointer: {}", currentReadPointer);
        long snapshotReadPointer = transactionManager.getSnapshot().getReadPointer();
        LOG.info("Snapshot read pointer: {}", snapshotReadPointer);
    }

    /**
     * One-time teardown for the whole test class: removes all tables in the two test namespaces and the
     * namespaces themselves, then stops the transaction service, the HBase test instance and the ZooKeeper
     * client, in that order.
     *
     * @throws Exception if any cleanup or shutdown step fails; later steps are then not executed
     */
    @AfterClass
    public static void finish() throws Exception {
        // Tables must be deleted before the namespace that contains them is removed.
        tableUtil.deleteAllInNamespace(testHBase.getHBaseAdmin(), NAMESPACE_ID);
        tableUtil.deleteNamespaceIfExists(testHBase.getHBaseAdmin(), NAMESPACE_ID);
        tableUtil.deleteAllInNamespace(testHBase.getHBaseAdmin(), NAMESPACE_ID1);
        tableUtil.deleteNamespaceIfExists(testHBase.getHBaseAdmin(), NAMESPACE_ID1);
        // Services are stopped only after the HBase cleanup above has completed.
        txService.stop();
        testHBase.stopHBase();
        zkClientService.stopAndWait();
    }

    /**
     * Forces eviction on the data table of the given queue.
     *
     * <p>Takes a transaction snapshot, then, for every region of the table, reflectively refreshes the
     * transaction state cache and the consumer config cache held by the queue region observer coprocessor,
     * and finally flushes and major-compacts the table.
     *
     * @param queueName queue whose data table is processed
     * @param i not used by this implementation
     * @throws Exception if the table cannot be resolved or any refresh, flush or compaction step fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void forceEviction(QueueName queueName, int i) throws Exception {
        // The shared field is typed as QueueAdmin; resolving the data table id is HBase-specific.
        TableId dataTableId = ((HBaseQueueAdmin) queueAdmin).getDataTableId(queueName);
        byte[] tableName = tableUtil.getHTableDescriptor(testHBase.getHBaseAdmin(), dataTableId).getName();

        takeTxSnapshot();

        final Class<?> observerClass = tableUtil.getQueueRegionObserverClassForVersion();
        testHBase.forEachRegion(tableName, new Function<HRegion, Object>() {
            @Override
            public Object apply(HRegion hRegion) {
                try {
                    Coprocessor observer = hRegion.getCoprocessorHost().findCoprocessor(observerClass.getName());
                    // Fail with a clear message instead of a bare NullPointerException further down.
                    Preconditions.checkNotNull(observer, "No coprocessor %s found on region %s",
                                               observerClass.getName(), hRegion);

                    LOG.info("forcing update of transaction state cache for HBaseQueueRegionObserver of region: {}", hRegion);
                    Object txStateCache = invokeDeclaredNoArg(observer, observer.getClass(), "getTxStateCache");
                    // refreshState is looked up on the direct superclass of the cache's runtime class.
                    invokeDeclaredNoArg(txStateCache, txStateCache.getClass().getSuperclass(), "refreshState");

                    LOG.info("forcing update cache for HBaseQueueRegionObserver of region: {}", hRegion);
                    Object configCache = invokeDeclaredNoArg(observer, observer.getClass(), "getConfigCache");
                    invokeDeclaredNoArg(configCache, configCache.getClass(), "updateCache");
                    return null;
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        });

        testHBase.forceRegionFlush(tableName);
        testHBase.forceRegionCompact(tableName, true);
    }

    /** Invokes the no-argument method {@code methodName} declared on {@code declaringClass}, ignoring its access modifier. */
    private static Object invokeDeclaredNoArg(Object target, Class<?> declaringClass, String methodName)
        throws Exception {
        Method method = declaringClass.getDeclaredMethod(methodName);
        method.setAccessible(true);
        return method.invoke(target);
    }

    /**
     * Applies the given consumer group configurations to a queue inside a single transaction.
     *
     * @param queueName queue to configure; must satisfy {@code isQueue()}
     * @param iterable consumer group configurations to apply
     * @throws IllegalArgumentException if {@code queueName} is not a queue
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void configureGroups(QueueName queueName, final Iterable<? extends ConsumerGroupConfig> iterable) throws Exception {
        Preconditions.checkArgument(queueName.isQueue(), "Only support queue configuration in queue test.");

        final QueueConfigurer configurer = queueAdmin.getQueueConfigurer(queueName);
        try {
            TransactionExecutor executor = Transactions.createTransactionExecutor(executorFactory, configurer);
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    configurer.configureGroups(iterable);
                }
            });
        } finally {
            // Best-effort close: a failure to close must not hide the outcome of the transaction.
            Closeables.closeQuietly(configurer);
        }
    }

    /**
     * Changes the number of instances of one consumer group of a queue inside a single transaction.
     *
     * @param queueName queue to configure; must satisfy {@code isQueue()}
     * @param j id of the consumer group to resize
     * @param i new number of instances for that group
     * @throws IllegalArgumentException if {@code queueName} is not a queue
     * @throws Exception if the transaction fails
     */
    private void changeInstances(QueueName queueName, final long j, final int i) throws Exception {
        Preconditions.checkArgument(queueName.isQueue(), "Only support queue configuration in queue test.");

        final QueueConfigurer configurer = queueAdmin.getQueueConfigurer(queueName);
        try {
            TransactionExecutor executor = Transactions.createTransactionExecutor(executorFactory, configurer);
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    configurer.configureInstances(j, i);
                }
            });
        } finally {
            // Best-effort close: a failure to close must not hide the outcome of the transaction.
            Closeables.closeQuietly(configurer);
        }
    }

    /**
     * Resets the state of one consumer instance to the start row of the first barrier of its group.
     *
     * @param queueName queue the consumer belongs to
     * @param consumerConfig identifies the consumer group and instance to reset
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void resetConsumerState(QueueName queueName, final ConsumerConfig consumerConfig) throws Exception {
        // The shared field is typed as QueueAdmin; the state store accessor is HBase-specific.
        final HBaseConsumerStateStore stateStore = ((HBaseQueueAdmin) queueAdmin).getConsumerStateStore(queueName);
        try {
            TransactionExecutor executor = Transactions.createTransactionExecutor(executorFactory, stateStore);
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    long groupId = consumerConfig.getGroupId();
                    List<QueueBarrier> barriers = stateStore.getAllBarriers(groupId);
                    stateStore.updateState(groupId, consumerConfig.getInstanceId(), barriers.get(0).getStartRow());
                }
            });
        } finally {
            // Best-effort close: a failure to close must not hide the outcome of the transaction.
            Closeables.closeQuietly(stateStore);
        }
    }
}
