package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.utils.Networks;
import co.cask.cdap.data.hbase.HBaseTestBase;
import co.cask.cdap.data.hbase.HBaseTestFactory;
import co.cask.cdap.data.runtime.DataFabricDistributedModule;
import co.cask.cdap.data.runtime.TransactionMetricsModule;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueTest;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.ConsumerConfigCache;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.util.hbase.ConfigurationTable;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.distributed.TransactionService;
import co.cask.tephra.persist.NoOpTransactionStateStorage;
import co.cask.tephra.persist.TransactionStateStorage;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueTest.class */
public abstract class HBaseQueueTest extends QueueTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueTest.class);

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    private static TransactionService txService;
    private static CConfiguration cConf;
    private static Configuration hConf;
    private static ConsumerConfigCache configCache;
    private static HBaseTestBase testHBase;
    private static HBaseTableUtil tableUtil;
    private static ZKClientService zkClientService;

    @BeforeClass
    public static void init() throws Exception {
        testHBase = (HBaseTestBase) new HBaseTestFactory().get();
        testHBase.startHBase();
        hConf = testHBase.getConfiguration();
        cConf = CConfiguration.create();
        cConf.set("zookeeper.quorum", testHBase.getZkConnectionString());
        cConf.set("data.tx.bind.port", Integer.toString(Networks.getRandomPort()));
        cConf.set("dataset.table.prefix", "test");
        cConf.setBoolean("tx.persist", false);
        cConf.set("hdfs.user", System.getProperty("user.name"));
        cConf.setLong("data.queue.config.update.interval", 1L);
        Module with = Modules.override(new Module[]{new DataFabricDistributedModule()}).with(new Module[]{new AbstractModule() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueTest.1
            protected void configure() {
                bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class);
            }
        }});
        new ConfigurationTable(hConf).write(ConfigurationTable.Type.DEFAULT, cConf);
        Injector createInjector = Guice.createInjector(new Module[]{with, new ConfigModule(cConf, hConf), new ZKClientModule(), new DiscoveryRuntimeModule().getDistributedModules(), new TransactionMetricsModule(), new AbstractModule() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueTest.2
            protected void configure() {
                try {
                    bind(LocationFactory.class).toInstance(new LocalLocationFactory(HBaseQueueTest.tmpFolder.newFolder()));
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        }});
        zkClientService = (ZKClientService) createInjector.getInstance(ZKClientService.class);
        zkClientService.startAndWait();
        txService = (TransactionService) createInjector.getInstance(TransactionService.class);
        new Thread() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                HBaseQueueTest.txService.start();
            }
        }.start();
        txSystemClient = (TransactionSystemClient) createInjector.getInstance(TransactionSystemClient.class);
        queueClientFactory = (QueueClientFactory) createInjector.getInstance(QueueClientFactory.class);
        queueAdmin = (QueueAdmin) createInjector.getInstance(QueueAdmin.class);
        streamAdmin = (StreamAdmin) createInjector.getInstance(StreamAdmin.class);
        executorFactory = (TransactionExecutorFactory) createInjector.getInstance(TransactionExecutorFactory.class);
        configCache = ConsumerConfigCache.getInstance(hConf, Bytes.toBytes(queueAdmin.getConfigTableName()));
        tableUtil = (HBaseTableUtil) new HBaseTableUtilFactory().get();
    }

    @Test
    public void testQueueTableNameFormat() throws Exception {
        String actualTableName = queueAdmin.getActualTableName(QueueName.fromFlowlet("application1", "flow1", "flowlet1", "output1"));
        Assert.assertEquals("application1", HBaseQueueAdmin.getApplicationName(actualTableName));
        Assert.assertEquals("flow1", HBaseQueueAdmin.getFlowName(actualTableName));
    }

    @Test
    public void testHTablePreSplitted() throws Exception {
        testHTablePreSplitted((HBaseQueueAdmin) queueAdmin, QueueName.fromFlowlet("app", "flow", "flowlet", "out"));
    }

    void testHTablePreSplitted(HBaseQueueAdmin hBaseQueueAdmin, QueueName queueName) throws Exception {
        String actualTableName = hBaseQueueAdmin.getActualTableName(queueName);
        if (!hBaseQueueAdmin.exists(queueName.toString())) {
            hBaseQueueAdmin.create(queueName.toString());
        }
        Assert.assertEquals("Failed for " + hBaseQueueAdmin.getClass().getName(), 16L, testHBase.getHTable(Bytes.toBytes(actualTableName)).getRegionsInRange(new byte[]{0}, new byte[]{-1}).size());
    }

    @Test
    public void configTest() throws Exception {
        QueueName fromFlowlet = QueueName.fromFlowlet("app", "flow", "flowlet", "out");
        String configTableName = queueClientFactory.getConfigTableName(fromFlowlet);
        queueAdmin.configureGroups(fromFlowlet, ImmutableMap.of(1L, 1, 2L, 2, 3L, 3));
        HTable hTable = testHBase.getHTable(Bytes.toBytes(configTableName));
        try {
            byte[] bytes = fromFlowlet.toBytes();
            Assert.assertEquals(6L, hTable.get(new Get(bytes)).getFamilyMap(QueueEntryRow.COLUMN_FAMILY).size());
            Put put = new Put(bytes);
            put.add(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(2L, 0), Bytes.toBytes(4));
            put.add(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(2L, 1), Bytes.toBytes(5));
            hTable.put(put);
            queueAdmin.configureInstances(fromFlowlet, 2L, 3);
            Result result = hTable.get(new Get(bytes));
            for (int i = 0; i < 3; i++) {
                Assert.assertEquals(4L, Bytes.toInt(result.getColumnLatest(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(2L, i)).getValue()));
            }
            Put put2 = new Put(bytes);
            put2.add(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(2L, 0), Bytes.toBytes(7));
            hTable.put(put2);
            queueAdmin.configureGroups(fromFlowlet, ImmutableMap.of(2L, 1, 4L, 1));
            Assert.assertEquals(4L, Bytes.toInt(hTable.get(new Get(bytes)).getColumnLatest(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(2L, 0)).getValue()));
            Result result2 = hTable.get(new Get(bytes));
            Assert.assertEquals(2L, result2.getFamilyMap(QueueEntryRow.COLUMN_FAMILY).size());
            Assert.assertEquals(4L, Bytes.toInt(result2.getColumnLatest(QueueEntryRow.COLUMN_FAMILY, HBaseQueueAdmin.getConsumerStateColumn(4L, 0)).getValue()));
            hTable.close();
            queueAdmin.dropAll();
        } catch (Throwable th) {
            hTable.close();
            queueAdmin.dropAll();
            throw th;
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void verifyConsumerConfigExists(QueueName... queueNameArr) throws InterruptedException {
        configCache.updateCache();
        for (QueueName queueName : queueNameArr) {
            Assert.assertNotNull("for " + queueName, configCache.getConsumerConfig(queueName.toBytes()));
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void verifyConsumerConfigIsDeleted(QueueName... queueNameArr) throws InterruptedException {
        configCache.updateCache();
        for (QueueName queueName : queueNameArr) {
            Assert.assertNull("for " + queueName, configCache.getConsumerConfig(queueName.toBytes()));
        }
    }

    @AfterClass
    public static void finish() throws Exception {
        txService.stop();
        testHBase.stopHBase();
        zkClientService.stopAndWait();
    }

    @Test
    public void testPrefix() {
        Assert.assertTrue(queueAdmin.getTableNamePrefix().startsWith("test."));
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void forceEviction(QueueName queueName) throws Exception {
        byte[] bytes = Bytes.toBytes(queueClientFactory.getTableName(queueName));
        final Class queueRegionObserverClassForVersion = tableUtil.getQueueRegionObserverClassForVersion();
        testHBase.forEachRegion(bytes, new Function<HRegion, Object>() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueTest.4
            public Object apply(HRegion hRegion) {
                try {
                    Coprocessor findCoprocessor = hRegion.getCoprocessorHost().findCoprocessor(queueRegionObserverClassForVersion.getName());
                    HBaseQueueTest.LOG.info("forcing update cache for HBaseQueueRegionObserver of region: " + hRegion);
                    Method declaredMethod = findCoprocessor.getClass().getDeclaredMethod("getConfigCache", new Class[0]);
                    declaredMethod.setAccessible(true);
                    Object invoke = declaredMethod.invoke(findCoprocessor, new Object[0]);
                    Method declaredMethod2 = invoke.getClass().getDeclaredMethod("updateCache", new Class[0]);
                    declaredMethod2.setAccessible(true);
                    declaredMethod2.invoke(invoke, new Object[0]);
                    return null;
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        });
        testHBase.forceRegionFlush(bytes);
        testHBase.forceRegionCompact(bytes, true);
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueTest
    protected void configureGroups(QueueName queueName, Map<Long, Integer> map) throws Exception {
        if (queueName.isQueue()) {
            queueAdmin.configureGroups(queueName, map);
        } else {
            streamAdmin.configureGroups(queueName, map);
        }
    }
}
